java - 如何创建 Java 并发队列,以便我们可以在单次调用中阻塞获取超过 1 个元素?

标签 java concurrency websocket queue producer-consumer

背景:我需要以异步方式向 WebSocket 客户端发送许多小尺寸消息。消息通常在高峰期发送,因此在暂停一段时间后我需要快速发送约 5000 条消息。所以问题是:

  • 我不想在单线程中启动 5000 个异步
  • 我不想串行循环“启动异步”-“等待完成”5000 次
  • 我不想使用 5000 个线程,每个线程只有一个“启动异步”-“等待完成”

最好的方法是将每个线程大约 20 个异步分组,因此我需要非常具体的队列:

  • 很多表示队列中并发推送/轮询
  • 小型异步意味着我想批量轮询,例如每个队列 1 到 20 条消息 take() (这样我就可以启动 1...20 异步 I/O 并等待单线程完成)
  • 立即表示我不想等到 20 条消息被轮询,只有当队列有大量消息时才应使用bundle-poll。应立即轮询并发送单个消息。

所以基本上:我需要像队列这样的结构,在单个阻塞调用中具有阻塞 take(1 到 X) 等待元素。伪代码:

[each of ~50 processing threads]:
messages = queue.blockingTake( max 10 or at least 1 if less than 10 available );
for each message: message.startAsync()
for each message: message.waitToComplete()
repeat

最佳答案

如果不是真的有必要,我不会从头开始实现队列。如果您有兴趣,可以提供一些想法:

Queue> 如果只有 1 个线程执行报价。如果您有更多,则必须同步集合。例如,一个提供者 peek()-s 进入队列,发现最后一个集合包含太多元素,因此它创建一个新集合并提供它。

许多正在运行的线程,其中可运行对象从队列中一一获取元素。

每个发送线程 1 个队列,如果保留队列引用,则可以以循环方式向每个队列添加元素。

对您选择的 BlockingQueue 进行子类化,并使用正常 take() 的重写版本创建一个“Collection take(int i)”方法。

关于java - 如何创建 Java 并发队列,以便我们可以在单次调用中阻塞获取超过 1 个元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22506680/

相关文章:

python - Urllib 和并发 - Python

file - Micropython网络服务器: Serve large textfiles without memory allocation failure

拉拉维尔。每个用户的多个 channel 或个人 channel ?

perl - AnyEvent->计时器不能与 AnyEvent::Handle 一起使用?

java - "Design for Extension"原理与访问器(getters)/修改器(setters)意识形态之间的冲突

java - Retrofit2 和 converter-gson : Expected BEGIN_OBJECT but was STRING at line 1 column 1 path $

java - 优化线程池大小

java - 引入ExecutorService后TextView setText不起作用

java - 如何引用数组的一部分?

java - dex 文件不包含 MainActivity 类