asynchronous - OSGi PushStream慢

标签 asynchronous promise stream osgi

在尝试OSGi PushStream库时,我觉得它确实很慢。我创建了两个方法,它们使用PushStream来做相同的事情,另一个方法是简单的BlockingQueue(请参见下面的代码),结果如下:

Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.

为什么PushStream慢一些?我做错了什么?

代码

使用PushStream:
public class TestPush{

    @Test
    public void testPushStream() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
              psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();

        psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });

        final Promise<Long> nbEvent = psp.createStream(source).count();

        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();

        System.out.println("PushStream needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getValue() + " events.");
    }

使用ArrayBlockingQueue:
    @Test
    public void testBlockingQueue() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final Executor e = Executors.newFixedThreadPool(1);
        final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
        final Deferred<Integer> nbEvent = pf.deferred();

        e.execute( () -> {
            try {
                Integer i = 0;
                Integer last = 0;
                do {
                    i = abq.take();

                    if (i == 0) {
                        startD.resolve(Instant.now());
                    } else if (i != -1) {
                        last = i;
                    }
                }
                while (i != -1);
                endD.resolve(Instant.now());
                nbEvent.resolve(last + 1);
            }
            catch (final InterruptedException exception) {
                exception.printStackTrace();
            }
        });

        for (int i = 0; i < 1000; i++) {
            abq.put(i);
        }
        abq.put(-1);

        System.out.println("Queue needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
    }
}

最佳答案

这是一个有趣的问题:)

Why the PushStream is slower? What I am doing wrong?



感谢您不仅仅假定PushStream实现很烂。在这种情况下,它会变慢,因为(可能没有意识到)您要求它是这样!

第1部分-缓冲

默认情况下,PushStream被缓冲。这意味着它们包括一个队列,在处理事件之前将其放入其中。因此,缓冲会做一些不利于吞吐速度的事情。
  • 它将额外的队列/出队步骤添加到管道
  • 它在事件处理
  • 中添加了一个额外的线程开关
  • 缓冲区的默认策略是返回与缓冲区已满有关的压力。

  • 在这种情况下,减速的绝大部分是由于背压。使用psp.createStream(source)创建流时,将使用32个元素的缓冲区和基于缓冲区大小的线性反压策略来设置该流,满时返回一秒钟,如果其中一项返回31毫秒。值得注意的是,每个元素31毫秒总计30秒!

    重要的是,SimplePushEventSource始终接受来自添加到它的使用者的反压力请求。这意味着您可能会尽可能快地将事件泵入SimplePushEventSource,但它们只会按照管道请求的速度尽快传递。

    如果我们从正在创建的推送流中删除缓冲,那么我们将进行以下测试:
    @Test
    public void testPushStream2() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
              psp.buildSimpleEventSource(Integer.class)
              .withQueuePolicy(QueuePolicyOption.BLOCK)
              .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    (在我的机器上)运行此命令的结果是:
    PushStream needs 39 milliseconds to process 1000 events.
    

    这显然更接近您的预期,但速度仍然明显较慢。请注意,我们可能仍然有一些缓冲,但是请调整PushbackPolicy。这样可以为我们提供更快的吞吐率,但还不如我们这么快。

    第2部分-管道长度

    接下来要注意的是,您正在使用onClose()处理程序。这为推送流管道增加了一个额外的阶段。实际上,您可以将onClose移动为promise的结果,从而减少了管道的长度(您只需要运行一次)。
    @Test
    public void testPushStream3() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
                psp.buildSimpleEventSource(Integer.class)
                .withQueuePolicy(QueuePolicyOption.BLOCK)
                .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        psp.buildStream(source).unbuffered().build().forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
                .onResolve(() -> endD.resolve( Instant.now()));
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
                + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
                + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    这个版本的结果(在我的机器上)是:
    PushStream needs 21 milliseconds to process 1000 events.
    

    第3部分-多路传输

    “原始数组阻止队列”示例与PushStream示例之间的主要区别在于,您实际上创建了两个PushStream。第一个工作是捕获开始时间,第二个工作是对事件进行计数。这将强制SimplePushEventSource在多个使用者上复用事件。

    如果将行为折叠到单个管道中,以便SimplePushEventSource可以使用快速路径传递怎么办?
    @Test
    public void testPushStream4() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
    
        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
                psp.buildSimpleEventSource(Integer.class)
                .withQueuePolicy(QueuePolicyOption.BLOCK)
                .build();
    
        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
    
        final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
                .filter(i -> {
                    if (i == 0) {
                        startD.resolve( Instant.now() );
                    }
                    return true;
                })
                .count()
                .onResolve(() -> endD.resolve( Instant.now()));
    
        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();
    
        System.out.println("PushStream needs "
                + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
                + " milliseconds to process " + nbEvent.getValue() + " events.");
    }
    

    这个版本的结果(在我的机器上)是:
    PushStream needs 3 milliseconds to process 1000 events.
    

    概括

    PushStream是一种消耗异步到达事件的快速有效的方法,但是了解哪种缓冲行为适合您的应用程序非常重要。如果您想快速地遍历大量数据,则需要注意如何进行设置,因为缓冲默认值是为不同的用例设计的!

    关于asynchronous - OSGi PushStream慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53692861/

    相关文章:

    mysql - 将 node.js 中的 mysql 模块用于大表

    c# - 在 Windows 应用商店应用程序中使用自定义流

    javascript - 在请求之前异步进行身份验证

    c# - C#中的异步操作和线程

    javascript - 如何减慢代码速度?

    javascript - promise 和并发

    c# - 使用 Task.Run 是一种不好的做法吗?

    javascript - 学习 Javascript/Jquery Promise&Deferred, Issue with $.when 多次使用

    javascript - 如何在 Javascript 中的 forEach 循环内正确链接 Promise

    javascript - 如何在 JavaScript 中解析大型 JSON 流中的项目?