在 RX dart 中有 RX.combineLatest
使用回调函数组合流结果的方法。
问题是它仅在每个 流发出一个值时发出一个值。如果没有,它就不会发射。
Merges the given Streams into a single Stream sequence by using the combiner function whenever any of the stream sequences emits an item. The Stream will not emit until all streams have emitted at least one item.
我正在尝试将多个流合并为一个流以进行验证,当流未发出或发出空值时应发出 false 或 true。
class FormBloc {
final BehaviorSubject<bool> _result = BehaviorSubject();
final BehaviorSubject<String?> _usernameController = BehaviorSubject();
final BehaviorSubject<String?> _emailController = BehaviorSubject();
// Will only emit if each stream emitted a value
// If only username is emitted valid is not emitted
Stream<bool> get valid$ => Rx.combineLatest2(
_usernameController.stream,
_emailController.stream,
(username, email) => username != null || email != null
);
}
如果 任何 流发生变化,我如何加入这些流以便 valid$
发出一个值?
最佳答案
因为这里的所有解决方案都是解决方法,所以我已经实现了自己的流类。实现等同于原始的 CombineLatestStream 实现,除了它在发射之前不等待所有流发射:
import 'dart:async';
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/subscription.dart';
class CombineAnyLatestStream<T, R> extends StreamView<R> {
CombineAnyLatestStream(List<Stream<T>> streams, R Function(List<T?>) combiner) : super(_buildController(streams, combiner).stream);
static StreamController<R> _buildController<T, R>(
Iterable<Stream<T>> streams,
R Function(List<T?> values) combiner,
) {
int completed = 0;
late List<StreamSubscription<T>> subscriptions;
List<T?>? values;
final _controller = StreamController<R>(sync: true);
_controller.onListen = () {
void onDone() {
if (++completed == streams.length) {
_controller.close();
}
}
subscriptions = streams.mapIndexed((index, stream) {
return stream.listen(
(T event) {
final R combined;
if (values == null) return;
values![index] = event;
try {
combined = combiner(List<T?>.unmodifiable(values!));
} catch (e, s) {
_controller.addError(e, s);
return;
}
_controller.add(combined);
},
onError: _controller.addError,
onDone: onDone
);
}).toList(growable: false);
if (subscriptions.isEmpty) {
_controller.close();
} else {
values = List<T?>.filled(subscriptions.length, null);
}
};
_controller.onPause = () => subscriptions.pauseAll();
_controller.onResume = () => subscriptions.resumeAll();
_controller.onCancel = () {
values = null;
return subscriptions.cancelAll();
};
return _controller;
}
}
关于flutter - rx dart 组合多个流以在任何流发出值时发出值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70361479/