Java:实现多线程供应商/消费者管道,每种任务具有并行限制

标签 java threadpool producer-consumer

我们需要异步处理不同类型的对象。每种类型/类型的对象均使用 API key 进行处理。

每个 API key 都有自己的并发使用限制(例如一个 API key 不能超过 5 个并行 session )。

我们对工作线程数有全局限制(CPU 限制)。

我们希望在工作线程限制内进行尽可能多的 API 调用。

可能的解决方案:

2 tasks with KEY1 (max 2 session) -\  total 3 workers
5 tasks with KEY2 (max 3 session) -/

是:

1. worker1: KEY2, worker2: KEY2, worker3: KEY2 (in queue: 2x KEY1, 2x KEY2)
2. worker1: KEY1, worker2: KEY2, worker3: KEY2 (in queue: 1x KEY1, 3x KEY2)
3. worker1: KEY1, worker2: KEY1, worker3: KEY2 (in queue: 4x KEY2)

可能的解决方案:

3 tasks with KEY1 (max 1 session) & 3 workers

是:

1. worker1: KEY1, worker2: IDLE, worker3: IDLE, (in queue 2x KEY1)

执行顺序并不重要(但我们希望听到类似先进先出的策略),最大吞吐量是最重要的。

尚不清楚选择哪种实现策略。

ThreadExecutor 与任何队列都是不够的,因为您需要知道 ThreadExecutor 当前使用哪些 API key 。

最佳答案

我不确定我的问题是否正确,但您需要的似乎是 Semaphore对于每个 API key 。

Semaphore key1Semaphore = new Semaphore(2);
Semaphore key2Semaphore = new Semaphore(3);

您可以检查 key1Semaphore 是否具有许可,并通过调用 key1Semaphore.tryAcquire() 获取许可(如果可用)。这是非阻塞的,因此如果失败并返回 false,您可以尝试从另一个 API key 获取信号量并从中提交任务。

重要的是,在使用 API key 之一的任务结束时,信号量许可证会被释放。

您可能需要一个额外的对象来与 wait()notify() 同步,以便在任务完成时通知正在分派(dispatch)的主线程任务再次检查信号量。

所以本质上你得到的是你的任务调度程序将向你的 3 个工作线程的 ExecutorService 提交 5 个任务,然后它将无法再提交任何任务,直到其中一个信号量许可获得已发布。

当任务完成并且许可证被释放时,调度程序会收到通知,因此它会解除等待状态,并再次按顺序检查信号量并将任务提交给 ExecutorService

此解决方案有点偏向第一个 API key ,但您可以通过检查每个 key 的任务长度并更公平地分配它们来进一步完善它。您甚至可以旋转索引,以便每次循环时索引都会增加 1,这样第一次从 API KEY 1 开始,第二次从 API KEY 2 开始,依此类推。

关于Java:实现多线程供应商/消费者管道,每种任务具有并行限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55511707/

相关文章:

java - 使用Java通过网络复制文件数据非常慢

eclipse - Ant "JAVA_HOME does not point to the JDK"- 但确实如此

java - 为什么当执行器的线程出现异常时,它并没有耗尽可用线程?

java - 增加线程数量是否会使生产者消费者概率更快?

algorithm - 有人可以用 P V 形式解释生产者和消费者吗?

java - Web服务中的 session 管理?

java - 在我的 Spring 上下文文件中找不到 "ehcache: annotation-driven"定义

multithreading - 如何在 groovy 中使用多线程访问 1000 个端点?

c# - 执行多个线程

java - 生产者-消费者示例不起作用