java - 创建一个以有限速率发出项目的 Flowable,以避免需要缓冲事件

标签 java multithreading java-8 rx-java2

我有一个数据访问对象,它将数据源中的每个项目传递给消费者:

public interface Dao<T> {
    void forEachItem(Consumer<T> item);
}

这总是以单线程方式生成项目 - 我目前无法更改此界面。

我想从这个界面创建一个Flowable:

private static Flowable<String> flowable(final Dao dao) {
    return Flowable.create(emitter -> {
        dao.forEachItem(item ->
                emitter.onNext(item));
        emitter.onComplete();
    }, ERROR);
}

如果我在处理时间长于发出项目的速率的情况下使用此Flowable,那么我可以理解地得到一个丢失的背压异常,因为我正在使用ERROR模式:

    Dao<String> exampleDao =
            itemConsumer ->
                    IntStream.range(0, 1_000).forEach(i ->
                            itemConsumer.accept(String.valueOf(i)));

    flowable(exampleDao)
            .map(v -> {
                Thread.sleep(100);
                return "id:" + v;
            })
            .blockingSubscribe(System.out::println);

我不希望缓冲项目 - 似乎这可能会导致非常大的数据集耗尽内存 - 如果操作明显慢于生产者。

我希望有一种背压模式,允许发射器在检测到背压时传递下一个/完成事件时阻塞,但情况似乎并非如此?

就我而言,因为我知道 dao 以单线程方式生成项目,所以我认为我能够执行以下操作:

  dao.forEachItem(item -> {
    while (emitter.requested() == 0) {
      waitABit();
    }         
    emitter.onNext(item)
  });

但这似乎永远挂起。

我的方法有多错误? :-) 考虑到我的(相对限制性的)环境,有没有一种方法可以尊重下游背压来生产产品?

我知道我可以通过一个单独的进程写入队列,然后根据该队列的消耗编写一个 Flowable - 这会是首选方法吗?

最佳答案

检查Flowable的部分,尤其是带有Supscription.request(long)的部分。我希望这能让您走上正确的道路。

<小时/>

TestProducer从这个例子产生 Integer给定范围内的对象并将它们推送到其 Subscriber 。它扩展了 Flowable<Integer>类(class)。对于新订阅者,它会创建 Subscriptionrequest(long) 的对象方法用于创建和发布整数值。

这对于 Subscription 很重要被传递到subscriberrequest()调用 onNext() 的方法可以从此 onNext() 中递归调用订阅者上的内容称呼。为了防止堆栈溢出,所示的实现使用 outStandingRequests计数器和isProducing标志。

class TestProducer extends Flowable<Integer> {
    static final Logger logger = LoggerFactory.getLogger(TestProducer.class);
    final int from, to;

    public TestProducer(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Subscription() {

            /** the next value. */
            public int next = from;
            /** cancellation flag. */
            private volatile boolean cancelled = false;
            private volatile boolean isProducing = false;
            private AtomicLong outStandingRequests = new AtomicLong(0);

            @Override
            public void request(long n) {
                if (!cancelled) {

                    outStandingRequests.addAndGet(n);

                    // check if already fulfilling request to prevent call  between request() an subscriber .onNext()
                    if (isProducing) {
                        return;
                    }

                    // start producing
                    isProducing = true;

                    while (outStandingRequests.get() > 0) {
                        if (next > to) {
                            logger.info("producer finished");
                            subscriber.onComplete();
                            break;
                        }
                        subscriber.onNext(next++);
                        outStandingRequests.decrementAndGet();
                    }
                    isProducing = false;
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

本例中的 Consumer 扩展了 DefaultSubscriber<Integer>并在开始时和消耗一个 Integer 后请求下一个。在使用整数值时,会有一点延迟,因此将为生产者建立背压。

class TestConsumer extends DefaultSubscriber<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);

    @Override
    protected void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer i) {
        logger.info("consuming {}", i);
        if (0 == (i % 5)) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException ignored) {
                // can be ignored, just used for pausing
            }
        }
        request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        logger.error("error received", throwable);
    }

    @Override
    public void onComplete() {
        logger.info("consumer finished");
    }
}

在测试类的以下主要方法中,创建并连接生产者和消费者:

public static void main(String[] args) {
    try {
        final TestProducer testProducer = new TestProducer(1, 1_000);
        final TestConsumer testConsumer = new TestConsumer();

        testProducer
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.single())
                .blockingSubscribe(testConsumer);

    } catch (Throwable t) {
        t.printStackTrace();
    }
}

运行示例时,日志文件显示消费者持续运行,而生产者仅在 rxjava2 的内部 Flowable 缓冲区需要重新填充时才会激活。

关于java - 创建一个以有限速率发出项目的 Flowable,以避免需要缓冲事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44473859/

相关文章:

windows - 如何找到线程本地存储的开始和结束?

java - awaitTermination 函数出现 IllegalMonitorStateException

Java 8 Stream 获取最小值

java - Akka ActorSystem 在 Java 中永远不会终止

java - JellyBean 中未调用 onNewIntent

java - Android 按钮 - 单击时触发定时事件

java - Arquillian 可以并行运行测试吗?

c# - 多线程无法正常执行

java - 在 javaFX 中通过 KeyCombination 触发事件

Java基数转换程序给定值、其基数和新基数