Java多线程消息传递

标签 java multithreading messaging

我有一个有两个线程的应用程序,第一个线程写入队列,第二个线程从中异步读取。 我需要创建第三个,生成 20 个以上。 新创建的线程将运行直到显式停止。这 20 个线程应该获取“实时”数据以便进行分析。 这 20 个中的每一个都有一个唯一的 ID/名称。我需要将相关数据(READ 线程收集的)发送到正确的线程(20 个线程中)。例如如果数据包含 id(其中)为 2 的字符串 --> 我需要将其发送到 ID =2 的线程。 我的问题是:我应该如何保存指向 20 个线程中每个线程的“指针”并向其发送相关数据? (我可以在可运行列表中搜索 id(将保存线程)--> 但随后我需要调用方法“NewData(string)”以便将数据发送到正在运行的线程)。 我该怎么做呢? TIA 帕兹

最佳答案

您可能最好使用队列与线程进行通信。然后,您可以将所有队列放在 map 中以便于访问。我会推荐一个BlockingQueue

public class Test {
  // Special stop message to tell the worker to stop.
  public static final Message Stop = new Message("Stop!");

  static class Message {
    final String msg;

    // A message to a worker.
    public Message(String msg) {
      this.msg = msg;
    }

    public String toString() {
      return msg;
    }

  }

  class Worker implements Runnable {
    private volatile boolean stop = false;
    private final BlockingQueue<Message> workQueue;

    public Worker(BlockingQueue<Message> workQueue) {
      this.workQueue = workQueue;
    }

    @Override
    public void run() {
      while (!stop) {
        try {
          Message msg = workQueue.poll(10, TimeUnit.SECONDS);
          // Handle the message ...

          System.out.println("Worker " + Thread.currentThread().getName() + " got message " + msg);
          // Is it my special stop message.
          if (msg == Stop) {
            stop = true;
          }
        } catch (InterruptedException ex) {
          // Just stop on interrupt.
          stop = true;
        }
      }
    }
  }

  Map<Integer, BlockingQueue<Message>> queues = new HashMap<>();

  public void test() throws InterruptedException {
    // Keep track of my threads.
    List<Thread> threads = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      // Make the queue for it.
      BlockingQueue<Message> queue = new ArrayBlockingQueue(10);
      // Build its thread, handing it the queue to use.
      Thread thread = new Thread(new Worker(queue), "Worker-" + i);
      threads.add(thread);
      // Store the queue in the map.
      queues.put(i, queue);
      // Start the process.
      thread.start();
    }

    // Test one.
    queues.get(5).put(new Message("Hello"));

    // Close down.
    for (BlockingQueue<Message> q : queues.values()) {
      // Stop each queue.
      q.put(Stop);
    }

    // Join all threads to wait for them to finish.
    for (Thread t : threads) {
      t.join();
    }
  }

  public static void main(String args[]) {
    try {
      new Test().test();
    } catch (Throwable t) {
      t.printStackTrace(System.err);
    }
  }

}

关于Java多线程消息传递,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19228594/

相关文章:

java - 如何使用 createRow() 将行添加到 View 对象后删除行

java - 近似正则表达式等价

java - 手动增加 Java 应用程序使用的 CPU 量

java - 多线程 Java 代理

iOS:如何自定义JSQMessagesCollectionview?

java - 如何使用 HtmlUnit 和 java 选择特定元素?

java - helidon项目中自动添加openapi文件

java - 一个键具有多个值的并发映射,并在超时时自动删除

objective-c - 从另一个应用程序接收文本

RabbitMQ 使用自定义 header 来存储消息参数