java - 破坏者消费者没有按预期工作

标签 java multithreading disruptor-pattern

当我运行这段代码时

    public class Test {
      public static void main(String[] args) {
        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new EventFactoryImpl<MyEvent>(),
    Executors.newFixedThreadPool(2), new MultiThreadedClaimStrategy(32), new                    BusySpinWaitStrategy());

        MyEventHandler myEventHandler1 = new MyEventHandler("1");
        MyEventHandler myEventHandler2 = new MyEventHandler("2");

        disruptor.handleEventsWith(myEventHandler1, myEventHandler2);
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        ByteBuffer bb = ByteBuffer.allocate(8);

        for (long l = 0; l < 2; l++) {
          bb.putLong(0, l);
          long sequence = ringBuffer.next();

          try {
             MyEvent event = ringBuffer.get(sequence);
             event.set(bb.getLong(0));
          }
          finally {
            ringBuffer.publish(sequence);
          }
        }
      }
    }
    public class MyEvent {
      private long value;

      public void set(long value) {
        this.value = value;
      }

      public long get() {
        return value;
      }
    }

    public class MyEventHandler implements EventHandler<MyEvent> {
       private String id;

       public MyEventHandler(String id) {
          this.id = id;
       }

       public void onEvent(MyEvent event, long sequence, boolean endOfBatch) {
           System.out.println("id: " + id + ", event: " + event.get() + ", sequence: " + sequence +                  "," + Thread.currentThread().getName());
       }
    }

    public class EventFactoryImpl<T> implements EventFactory<T> {
       @SuppressWarnings("unchecked")
       public T newInstance() {
         return (T) new MyEvent();
       }
    }

我得到这个输出

id: 1, event: 0, sequence: 0,pool-1-thread-1
id: 1, event: 1, sequence: 1,pool-1-thread-1
id: 2, event: 0, sequence: 0,pool-1-thread-2
id: 2, event: 1, sequence: 1,pool-1-thread-2

但我希望每个事件都由单独的线程处理一次。我怎样才能实现它?

最佳答案

使用 Disruptor,每个订阅环形缓冲区的 EventHandler 都会读取每条消息一次。

如果您想让多个线程处理来自环形缓冲区的消息,有几个选项。第一个也是最好的选择是为每个读取器线程设置一个单独的 Disruptor,并让写入器以循环方式在缓冲区之间交替。如果您必须使用单个环形缓冲区(也许对事件进行排序),那么您可以设置线程 ID,该线程 ID 应该将每个事件处理到事件本身(再次以交替方式),并让与该 ID 不匹配的线程会丢弃该事件。

关于java - 破坏者消费者没有按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22664200/

相关文章:

java - 了解 Scala 中的 ArrayBuffer

java - 如何确定性地从 X/Y 坐标生成伪随机模式?

java - JUnit:是否有执行并行测试的聪明方法?

java - reactor lmax 线程转储

java - Play 框架 - 平衡异步和非阻塞

java - `Class.getDeclaredClasses()` 返回的数组中的元素是否有任何特定顺序?

java - 如何避免显示“应用程序无响应”对话框

.net - WinForms RichTextBox : how to reformat asynchronously, 没有触发 TextChanged 事件

design-patterns - 什么是 LMAX Disruptor 设计模式?