dart - 将 Stream<Stream<T>> 转换为 Stream<T>

标签 dart rxdart

我正在使用 rxdart在 dart 中处理流的包。我被困在处理一个特殊的问题上。

请看一下这个虚拟代码:

final userId = BehaviorSubject<String>();

Stream<T> getStream(String uid) {
  // a sample code that returns a stream
  return BehaviorSubject<T>().stream;
}

final Observable<Stream<T>> oops = userId.map((uid) => getStream(uid));

现在我想转换 oops仅获取变量 Observable<T> .

我发现很难解释清楚。但让我试试。我有一个流 A。我将流 A 的每个输出映射到另一个流 B。现在我有 Stream<Stream<B>> - 一种循环流。我只想听听这个模式产生的最新值。我怎样才能做到这一点?

最佳答案

我将列出几种压平 Stream<Stream<T>> 的方法成单Stream<T> .

1.使用纯 Dart

正如@ Irn 所回答的,这是一个纯 Dart 解决方案:

Stream<T> flattenStreams<T>(Stream<Stream<T>> source) async* {
  await for (var stream in source) yield* stream;
}

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Stream<int> s = flattenStreams(Stream.fromIterable(list).map(getStream));
    s.listen(print);
}

输出:1 2 3 4 1 2 3 4 1 2 3 4
2. 使用 Observable.flatMap

Observable 有一个 flatMap 方法来展平输出流并将其附加到正在进行的流中:
import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.fromIterable(list).flatMap(getStream);
    s.listen(print);
}

输出:1 2 3 4 1 2 3 4 1 2 3 4
3. 使用 Observable.switchLatest

Convert a Stream that emits Streams (aka a "Higher Order Stream") into a single Observable that emits the items emitted by the most-recently-emitted of those Streams.



这是我一直在寻找的解决方案!我只需要内部流发出的最新输出。
import 'package:rxdart/rxdart.dart';

Stream<int> getStream(String v) {
    return Stream.fromIterable([1, 2, 3, 4]);
}

void main() {
    List<String> list = ["a", "b", "c"];
    Observable<int> s = Observable.switchLatest(
            Observable.fromIterable(list).map(getStream));
    s.listen(print);
}

输出:1 1 1 2 3 4

关于dart - 将 Stream<Stream<T>> 转换为 Stream<T>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56770400/

相关文章:

flutter - 如果传递了渐变,则框装饰中的渐变或颜色

Firebase 不验证电子邮件

flutter - 为之前的 Flutter 项目关闭 Null Safety?

dart - 如何使用 RxDart BehaviourSubject 更新 flutter widget?

firebase - 用RxDart在dart-flutter中进行Concanete 2 Firestore QuerySnapShot流

dart - StreamBuilder TextField 在其他地方更改时不会更新其值

flutter - Flutter 中的 PageView 模块中的 PageController 未正确设置。弹出位置非空错误

flutter - 滑动(Flutter)时如何更改TabBar指示器颜色

integer - Dart :可以加2但不能加1

flutter - 如何比较 Dart 中的文件/流平等