java - RxJava 并行获取 Observables

标签 java asynchronous reactive-programming observable rx-java

在 RxJava 中实现并行异步调用时,我需要一些帮助。我选择了一个简单的用例,其中 FIRST 调用获取(相当搜索)要显示的产品列表(平铺)。随后的调用出去并获取(A)评论和(B)产品图像

经过几次尝试,我到达了这个地方。

 1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
 2    List<Tile> allTiles = new ArrayList<Tile>();
 3    ClientResponse response = new ClientResponse();

 4    searchTile.parallel(oTile -> {
 5      return oTile.flatMap(t -> {
 6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
 7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

 8        return Observable.zip(reviews, imageUrl, (r, u) -> {
 9          t.setReviews(r);
10          t.setImageUrl(u);

11          return t;
12        });

13      });
14    }).subscribe(e -> {
15      allTiles.add((Tile) e);
16    });

第 1 行:取出要显示的产品(Tile)

第 4 行:我们获取 Observable 的列表并对其进行 SHARD 以获取评论和 imageUrls

谎言 6,7:获取 Observable 评论和 Observable url

第 8 行:最后 2 个 observable 被压缩以返回更新后的 Observable

第 15 行:最后第 15 行整理了所有要显示在集合中的单个产品,该集合可以返回给调用层

虽然 Observable 已被分片并且在我们的测试中运行了 4 个不同的线程;获取评论和图像似乎是一个接一个。我怀疑第 8 行的 zip 步骤基本上导致了 2 个 observables(reviews 和 url)的顺序调用。

enter image description here

这个小组对并行获取评论和图片网址有什么建议吗?本质上,上面附加的瀑布图应该看起来更垂直堆叠。对评论和图像的调用应该是并行的

谢谢 阿南德拉曼

最佳答案

并行运算符被证明是几乎所有用例的问题,并且没有达到大多数人的预期,因此在 1.0.0.rc.4 版本中将其删除:https://github.com/ReactiveX/RxJava/pull/1716

可以看到 here 的一个很好的例子来说明如何执行这种类型的行为并获得并行执行。 .

在您的示例代码中,不清楚 searchServiceClient 是同步的还是异步的。它会稍微影响如何解决问题,就好像它已经是异步的,不需要额外的调度。如果需要同步额外调度。

首先这里有一些显示同步和异步行为的简单示例:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
    }
}

以下是尝试提供与您的代码更匹配的示例:

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample {

    public static void main(String[] args) {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(() -> logTime("Search started ", startTime))
                .doOnCompleted(() -> logTime("Search completed ", startTime));

        Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
            Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                    .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
            Observable<String> imageUrl = getProductImage(t.getProductId())
                    .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

            return Observable.zip(reviews, imageUrl, (r, u) -> {
                return new TileResponse(t, r, u);
            }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
        });

        List<TileResponse> allTiles = populatedTiles.toList()
                .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                .toBlocking().single();
    }

    private static Observable<Tile> getSearchResults(String string) {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id) {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id) {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime) {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(T... ts) {
        return Observable.create((Subscriber<? super T> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
                for (T t : ts) {
                    s.onNext(t);
                }
                s.onCompleted();
            }).subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse {

        public TileResponse(Tile t, Reviews r, String u) {
            // store the values
        }

    }

    public static class Tile {

        private final int id;

        public Tile(int i) {
            this.id = i;
        }

        public int getSellerId() {
            return id;
        }

        public int getProductId() {
            return id;
        }

    }

    public static class Reviews {

    }
}

这个输出:

Search started  => 65ms
Search completed  => 1094ms
getProductImage[1] completed  => 2095ms
getSellerReviews[2] completed  => 2095ms
getProductImage[3] completed  => 2095ms
zip[1] completed  => 2096ms
zip[2] completed  => 2096ms
getProductImage[2] completed  => 2096ms
getSellerReviews[1] completed  => 2096ms
zip[3] completed  => 2096ms
All Tiles Completed  => 2097ms
getSellerReviews[3] completed  => 2097ms

我已将每个 IO 调用模拟为花费 1000 毫秒,因此很明显延迟在哪里并且它是并行发生的。它以经过的毫秒数打印出进度。

这里的技巧是 flatMap 会合并异步调用,所以只要被合并的 Observable 是异步的,它们都会被并发执行。

如果像 getProductImage(t.getProductId()) 这样的调用是同步的,可以像这样异步调用:getProductImage(t.getProductId()).subscribeOn(Schedulers.io)。

这是上面示例的重要部分,没有所有的日志记录和样板类型:

    Observable<Tile> searchTile = getSearchResults("search term");;

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();

我希望这会有所帮助。

关于java - RxJava 并行获取 Observables,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26249030/

相关文章:

swift - ReactiveCocoa 与 RxSwift - 优缺点?

Java Slider - 触发 fireStateChanged() 而不更改值?

node.js - 当某些条件发生时如何退出async.forEach循环+ Node

java - Android USB 主机异步批量传输示例

javascript - 如何使用 RxJS5 延迟重试发送 HTTP 请求?

Angular Reactive 形式 : change vs valueChanges

java - 如何设置包版本

java - 无法使用通配符泛型类型向 Java 集合添加值

java - 搜索 ArrayList

asynchronous - 如何在不等待的情况下轮询 Future 状态?