背景:我需要以异步方式向 WebSocket 客户端发送许多小尺寸消息。消息通常在高峰期发送,因此在暂停一段时间后我需要快速发送约 5000 条消息。所以问题是:
- 我不想在单线程中启动 5000 个异步
- 我不想串行循环“启动异步”-“等待完成”5000 次
- 我不想使用 5000 个线程,每个线程只有一个“启动异步”-“等待完成”
最好的方法是将每个线程大约 20 个异步分组,因此我需要非常具体的队列:
- 很多表示队列中并发推送/轮询
- 小型异步意味着我想批量轮询,例如每个队列 1 到 20 条消息
take()
(这样我就可以启动 1...20 异步 I/O 并等待单线程完成) - 立即表示我不想等到 20 条消息被轮询,只有当队列有大量消息时才应使用bundle-poll。应立即轮询并发送单个消息。
所以基本上:我需要像队列这样的结构,在单个阻塞调用中具有阻塞 take(1 到 X) 等待元素。伪代码:
[each of ~50 processing threads]:
messages = queue.blockingTake( max 10 or at least 1 if less than 10 available );
for each message: message.startAsync()
for each message: message.waitToComplete()
repeat
最佳答案
如果不是真的有必要,我不会从头开始实现队列。如果您有兴趣,可以提供一些想法:
Queue> 如果只有 1 个线程执行报价。如果您有更多,则必须同步集合。例如,一个提供者 peek()-s 进入队列,发现最后一个集合包含太多元素,因此它创建一个新集合并提供它。
或
许多正在运行的线程,其中可运行对象从队列中一一获取元素。
或
每个发送线程 1 个队列,如果保留队列引用,则可以以循环方式向每个队列添加元素。
或
对您选择的 BlockingQueue 进行子类化,并使用正常 take() 的重写版本创建一个“Collection take(int i)”方法。
关于java - 如何创建 Java 并发队列,以便我们可以在单次调用中阻塞获取超过 1 个元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22506680/