我们需要异步处理不同类型的对象。每种类型/类型的对象均使用 API key 进行处理。
每个 API key 都有自己的并发使用限制(例如一个 API key 不能超过 5 个并行 session )。
我们对工作线程数有全局限制(CPU 限制)。
我们希望在工作线程限制内进行尽可能多的 API 调用。
可能的解决方案:
2 tasks with KEY1 (max 2 session) -\ total 3 workers
5 tasks with KEY2 (max 3 session) -/
是:
1. worker1: KEY2, worker2: KEY2, worker3: KEY2 (in queue: 2x KEY1, 2x KEY2)
2. worker1: KEY1, worker2: KEY2, worker3: KEY2 (in queue: 1x KEY1, 3x KEY2)
3. worker1: KEY1, worker2: KEY1, worker3: KEY2 (in queue: 4x KEY2)
可能的解决方案:
3 tasks with KEY1 (max 1 session) & 3 workers
是:
1. worker1: KEY1, worker2: IDLE, worker3: IDLE, (in queue 2x KEY1)
执行顺序并不重要(但我们希望听到类似先进先出的策略),最大吞吐量是最重要的。
尚不清楚选择哪种实现策略。
ThreadExecutor
与任何队列都是不够的,因为您需要知道 ThreadExecutor
当前使用哪些 API key 。
最佳答案
我不确定我的问题是否正确,但您需要的似乎是 Semaphore
对于每个 API key 。
Semaphore key1Semaphore = new Semaphore(2);
Semaphore key2Semaphore = new Semaphore(3);
您可以检查 key1Semaphore
是否具有许可,并通过调用 key1Semaphore.tryAcquire()
获取许可(如果可用)。这是非阻塞的,因此如果失败并返回 false,您可以尝试从另一个 API key 获取信号量并从中提交任务。
重要的是,在使用 API key 之一的任务结束时,信号量许可证会被释放。
您可能需要一个额外的对象来与 wait()
和 notify()
同步,以便在任务完成时通知正在分派(dispatch)的主线程任务再次检查信号量。
所以本质上你得到的是你的任务调度程序将向你的 3 个工作线程的 ExecutorService 提交 5 个任务,然后它将无法再提交任何任务,直到其中一个信号量许可获得已发布。
当任务完成并且许可证被释放时,调度程序会收到通知,因此它会解除等待状态,并再次按顺序检查信号量并将任务提交给 ExecutorService
。
此解决方案有点偏向第一个 API key ,但您可以通过检查每个 key 的任务长度并更公平地分配它们来进一步完善它。您甚至可以旋转索引,以便每次循环时索引都会增加 1,这样第一次从 API KEY 1 开始,第二次从 API KEY 2 开始,依此类推。
关于Java:实现多线程供应商/消费者管道,每种任务具有并行限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55511707/