Java,在多线程环境下通过散列统一划分传入的工作

标签 java multithreading math hash concurrency

我已经实现了一个 java 代码来执行传入的任务(如 Runnable ),其中有 n 个线程基于它们的 hashCode 模块 nThreads .理想情况下,工作应该均匀地分布在这些线程中。 具体来说,我们有一个 dispatchId作为每个任务的字符串。

这是这个 java 代码片段:

int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads
Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks
...
Worker getWorker(String dispatchId) { // Get a thread for this Task
    return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads];
}

重要:在大多数情况下,dispatchId 是:

String dispatchId = 'SomePrefix' + counter.next()

但是,我担心除以 nThreads 的模数不是一个好的选择,因为 nThreads 应该是质数,以便更均匀地分配 dispatId 键。

关于如何更好地传播工作,还有其他选择吗?

更新 1:

每个Worker都有一个队列: Queue<RunnableWrapper> tasks = new ConcurrentLinkedQueue();

worker 从中获取任务并执行它们。可以从其他线程将任务添加到此队列。

更新 2:

具有相同 dispatchId 的任务可以多次进来,因此我们需要通过 dispatchId 找到他们的线程.

最重要的是,每个 Worker 线程必须按顺序处理其传入的任务。于是,上面的update 1中就有了数据结构Queue。

更新 3: 此外,一些线程可能很忙,而其他线程则空闲。因此,我们需要以某种方式将队列与线程分离,但要为相同的 dispatchId 保持 FIFO 顺序。用于任务执行。

解决方案: 我已经实现了 Ben Manes 的想法(他在下面的回答),可以找到代码 here .

最佳答案

听起来您需要每个调度 ID 进行 FIFO 排序,因此理想的做法是将调度队列作为抽象。这可以解释您对散列不提供统一分布的担忧,因为一些调度队列可能比其他调度队列更活跃,并且在 worker 之间不公平地平衡。通过将队列与工作线程分开,您可以保留 FIFO 语义并均匀分布工作。

提供此抽象的非 Activity 库是 HawtDispatch .它与 Java 6 兼容。

一个非常简单的 Java 8 方法是使用 CompletableFuture作为排队机制,ConcurrentHashMap用于注册,执行器(例如 ForkJoinPool )用于计算。参见 EventDispatcher为了实现这个想法,注册是明确的。如果您的调度员更加动态,那么您可能需要定期修剪 map 。基本思路如下。

ConcurrentMap<String, CompletableFuture<Void>> dispatchQueues = ...

public CompletableFuture<Void> dispatch(String queueName, Runnable task) {
  return dispatchQueues.compute(queueName, (k, queue) -> {
    return (queue == null)
        ? CompletableFuture.runAsync(task)
        : queue.thenRunAsync(task);
  });
}

更新(JDK7)

上述想法的反向移植将用 Guava 翻译成类似的东西,

ListeningExecutorService executor = ...
Striped<Lock> locks = Striped.lock(256);
ConcurrentMap<String, ListenableFuture<?>> dispatchQueues = ...

public ListenableFuture<?> dispatch(String queueName, final Runnable task) {
  Lock lock = locks.get(queueName);
  lock.lock();
  try {
    ListenableFuture<?> future = dispatchQueues.get(queueName);
    if (future == null) {
      future = executor.submit(task);
    } else {
      final SettableFuture<Void> next = SettableFuture.create();
      future.addListener(new Runnable() {
        try {
          task.run();
        } finally {
          next.set(null);
        }
      }, executor);
      future = next;
    }
    dispatchQueues.put(queueName, future);
  } finally {
    lock.unlock();
  }
}

关于Java,在多线程环境下通过散列统一划分传入的工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29889885/

相关文章:

java - hibernate validator : Must I use Maven?

Java线程加入后卡住

java - Camunda BPM 的 JavaDelegate 类应该是线程安全的吗?

c - 如何计算覆盖矩形的最小数量的固定半径圆的圆心坐标?

c# - N阶贝塞尔曲线

algorithm - 递归时间复杂度计算

java - 使用定界符将 Java 字符串拆分为两个字符串

java - Effective Java 声称 elements.clone() 就足够了

java - 无法从 sqlite 表获取记录,出现 ResultSet Closed 错误

multithreading - 在主线程上等待回调方法