java - Disruptor - 未调用 EventHandlers

标签 java disruptor-pattern

我正在玩弄 Disruptor框架,我发现我的事件处理程序没有被调用。

这是我的设置代码:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

在其他地方,我发布事件。我尝试了以下两种方法:

事件发布方法 A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

在这种情况下,我发现第一个 EventHandler 被调用,但除此之外没有任何其他东西被调用。

事件发布方法 B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

在这种情况下,我发现根本没有调用任何事件处理程序。

我做错了什么?

更新

这是我的 EventHandler 的全部内容。我应该如何表示处理已完成?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}

最佳答案

每个事件处理程序都需要在自己的线程中运行,该线程在您关闭中断器之前不会退出。由于您使用的是单线程执行程序,因此只有碰巧执行的第一个事件处理程序才会运行。 (Disruptor 类将每个处理程序存储在 HashMap 中,因此最终运行的处理程序会有所不同)

如果你切换到缓存线程池,你会发现它全部开始运行。您不需要对序列号进行任何管理,因为这一切都由 Disruptor 类为您设置和管理的 EventProcessor 处理。只处理你得到的每个事件是完全正确的。

关于java - Disruptor - 未调用 EventHandlers,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8309735/

相关文章:

java - 在解决构建路径错误之前无法构建项目 eclipse

java - 已安装 eclipse 但无法启动 java 返回退出代码 = 1

java - 来自指针的 JNA Java 结构

java - 给定一个 IP 地址列表,你如何找到最小值、最大值?

java - 如果 Disruptor 中的 next() 和publish() 之间抛出异常会发生什么?

java - 创建像环形缓冲区一样批处理的 Observable(需要建议)

Java Disruptor 模式和低延迟

java - 从 servlet 向 html 页面发送警报

c++ - LMAX 的干扰模式 : is there a port to C++?

c++ - 在 C++ 中构建 Disruptor 在 autoconf 上出现错误