java - 使用执行器处理 Java 中流的子流

标签 java multithreading concurrency java.util.concurrent threadpoolexecutor

我有一个程序可以处理通过网络传入的大量数据流(不是 java.util.stream 的意思,而是 InputStream)。该流由对象组成,每个对象都有一种子流标识符。现在整个处理都是在单线程中完成的,但是需要大量的CPU时间,而且每个子流可以很容易地独立处理,所以我正在考虑多线程。

但是,每个子流都需要保留大量庞大的状态,包括各种缓冲区、 HashMap 等。没有特别的理由使其并发或同步,因为子流彼此独立。此外,每个子流都要求其对象按照到达的顺序进行处理,这意味着每个子流可能应该有一个线程(但可能一个线程处理多个子流)。

我正在考虑几种方法,但它们不太优雅。

  1. 为所有任务创建一个ThreadPoolExecutor。每个任务将包含下一个要处理的对象以及对保留所有状态的 Processor 实例的引用。这将确保必要的先发生关系,从而确保处理线程将看到该子流的最新状态。据我所知,这种方法无法确保同一子流的下一个对象将在同一线程中处理。此外,它需要一些保证,对象将按照它们进入的顺序进行处理,这将需要 Processor 对象的额外同步,从而引入不必要的延迟。

  2. 手动创建多个单线程执行器以及一种将子流标识符映射到执行器的 HashMap 。这种方法需要手动管理执行器,在新的子流开始或结束时创建或关闭它们,并相应地在它们之间分配任务。

  3. 创建一个自定义执行程序,用于处理任务的特殊子类,每个任务都有一个子流 ID。该执行器将使用它作为提示,使用与具有相同 ID 的前一个任务相同的线程来执行此任务。但是,我没有看到实现此类执行器的简单方法。不幸的是,似乎不可能扩展任何现有的执行器类,并且从头开始实现执行器有点矫枉过正。

  4. 创建一个ThreadPoolExecutor,但不是为每个传入对象创建一个任务,而是为每个子流创建一个长时间运行的任务,该任务会阻塞在并发队列中,等待对于下一个对象。然后根据对象的子流ID将对象放入队列中。这种方法需要与子流一样多的线程,因为任务将被阻塞。预计子流数量约为30-60,因此可以接受。

  5. 或者,按照 4 中的方式继续,但限制线程数量,将多个子流分配给单个任务。这是 2 和 4 之间的混合。据我所知,这是其中最好的方法,但它仍然需要在任务之间进行某种手动子流分配,并需要某种方法来关闭额外的任务,如子流结束。

确保每个子流在自己的线程中处理且没有大量容易出错的代码的最佳方法是什么?这样下面的伪代码就可以工作:

// loop {
    Item next = stream.read();
    int id = next.getSubstreamID();
    Processor processor = getProcessor(id);
    SubstreamTask task = new SubstreamTask(processor, next, id);
    executor.submit(task); // This makes sure that the task will
                           // be executed in the same thread as the
                           // previous task with the same ID.
// } // loop

最佳答案

我建议使用一组单线程执行器。如果您可以为子流设计一致的哈希策略,则可以将子流映射到各个线程。例如

final ExecutorsService[] es = ...

public void submit(int id, Runnable run) {
   es[(id & 0x7FFFFFFF) % es.length].submit(run);
}

key 可以是Stringlong,但可以通过某种方式来识别子流。如果您知道某个特定的子流非常昂贵,您可以为其分配一个专用线程。

关于java - 使用执行器处理 Java 中流的子流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33992822/

相关文章:

c++ - 使用 if 语句的奇怪行为

java - 数组的读锁

jenkins - 如何限制 Jenkins 并发多分支管道构建?

Java 和 SQLite 以及 Android : Array of different objects

Java - HashMap.get() 调用有多昂贵?

java - 多线程 Java - 连接

java - 通过扩展 Thread 类或实现 Runnable 创建线程哪个更可取?

java - 处理共享资源上的信号量冲突的最佳实践是什么?

java - 如何使用 apache POI java 库取消合并电子表格中的单元格

java - 使用 TCP 处理多线程