流背压(Backpressure)

文章目录[x]
  1. 1:Kotlin Flow 背压处理方式
  2. 1.1:Flow 背压策略
  3. 2:Kotlin Channel 背压处理
  4. 2.1:Channel 背压策略(capacity参数决定行为)
  5. 3:RxJava Flowable (Reactive Streams Backpressure)
  6. 4:Dart Stream 的默认背压机制
  7. 4.1:自动反压
  8. 5:StreamController.broadcast()(热流)背压风险
  9. 5.1:手动 Buffer + 批量消费
  10. 5.2:Drop Oldest / Keep Latest
  11. 5.3:Throttle (节流)

什么是流背压 (Backpressure)

数据生产方(上游)生产数据太快,下游来不及处理。下游(消费者)无法跟上数据处理,导致数据堆积在中间层、如果数据量过大可能会导致系统崩溃。
通常出现在:
- 异步数据流(如 Stream、Flow)
- 网络数据流(如Socket、视频流)
- 数据库流式查询

流背压的通用处理策略

分类 具体策略 适用场景/特点 限制
缓冲(Buffering) 设定缓冲区,先存储数据等待处理 数据量短期突发可控,允许一定延迟 流量持续过高会导致内存溢出
丢弃(Dropping) 丢弃超出处理能力的数据(old/new) 高频不重要数据流,如点击事件 只适用于:非关键性数据(如 UI 滚动、点击频率控制)
节流(Throttling) 限制单位时间内最多处理多少数据 限流、限资源消耗(硬性节奏控制) 适用于:资源受限系统,如 I/O、网络请求。
防抖(Debouncing) 一段时间内只处理最后一次事件 用户输入、搜索等场景 一般用于前端输入框实时搜索、按钮快速点击防抖
拉模型(Pull-based) 消费者主动请求数据(反压机制) 上下游解耦,Backpressure-aware 需要全链路流量感知的系统。
暂停/恢复(Pause/Resume) 上游等待下游准备好再继续发送 流量控制,强一致性场景 一般适用于资源敏感型应用(如视频播放、文件流处理)。
批处理(Batching) 将多条数据合并后批量处理 高吞吐、弱实时性场景 一般适用于不需要强实时的后台任务、日志上传。
降级处理(Degrade) 动态调整数据质量或内容简化 高并发下的体验保障 实时性要求高但数据可降级的系统。|
流优先级(Priority Scheduling) 为不同类型流设置优先级 保证关键流优先(如实时告警) 一般适用于关键告警流、核心业务流优先保障。

流背压 3 大核心思想

  1. 削峰:减少高峰期数据量(Throttling, Dropping, Debounce)。

  2. 移峰:将流量延后处理(Buffering, Batching)。

  3. 控流:根据下游能力调整上游生产(Pull-based, Pause/Resume)。

Kotlin 中处理背压的方式

Kotlin Flow 背压处理方式

Kotlin Flow 本身是 冷流(Cold Stream),默认是按需发射(suspend),但当上游 emit 很快、下游处理慢时,仍然会遇到背压问题。

Flow 背压策略

方法 说明
buffer() 添加缓冲区,允许上游和下游异步运行。
conflate() 丢弃中间值,只保留最新值。
collectLatest() 如果正在处理旧值,取消旧值处理,只处理最新值。
debounce() 一段时间内只接收最后一个事件(防抖)。
sample() 定时采样流数据,忽略非采样点数据。

Flow 背压处理示例

flow {
    repeat(1000) {
        emit(it)
        println("Emit it")
    }
}
.buffer()  // 允许emit与collect异步(可能堆积)
.conflate()  // 丢弃中间值,只保留最新
.collect {
    delay(100)  // 消费很慢
    println("Collectit")
}

collectLatest(取消旧事件的处理)

flow {
    repeat(100) {
        emit(it)
        println("Emit it")
    }
}
.collectLatest {
    println("Processingit")
    delay(100)  // 处理耗时
    println("Done $it")
}

Kotlin Channel 背压处理

Kotlin Channel 是“热流”,天然具备背压风险。

Channel 背压策略(capacity参数决定行为)

策略 示例
RENDEZVOUS(默认,阻塞发送) Channel()
BUFFERED(缓冲N个元素) Channel(capacity = Channel.BUFFERED)
CONFLATED(只保留最新值) Channel(capacity = Channel.CONFLATED)
UNLIMITED(无限缓存) Channel(capacity = Channel.UNLIMITED)
CUSTOM N 缓冲 Channel(capacity = N)

Channel 背压示例

val channel = Channel<Int>(capacity = Channel.CONFLATED)

launch {
    repeat(1000) {
        channel.send(it)
        println("Sent it")
    }
}

launch {
    for (item in channel) {
        delay(100)  // 消费速度慢
        println("Receiveditem")
    }
}

RxJava Flowable (Reactive Streams Backpressure)

如果你用 RxJava in Kotlin,可以用 Flowable 处理背压。

策略 方法
缓冲 onBackpressureBuffer()
丢弃最新 onBackpressureDrop()
丢弃旧值(保留最新) onBackpressureLatest()

RxJava Flowable 示例

Flowable.interval(10, TimeUnit.MILLISECONDS)
    .onBackpressureLatest()  // 只保留最新值
    .observeOn(Schedulers.io())
    .subscribe {
        Thread.sleep(100)  // 消费慢
        println("Received $it")
    }

Dart 中流背压(Backpressure)处理方式

Dart Stream 的默认背压机制

  • Dart 的 Stream 本身有基础的“订阅驱动”机制(类似 Pull-based)。
  • 只要下游的 await for、listen 还没处理完,上游 async 的 yield 就会暂停*。
  • 这叫 “自然背压”(基于 async/await 挂起)。

自动反压

Dart Stream 是“冷流”,只会在下游准备好时继续 emit,不会主动堆积数据。

Stream<int> generateNumbers() async* {
  for (int i = 0; i < 100; i++) {
    print('Emit i');
    yield i;  // 会根据下游处理速度自动暂停
  }
}

void main() async {
  await for (var value in generateNumbers()) {
    await Future.delayed(Duration(milliseconds: 500)); // 下游处理很慢
    print('Processedvalue');
  }
}

StreamController.broadcast()(热流)背压风险

  • 使用 broadcast() 的 Stream,生产者是主动推送的(不会暂停)。
  • 如果下游来不及处理,就会有背压问题。
  • 需要手动做节流、丢弃、缓冲策略。
    方式 说明
    Buffer(缓冲) 自己实现 List 缓冲区,异步慢慢消费
    Drop(丢弃) 丢弃来不及处理的事件(只保留最新)
    Throttle / Debounce 限制流速(节流、防抖)
    Pause/Resume Subscription 手动暂停流订阅,处理完成后恢复
    Isolate 分流处理 重负载场景,把流消费分配到后台Isolate

手动 Buffer + 批量消费

final _controller = StreamController<int>.broadcast();
final List<int> _buffer = [];

void main() {
  _controller.stream.listen((data) {
    _buffer.add(data);
    if (_buffer.length >= 10) {
      processBatch(_buffer.toList());
      _buffer.clear();
    }
  });

  // 模拟高速发射
  for (int i = 0; i < 1000; i++) {
    _controller.add(i);
  }
}

void processBatch(List<int> batch) {
  print('Processing batch: $batch');
}

Drop Oldest / Keep Latest

final _controller = StreamController<int>.broadcast();
int? _latestValue;
bool _isProcessing = false;

void main() {
  _controller.stream.listen((data) {
    _latestValue = data;
    if (!_isProcessing) {
      _processLatest();
    }
  });

  // 模拟高速发射
  for (int i = 0; i < 1000; i++) {
    _controller.add(i);
  }
}

void _processLatest() async {
  _isProcessing = true;
  while (_latestValue != null) {
    int value = _latestValue!;
    _latestValue = null;
    print('Processing $value');
    await Future.delayed(Duration(milliseconds: 500)); // 模拟慢处理
  }
  _isProcessing = false;
}

Throttle (节流)

Stream<int> throttle(Stream<int> source, Duration duration) async* {
  DateTime? lastEmit;
  await for (var event in source) {
    if (lastEmit == null || DateTime.now().difference(lastEmit!) > duration) {
      yield event;
      lastEmit = DateTime.now();
    }
  }
}

void main() async {
  final source = Stream.periodic(Duration(milliseconds: 50), (i) => i);
  await for (var event in throttle(source, Duration(milliseconds: 500))) {
    print('Processed $event');
  }
}
点赞

发表评论

昵称和uid可以选填一个,填邮箱必填(留言回复后将会发邮件给你)
tips:输入uid可以快速获得你的昵称和头像

Title - Artist
0:00