文章目录[x]
- 1:Kotlin Flow 背压处理方式
- 1.1:Flow 背压策略
- 2:Kotlin Channel 背压处理
- 2.1:Channel 背压策略(capacity参数决定行为)
- 3:RxJava Flowable (Reactive Streams Backpressure)
- 4:Dart Stream 的默认背压机制
- 4.1:自动反压
- 5:StreamController.broadcast()(热流)背压风险
- 5.1:手动 Buffer + 批量消费
- 5.2:Drop Oldest / Keep Latest
- 5.3:Throttle (节流)
什么是流背压 (Backpressure)
数据生产方(上游)生产数据太快,下游来不及处理。下游(消费者)无法跟上数据处理,导致数据堆积在中间层、如果数据量过大可能会导致系统崩溃。
通常出现在:
- 异步数据流(如 Stream、Flow)
- 网络数据流(如Socket、视频流)
- 数据库流式查询
流背压的通用处理策略
| 分类 | 具体策略 | 适用场景/特点 | 限制 |
|---|---|---|---|
| 缓冲(Buffering) | 设定缓冲区,先存储数据等待处理 | 数据量短期突发可控,允许一定延迟 | 流量持续过高会导致内存溢出 |
| 丢弃(Dropping) | 丢弃超出处理能力的数据(old/new) | 高频不重要数据流,如点击事件 | 只适用于:非关键性数据(如 UI 滚动、点击频率控制) |
| 节流(Throttling) | 限制单位时间内最多处理多少数据 | 限流、限资源消耗(硬性节奏控制) | 适用于:资源受限系统,如 I/O、网络请求。 |
| 防抖(Debouncing) | 一段时间内只处理最后一次事件 | 用户输入、搜索等场景 | 一般用于前端输入框实时搜索、按钮快速点击防抖 |
| 拉模型(Pull-based) | 消费者主动请求数据(反压机制) | 上下游解耦,Backpressure-aware | 需要全链路流量感知的系统。 |
| 暂停/恢复(Pause/Resume) | 上游等待下游准备好再继续发送 | 流量控制,强一致性场景 | 一般适用于资源敏感型应用(如视频播放、文件流处理)。 |
| 批处理(Batching) | 将多条数据合并后批量处理 | 高吞吐、弱实时性场景 | 一般适用于不需要强实时的后台任务、日志上传。 |
| 降级处理(Degrade) | 动态调整数据质量或内容简化 | 高并发下的体验保障 | 实时性要求高但数据可降级的系统。| |
| 流优先级(Priority Scheduling) | 为不同类型流设置优先级 | 保证关键流优先(如实时告警) | 一般适用于关键告警流、核心业务流优先保障。 |
流背压 3 大核心思想
- 削峰:减少高峰期数据量(Throttling, Dropping, Debounce)。
-
移峰:将流量延后处理(Buffering, Batching)。
-
控流:根据下游能力调整上游生产(Pull-based, Pause/Resume)。
Kotlin 中处理背压的方式
Kotlin Flow 背压处理方式
Kotlin Flow 本身是 冷流(Cold Stream),默认是按需发射(suspend),但当上游 emit 很快、下游处理慢时,仍然会遇到背压问题。
Flow 背压策略
| 方法 | 说明 |
|---|---|
| buffer() | 添加缓冲区,允许上游和下游异步运行。 |
| conflate() | 丢弃中间值,只保留最新值。 |
| collectLatest() | 如果正在处理旧值,取消旧值处理,只处理最新值。 |
| debounce() | 一段时间内只接收最后一个事件(防抖)。 |
| sample() | 定时采样流数据,忽略非采样点数据。 |
Flow 背压处理示例
flow {
repeat(1000) {
emit(it)
println("Emit it")
}
}
.buffer() // 允许emit与collect异步(可能堆积)
.conflate() // 丢弃中间值,只保留最新
.collect {
delay(100) // 消费很慢
println("Collectit")
}
collectLatest(取消旧事件的处理)
flow {
repeat(100) {
emit(it)
println("Emit it")
}
}
.collectLatest {
println("Processingit")
delay(100) // 处理耗时
println("Done $it")
}
Kotlin Channel 背压处理
Kotlin Channel 是“热流”,天然具备背压风险。
Channel 背压策略(capacity参数决定行为)
| 策略 | 示例 |
|---|---|
| RENDEZVOUS(默认,阻塞发送) | Channel |
| BUFFERED(缓冲N个元素) | Channel(capacity = Channel.BUFFERED) |
| CONFLATED(只保留最新值) | Channel(capacity = Channel.CONFLATED) |
| UNLIMITED(无限缓存) | Channel(capacity = Channel.UNLIMITED) |
| CUSTOM N 缓冲 | Channel(capacity = N) |
Channel 背压示例
val channel = Channel<Int>(capacity = Channel.CONFLATED)
launch {
repeat(1000) {
channel.send(it)
println("Sent it")
}
}
launch {
for (item in channel) {
delay(100) // 消费速度慢
println("Receiveditem")
}
}
RxJava Flowable (Reactive Streams Backpressure)
如果你用 RxJava in Kotlin,可以用 Flowable 处理背压。
| 策略 | 方法 |
|---|---|
| 缓冲 | onBackpressureBuffer() |
| 丢弃最新 | onBackpressureDrop() |
| 丢弃旧值(保留最新) | onBackpressureLatest() |
RxJava Flowable 示例
Flowable.interval(10, TimeUnit.MILLISECONDS)
.onBackpressureLatest() // 只保留最新值
.observeOn(Schedulers.io())
.subscribe {
Thread.sleep(100) // 消费慢
println("Received $it")
}
Dart 中流背压(Backpressure)处理方式
Dart Stream 的默认背压机制
- Dart 的 Stream 本身有基础的“订阅驱动”机制(类似 Pull-based)。
- 只要下游的 await for、listen 还没处理完,上游 async 的 yield 就会暂停*。
- 这叫 “自然背压”(基于 async/await 挂起)。
自动反压
Dart Stream 是“冷流”,只会在下游准备好时继续 emit,不会主动堆积数据。
Stream<int> generateNumbers() async* {
for (int i = 0; i < 100; i++) {
print('Emit i');
yield i; // 会根据下游处理速度自动暂停
}
}
void main() async {
await for (var value in generateNumbers()) {
await Future.delayed(Duration(milliseconds: 500)); // 下游处理很慢
print('Processedvalue');
}
}
StreamController.broadcast()(热流)背压风险
- 使用 broadcast() 的 Stream,生产者是主动推送的(不会暂停)。
- 如果下游来不及处理,就会有背压问题。
- 需要手动做节流、丢弃、缓冲策略。
方式 说明 Buffer(缓冲) 自己实现 List 缓冲区,异步慢慢消费 Drop(丢弃) 丢弃来不及处理的事件(只保留最新) Throttle / Debounce 限制流速(节流、防抖) Pause/Resume Subscription 手动暂停流订阅,处理完成后恢复 Isolate 分流处理 重负载场景,把流消费分配到后台Isolate
手动 Buffer + 批量消费
final _controller = StreamController<int>.broadcast();
final List<int> _buffer = [];
void main() {
_controller.stream.listen((data) {
_buffer.add(data);
if (_buffer.length >= 10) {
processBatch(_buffer.toList());
_buffer.clear();
}
});
// 模拟高速发射
for (int i = 0; i < 1000; i++) {
_controller.add(i);
}
}
void processBatch(List<int> batch) {
print('Processing batch: $batch');
}
Drop Oldest / Keep Latest
final _controller = StreamController<int>.broadcast();
int? _latestValue;
bool _isProcessing = false;
void main() {
_controller.stream.listen((data) {
_latestValue = data;
if (!_isProcessing) {
_processLatest();
}
});
// 模拟高速发射
for (int i = 0; i < 1000; i++) {
_controller.add(i);
}
}
void _processLatest() async {
_isProcessing = true;
while (_latestValue != null) {
int value = _latestValue!;
_latestValue = null;
print('Processing $value');
await Future.delayed(Duration(milliseconds: 500)); // 模拟慢处理
}
_isProcessing = false;
}
Throttle (节流)
Stream<int> throttle(Stream<int> source, Duration duration) async* {
DateTime? lastEmit;
await for (var event in source) {
if (lastEmit == null || DateTime.now().difference(lastEmit!) > duration) {
yield event;
lastEmit = DateTime.now();
}
}
}
void main() async {
final source = Stream.periodic(Duration(milliseconds: 50), (i) => i);
await for (var event in throttle(source, Duration(milliseconds: 500))) {
print('Processed $event');
}
}