我正在玩弄 Disruptor框架,我发现我的事件处理程序没有被调用。
这是我的设置代码:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private void initializeDisruptor() {
if (disruptor != null)
return;
disruptor =
new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
new SingleThreadedClaimStrategy(BUFFER_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(searchTermMatchingHandler)
.then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);
this.ringBuffer = disruptor.start();
}
在其他地方,我发布事件。我尝试了以下两种方法:
事件发布方法 A:
private void handleStatus(final Status status)
{
long sequence = ringBuffer.next();
TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
ringBuffer.publish(sequence);
}
在这种情况下,我发现第一个 EventHandler
被调用,但除此之外没有任何其他东西被调用。
事件发布方法 B:
private void handleStatus(final Status status)
{
disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
@Override
public TwitterStatusReceivedEvent translateTo(
TwitterStatusReceivedEvent event, long sequence) {
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
return event;
}
});
}
在这种情况下,我发现根本没有调用任何事件处理程序。
我做错了什么?
更新
这是我的 EventHandler 的全部内容。我应该如何表示处理已完成?
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
@Override
public void onEvent(TwitterStatusReceivedEvent event, long sequence,
boolean endOfBatch) throws Exception {
String statusText = event.getStatus().getText();
for (Instrument instrument : event.getSearchInstruments())
{
if (statusText.contains(instrument.getSearchTerm()))
{
event.setMatchedInstrument(instrument);
break;
}
}
}
}
最佳答案
每个事件处理程序都需要在自己的线程中运行,该线程在您关闭中断器之前不会退出。由于您使用的是单线程执行程序,因此只有碰巧执行的第一个事件处理程序才会运行。 (Disruptor 类将每个处理程序存储在 HashMap 中,因此最终运行的处理程序会有所不同)
如果你切换到缓存线程池,你会发现它全部开始运行。您不需要对序列号进行任何管理,因为这一切都由 Disruptor 类为您设置和管理的 EventProcessor 处理。只处理你得到的每个事件是完全正确的。
关于java - Disruptor - 未调用 EventHandlers,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8309735/