java - RxJava 相当于简单的 ThreadPoolExecutor 示例

标签 java rx-java rx-java2 micronaut resilience4j

我已经退出 Java 游戏大约 8 年了,从那时起发生了很多变化。对我来说最大的挑战是 RxJava/响应式(Reactive)。我正在寻找有关如何以完全响应式方式执行以下等效操作的粗略指导。

下面使用 ThreadPoolExecutor 实现的基本要求是通过调用远程 Web 服务来处理大量的Stuff,该服务的记录速率限制为 100 个请求/分钟。我的目标是尽可能快地处理尽可能多的数据,不丢失任何 Stuff,但仍遵守下游速率限制。此代码已经过简化,以避免错误、隔板、断路器、重试逻辑等。

这段代码目前工作正常,但在所有非阻塞 react 选项的情况下,它会导致感觉浪费了很多线程。甚至我用来调用服务的 HTTP 客户端也提供了一个 Flowable,我只是在执行器的 20 个线程中的每个线程中阻塞它。

我很想了解响应式(Reactive)等价物应该是什么。我遇到困难的地方几乎是我发现的所有文档都展示了使用 Observable 的静态源(例如:Observable.fromArray(1,2,3,4,5))。我知道解决方案可能涉及 IoScheduler 以及 groupBy,但我还没有弄清楚如何合并来 self 的 HTTP 客户端的 Flowable进入一些完整的链,该链执行并行化(最多一个限制,例如 20)和速率限制。

public class Example {
    private static final int THREADS = 20;

    // using https://docs.micronaut.io/latest/guide/index.html#httpClient
    @Client("http://stuff-processor.internal:8080")
    @Inject
    RxHttpClient httpClient;

    private ThreadPoolExecutor executor;
    private final RateLimiter rateLimiter;

    public Example() {
        // up to 20 threads to process the unbounded queue
        // incoming Stuff is very bursty...
        // ...we could go hours without anything and then hundreds could come in
        this.executor = new ThreadPoolExecutor(THREADS, THREADS,
                30,TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.executor.allowCoreThreadTimeOut(true);

        // using https://resilience4j.readme.io/docs/ratelimiter
        RateLimiterConfig config = RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(60))
                .limitForPeriod(100)
                .timeoutDuration(Duration.ofSeconds(90))
                .build();
        RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
        rateLimiter = rateLimiterRegistry.rateLimiter("stuff-processor", config);
    }

    /**
     * Called when the user takes an action that can cause 1 or 1000s of new
     * Stuff to be entered into the system. Each instance of Stuff results in
     * a separate call to this method. Ex: 100 Stuffs = 100 calls.
     */
    void onNewStuff(Stuff stuff) {
        final Runnable task = () -> {
            final Flowable<HttpResponse<Boolean>> flowable = httpClient.exchange(
                    HttpRequest.POST("/process", stuff),
                    Boolean.class);

            final HttpResponse<Boolean> response = flowable.blockingFirst();
            if (response.body()) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail :(");
            }
        };

        final Runnable rateLimitedTask = 
                RateLimiter.decorateRunnable(rateLimiter, task);
        executor.submit(rateLimitedTask);
    }
}

谢谢!

最佳答案

首先,要以完全非阻塞的方式构建它,您需要使用像 Netty 这样的非阻塞、异步 HTTP 客户端库。我不确定 RxHttpClient 是如何工作的。

假设您有一个列表东西。我就是这样做的:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io())).subscribe();

flatMap 合并响应。

为了限制速率,flatMap 有第二个参数,它限制它并行订阅的内部流的数量。假设您想同时调用不超过 10 个电话。这样做:

Observable.fromIterable(stuffs).flatMap(a -> client.nonBlockingPost(a).subscribeOn(Schedulers.io()), 10).subscribe();

关于java - RxJava 相当于简单的 ThreadPoolExecutor 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58254939/

相关文章:

java - 部署失败: repository element was not specified in the POM

java - 生成长度为 10 的唯一、随机字母数字主键

java - RxJava 而不是 AsyncTask?

rx-java2 - 我们如何在 retryWhen() 停止重试后获取 Throwable?

java - 在menu.xml中使用actionLayout时android中的NullPointerException

java - 制作商业 Java 软件 (DRM)

android - BehaviorSubject 处理中的 BehaviorSubject

安卓,RX : what put to UI thread - collections or elements?

java - 如何记住 Single 发出的值?

java - 在 Android 中使用 Room 和 RxJava 链接多个网络调用和数据库插入