java - 使用另一个 Observable<List<AnotherObject>> 过滤 Observable<List<Object>>

标签 java rx-java rx-android rx-java2

有什么方法可以实现以下目标吗?

我有两个可观察对象,

  • Observable<List<String>> broadcastIdsObservable .
  • Observable<List<EventData>> eventDatasObservable .

有没有办法用broadcastIdsObsevable内部的List来过滤eventDatasObservable?我需要获取List<EventData>eventData满足条件eventData.getBroadcastId().equals(broadCastId)哪里broadCastId是broadcastIds的每一项。

我尝试了使用 flatMap、map 和 Observable.concat 的不同方法...但我无法实现它。

提前致谢。

最佳答案

您可以使用RxJavaJoins 。您需要包括

compile 'io.reactivex:rxjava-joins:0.22.0'

然后创建联接:

@NonNull
@SuppressWarnings("unchecked")
private Func0<Observable<List<EventData>>> loadAndFilterEvents() {
    return new Func0<Observable<List<EventData>>>() {

        @Override
        public Observable call() {

            Observable<List<String>> broadcastIdsObservable = ...;
            Observable<List<EventData>> eventDatasObservable = ...;

            Plan0<List<EventData>> plan = JoinObservable.from(broadcastIdsObservable)
                                                        .and(eventDatasObservable)
                                                        .then(filterJoin);
            return JoinObservable.when(plan).toObservable();
        }

        private Func2<List<String>, List<EventData>, List<EventData>> filterJoin = (broadcastIds, events) -> {
            List<EventData> filtered = new ArrayList<>();
            for (EventData event : events) {
                if (broadcastIds.contains(event.getBroadcastId())) {
                    filtered.add(event);
                }
            }
            return filtered;
        };

    };
}

并订阅它:

Observable.defer(loadAndFilterEvents())
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(filteredEvents -> Timber.d("Events: %s", filteredEvents),
                         Timber::e);

关于java - 使用另一个 Observable<List<AnotherObject>> 过滤 Observable<List<Object>>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45946781/

相关文章:

java - 在 DataWeave 1.0 中将字符串转换为数字时如何避免指数

java - 在 RxJava 中,如何在 Observable 之外反射(reflect)/提取故障?

android - RxJava 中的 CompositeDisposable 是什么

java - RxJava门机制

java - 使用 retrofit2 和 RxAndroid 从 Spring WebFlux 获取响应

java - 在 Scala 的案例类和类字段中使用 Optional 是否有代码味道?

java - executeUpdate() 总是返回 1

java - @PostAuthorize 失败时返回 404 而不是 403

android - rxjava Realm - 使用 UI 线程以外的其他处理程序线程的灵活性?

android - 使用 rxjava2 遍历列表