flutter - rx dart 组合多个流以在任何流发出值时发出值

标签 flutter dart rxdart combinelatest

在 RX dart 中有 RX.combineLatest使用回调函数组合流结果的方法。 问题是它仅在每个 流发出一个值时发出一个值。如果没有,它就不会发射。

Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item.

我正在尝试将多个流合并为一个流以进行验证,当流未发出或发出空值时应发出 false 或 true。

class FormBloc {
  final BehaviorSubject<bool> _result = BehaviorSubject();
  final BehaviorSubject<String?> _usernameController = BehaviorSubject();
  final BehaviorSubject<String?> _emailController = BehaviorSubject();

  // Will only emit if each stream emitted a value
  // If only username is emitted valid is not emitted
  Stream<bool> get valid$ => Rx.combineLatest2(
    _usernameController.stream, 
    _emailController.stream, 
    (username, email) => username != null || email != null
  );

}

如果 任何 流发生变化,我如何加入这些流以便 valid$ 发出一个值?

最佳答案

因为这里的所有解决方案都是解决方法,所以我已经实现了自己的流类。实现等同于原始的 CombineLatestStream 实现,除了它在发射之前不等待所有流发射:


import 'dart:async';

import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';

class CombineAnyLatestStream<T, R> extends StreamView<R> {

  CombineAnyLatestStream(List<Stream<T>> streams, R Function(List<T?>) combiner) : super(_buildController(streams, combiner).stream);

  static StreamController<R> _buildController<T, R>(
    Iterable<Stream<T>> streams,
    R Function(List<T?> values) combiner,
  ) {

    int completed = 0;

    late List<StreamSubscription<T>> subscriptions;
    List<T?>? values;

    final _controller = StreamController<R>(sync: true);

    _controller.onListen = () {

      void onDone() {
        if (++completed == streams.length) {
          _controller.close();
        }
      }

      subscriptions = streams.mapIndexed((index, stream) {

        return stream.listen(
          (T event) {
            final R combined;

            if (values == null) return;

            values![index] = event;

            try {
              combined = combiner(List<T?>.unmodifiable(values!));
            } catch (e, s) {
              _controller.addError(e, s);
              return;
            }

            _controller.add(combined);
          },
          onError: _controller.addError,
          onDone: onDone
        );
      }).toList(growable: false);

      if (subscriptions.isEmpty) {
        _controller.close();
      } else {
        values = List<T?>.filled(subscriptions.length, null);
      }
    };

    _controller.onPause = () => subscriptions.pauseAll();
    _controller.onResume = () => subscriptions.resumeAll();
    _controller.onCancel = () {
      values = null;
      return subscriptions.cancelAll();
    };

    return _controller;
  }

}

关于flutter - rx dart 组合多个流以在任何流发出值时发出值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70361479/

相关文章:

dart - 如何在 Flutter 中加载超大图片?

flutter - 如何在 Flutter Widget(Center Widget)的子属性中使用条件语句

dart - 如何使用 Dart 语言获取所有子元素的数组?

dart - Flutter:自定义 ExpansionTile

dart - RxDart CombineLatest 不发出任何数据

ios - Xcode 11.6 失败并显示 : Command PhaseScriptExecution failed with a nonzero exit code

flutter - flutter一个构建函数返回null

flutter - 检测用户何时停止触摸屏幕

基于流的flutter snackbar显示

Flutter 组合可观测值