java - 以线程安全的方式填充映射并将该映射从后台线程传递给另一个方法?

标签 java multithreading data-structures thread-safety atomic

我有一个下面的类,其中 add 方法将由多个线程调用来填充 channelMessageHolder CHM以线程安全的方式。

在同一个类中,我有一个每 30 秒运行一次的后台线程,它通过从 channelMessageHolder< 传递数据来调用 send 方法

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>> channelMessageHolder =
                new AtomicReference<>(new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>());

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        send();
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  // this will be called by only single background thread
  private void send(ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> messageByChannels) {
    for(Entry<Channel, ConcurrentLinkedQueue<Message>> entry : messageByChannels.entrySet()) {
      Channel channel = entry.getKey();
      ConcurrentLinkedQueue<Message> messageHolder = entry.getValue();

      while (!messageHolder.isEmpty()) {
        Message message = messageHolder.poll();
        ....
        // process this and send to database
      }      
    }
  }

  // called by multiple threads
  public void add(final Channel channel, final Message message) {
    // populate channelMessageHolder in a thread safe way
  }
}

问题

如您所见,channelMessageHolder 已存在于我的 Processor 类中,因此我是否需要每 30 秒显式地从该映射传递数据到 send 方法?或者我可以直接在我的发送方法中使用它?

令人困惑的是,如果我直接在发送方法中使用它,那么它将同时由多个线程填充,这就是为什么我使用 AtomicReference 的 getAndSet 方法将其传递给 发送方法。

请告诉我我所做的事情是否错误,是否有更好的方法?

最佳答案

As you can see channelMessageHolder is already present in my Processor class so do I need to explicitly pass data from this map every 30 seconds to send method? Or I can directly use it in my send method?

您当然可以直接在 send() 方法中使用它,并且不需要 AtomicReference 包装器,因为 ConcurrentHashMap 已经同步。您需要担心的是映射中的键和值对象是否已正确同步。我假设 Channel 是不可变的,ConcurrentLinkedQueue 是并发的,所以你应该很好。

// no need for AtomicReference
private final ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>> channelMessageHolder =
     new ConcurrentHashMap<Channel, ConcurrentLinkedQueue<Message>>();

ConcurrentHashMap 会为您处理同步,以便生产者线程可以在发送者线程发送项目的同时向其中添加项目,而不会发生冲突。仅当您尝试在多个线程之间共享不同步的类时才需要 AtomicReference

Confusion is, if I directly use it in my send method, then it will be populated by multiple threads at the same time so that's why I am using getAndSet method of AtomicReference to pass it to send method.

对,但这没关系。多个线程将向 ConcurrentLinkedQueue 添加消息。每 30 秒,您的后台线程就会启动一次,获取 Channel、出队,然后发送当时队列中的消息。 ConcurrentLinkedQueue 可防止生产者和消费者的竞争条件。

您的代码中存在的问题是,它不可重入,因为它依赖于对队列的多次调用:

while (!messageHolder.isEmpty()) {
    Message message = messageHolder.poll();

它适用于您的情况,因为看起来只有一个线程出列,但以下代码更好:

while (true) {
    // only one call to the concurrent queue
    Message message = messageHolder.poll();
    if (message == null) {
        break;
    }
    ...
}

关于java - 以线程安全的方式填充映射并将该映射从后台线程传递给另一个方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42037894/

相关文章:

java - 第二个 Activity 出错 "cannot be resolved or not a field"

c++ - 使用不同线程时 Win32 API 死锁

algorithm - 最佳阅读计划

c++ - 如何在范围内声明?

java - Hadoop的默认分组比较器?

java - java中的双数字格式化

java - java中是否可以阻止类使用方法?

Java 线程和 POSIX 线程,用户级还是内核级?

java - 处理器核心数量与线程池大小的关系

c++ - 如何在不复制数据且不暴露指针的情况下向用户检索数据对象?