java - RabbitMQ 示例 : Multiple Threads, channel 和队列

标签 java multithreading rabbitmq messaging channel

我刚看完 RabbitMQ's Java API docs ,并发现它非常翔实和直截了当。如何设置简单的示例 Channel用于发布/消费非常容易理解和理解。但这是一个非常简单/基本的例子,它给我留下了一个重要的问题:如何设置 1+ Channels向多个队列发布/消费?

假设我有一个 RabbitMQ 服务器,上面有 3 个队列:logging , security_eventscustomer_orders .所以我们要么需要一个 Channel能够发布/使用所有 3 个队列,或者更有可能拥有 3 个独立的 Channels ,每个专用于一个队列。

最重要的是,RabbitMQ 的最佳实践要求我们设置 1 Channel每个消费者线程。对于这个例子,假设 security_events只有 1 个消费者线程没问题,但是 loggingcustomer_order两者都需要 5 个线程来处理音量。所以,如果我理解正确,这是否意味着我们需要:

  • 1 Channel和 1 个消费者线程,用于在 security_events 之间发布/消费;和
  • 5 Channels和 5 个消费者线程,用于在 logging 之间发布/消费;和
  • 5 Channels和 5 个消费者线程,用于在 customer_orders 之间发布/消费?

  • 如果我的理解在这里被误导,请首先纠正我。无论哪种方式,一些厌倦了战斗的 RabbitMQ 老手都可以吗用一个体面的代码示例帮助我“连接点”,以在此处设置满足我要求的发布商/消费者? 提前致谢!

    最佳答案

    我认为您对初步理解有几个问题。坦率地说,看到以下内容我有点惊讶:both need 5 threads to handle the volume .你是如何确定你需要那个确切的数字的?你有任何保证 5 个线程就足够了吗?

    RabbitMQ is tuned and time tested, so it is all about proper design and efficient message processing.



    让我们尝试查看问题并找到合适的解决方案。顺便说一句,消息队列本身不会提供任何保证您有真正好的解决方案。您必须了解自己在做什么,并进行一些额外的测试。

    您肯定知道有许多布局可能:

    enter image description here

    我将使用布局 B作为最简单的说明方式 1生产商N消费者问题。由于您非常担心吞吐量。顺便说一句,正如您可能期望的那样,RabbitMQ 表现得很好( source )。关注 prefetchCount ,我稍后会讲到:

    enter image description here

    因此,消息处理逻辑很可能是确保您拥有足够吞吐量的正确位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但最终这种方法会杀死您的系统。基本上,你会得到更多的线程,你会得到更大的延迟(如果你愿意,你可以检查 Amdahl's law)。

    enter image description here

    (见 Amdahl’s law illustrated)

    提示 #1:小心线程,使用 ThreadPools ( details )

    A thread pool can be described as a collection of Runnable objects (work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add a new Runnable object to the work queue.


    public class Main {
      private static final int NTHREDS = 10;
    
      public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
        for (int i = 0; i < 500; i++) {
          Runnable worker = new MyRunnable(10000000L + i);
          executor.execute(worker);
        }
        // This will make the executor accept no new threads
        // and finish all existing threads in the queue
        executor.shutdown();
        // Wait until all threads are finish
        executor.awaitTermination();
        System.out.println("Finished all threads");
      }
    } 
    

    提示 #2:小心消息处理开销

    我会说这是明显的优化技术。您很可能会发送小而易于处理的消息。整个方法是关于连续设置和处理较小的消息。大消息最终会开玩笑,所以最好避免这种情况。

    enter image description here

    所以最好发送一小段信息,但是处理呢?每次提交作业都会产生开销。在高传入消息率的情况下,批处理非常有用。

    enter image description here

    例如,假设我们有简单的消息处理逻辑,并且我们不希望每次处理消息时都有线程特定的开销。为了优化那个很简单的CompositeRunnable can be introduced :
    class CompositeRunnable implements Runnable {
    
        protected Queue<Runnable> queue = new LinkedList<>();
    
        public void add(Runnable a) {
            queue.add(a);
        }
    
        @Override
        public void run() {
            for(Runnable r: queue) {
                r.run();
            }
        }
    }
    

    或者以稍微不同的方式做同样的事情,通过收集要处理的消息:
    class CompositeMessageWorker<T> implements Runnable {
    
        protected Queue<T> queue = new LinkedList<>();
    
        public void add(T message) {
            queue.add(message);
        }
    
        @Override
        public void run() {
            for(T message: queue) {
                // process a message
            }
        }
    }
    

    通过这种方式,您可以更有效地处理消息。

    Tip #3:优化消息处理

    尽管您知道可以并行处理消息( Tip #1 )并减少处理开销( Tip #2 ),但您必须快速完成所有工作。冗余的处理步骤、繁重的循环等可能会极大地影响性能。请参阅有趣的案例研究:

    enter image description here

    Improving Message Queue Throughput tenfold by choosing the right XML Parser

    提示 #4:连接和 channel 管理
  • 在现有连接上启动新 channel 涉及一个网络
    往返 - 开始一个新的连接需要几个时间。
  • 每个连接都使用服务器上的文件描述符。 channel 没有。
  • 在一个 channel 上发布大消息将阻止连接
    当它熄灭时。除此之外,多路复用是相当透明的。
  • 如果服务器被阻止,正在发布的连接可能会被阻止
    重载 - 将发布和消费分开是个好主意
    联系
  • 准备好处理消息突发

  • ( source )

    请注意,所有提示都可以完美地协同工作。如果您需要其他详细信息,请随时告诉我。

    完整的消费者示例 ( source )

    请注意以下事项:
  • channel.basicQos(预取) - 正如你之前看到的 prefetchCount可能非常有用:

    This command allows a consumer to choose a prefetch window that specifies the amount of unacknowledged messages it is prepared to receive. By setting the prefetch count to a non-zero value, the broker will not deliver any messages to the consumer that would breach that limit. To move the window forwards, the consumer has to acknowledge the receipt of a message (or a group of messages).

  • ExecutorService 线程Executor - 您可以指定正确配置的执行程序服务。

  • 例子:
    static class Worker extends DefaultConsumer {
    
        String name;
        Channel channel;
        String queue;
        int processed;
        ExecutorService executorService;
    
        public Worker(int prefetch, ExecutorService threadExecutor,
                      , Channel c, String q) throws Exception {
            super(c);
            channel = c;
            queue = q;
            channel.basicQos(prefetch);
            channel.basicConsume(queue, false, this);
            executorService = threadExecutor;
        }
    
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            Runnable task = new VariableLengthTask(this,
                                                   envelope.getDeliveryTag(),
                                                   channel);
            executorService.submit(task);
        }
    }
    

    您还可以检查以下内容:
  • Solution Architecting Using Queues?
  • Some queuing theory: throughput, latency and bandwidth
  • A quick message queue benchmark: ActiveMQ, RabbitMQ, HornetQ, QPID, Apollo…
  • 关于java - RabbitMQ 示例 : Multiple Threads, channel 和队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18531072/

    相关文章:

    Java : My error is a cannot find symbol in run method for Threads?

    node.js - 通过 RabbitMQ 发送数组

    java - Spring Boot 中所有 @RabbitListener-s 使用 RabbitMQ 消息

    python - puka 可以与 gevent 或 eventlet 配合使用吗?

    java - Spring Data Repository 方面的建议不起作用

    基于java Tile的游戏ArrayIndexOutOfBoundsException错误

    java - IntelliJ Idea,如何从控制台删除java文件目录?

    java - 匈牙利算法死胡同

    javascript - 浏览器中的并行性

    c# - .NET4.0 : Thread-Safe Updating of ConcurrentDictionary<TKey, T值>