java - 使用Java线程池,如何根据消息特性串行处理某些消息,并行处理其他消息?

标签 java multithreading

这更多的是一个Java并发设计问题。我正在开发一个需要为许多不同客户端处理许多消息的应用程序。如果两条消息具有不同的客户端名称,则可以并行处理它们。但是,如果它们具有相同的客户端名称,则需要按顺序依次处理。

实现这一点的最佳方法是什么?

我当前的实现非常简单:我编写了一个名为 OrderedExecutorPool 的包装类。它有一个单线程执行器列表。在其提交方法中,它执行以下操作来确定将任务提交给哪个执行器:

int executorNum = Math.abs(clientName.hashCode()) % numExecutors;
executorList.get(executorNum).submit(task);

这确保了具有相同客户端的所有消息都发送到相同的执行器,同时仍然支持并行处理不同客户端的消息。

此设计存在几个问题:

1.) 如果大多数客户端名称具有相同的哈希码,则只有少数执行程序正在工作

2.) 如果一个客户端有很多消息,则只有一个执行器可能无法跟上

是否有一个优雅的解决方案可以解决上述问题?

编辑 clientName 只是一个字符串。我只是调用 String.hashCode() 方法。

最佳答案

据我所知,没有jdk内置解决方案。我已经使用这个基本逻辑在当前的工作中实现了一个自定义执行器解决方案。

  • 保留客户端名称到工作队列的内部映射(每个客户端都有自己的队列)
  • 当客户收到工作时,将其添加到他们的队列中
    • 如果这是队列中的第一个作业,请为此 clientname/queue 创建一个 Runnable 并将其推送到“真正的”执行器(标准 jdk 线程池)
  • Runnable impl 只是消耗单个客户端队列中的任务,直到空为止,然后退出

这个简单的实现是“贪婪”方法(客户端将继续工作直到其队列为空)。如果您的客户端多于底层线程,您可能需要一种更“公平”的方法,其中客户端执行一定数量的任务,并且它们在底层执行器中重新排队(从而允许其他客户端完成一些工作)。

关于java - 使用Java线程池,如何根据消息特性串行处理某些消息,并行处理其他消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25812721/

相关文章:

JavaFX 通过透明节点将鼠标事件传递给子节点

java - JavaPoet如何生成没有名字的注解?

multithreading - QMutex 卡在锁定状态

c# - 尝试将行添加到数据表时线程进入等待或 sleep 状态

Java在线程中处理输入流

java - 生成给定字符串的所有排列

java - 如何让 Eclipse 知道 Maven 存储库中的 OSGI 包(本地和|远程)

Python - 多线程/多处理

c++ - 我必须运行 cmake 两次才能编译项目

java - 删除 HashSet 上的失败,尽管 equals 并已实现代码