我正在为以下用例寻找轻量级且高效的解决方案:
- 网关模块接收资源并交付给不同的接受者。
- 为每个接受器排队的资源(按到达顺序)。
- 清除进程会扫描这些队列,如果资源可供某个接受器使用,那么他会将它们捆绑在某个标签(唯一 ID)下,并发送新 bundle 可用的通知。
系统特点:
- 接受者的数量是动态的。
- 一个 bundle 中的资源数量没有限制。
该模块将在 Java 7 下的 Tomcat 7 中使用(非集群)。
我考虑了以下解决方案:
- JMS - 每个接受器的动态队列配置,是否可以消耗队列中的所有可用消息?每个队列的线程配置(不可扩展)?
- AKKA Actor 。没有找到合适的使用模式。
- 朴素的纯 Java 实现,其中队列将由一个线程扫描(循环)。
我认为这是讨论此问题的可用解决方案的正确场所。 请在考虑以下几点时分享您的想法:
- 合适的第三方框架。
- 资源队列可扩展扫描。
提前致谢。
最佳答案
您可以使用各种技术,例如:
JMS 动态队列
扩展 LMAX 干扰器(例如 https://github.com/hicolour/disruptor-ext )
但出于高可用性和可扩展性的原因,您应该使用Akka
Akka
实现的起点将是 Akka 中内置的一致性哈希路由算法 - 简而言之,这种类型的路由逻辑根据提供的 key 选择一致的路由。与您的问题描述相比的路线是接受者。
路由器参与者有两种不同的风格,这为您提供了在基础设施中部署新接受器的灵活机制。
池 - 路由器将路由创建为子参与者,并在它们终止时将其从路由器中删除。
组 - 路由参与者在路由器外部创建,路由器使用参与者选择将消息发送到指定路径,而不监视终止。
首先请阅读Akka路由文档,以更好地了解Akka框架中的路由实现:
您还可以查看这篇有关可扩展和高可用性系统设计的文章:
Q1 Actor 是否有可能知道他的路线(他的哈希 key )?
Actor 可能知道当前处理哪个键,因为它可能只是消息的一部分 - 但您不应该基于此键构建跨消息逻辑/状态。
消息:
import akka.routing.ConsistentHashingRouter.ConsistentHashable
class Message(key : String) extends ConsistentHashable with Serializable {
override def consistentHashKey(): AnyRef = key
}
Actor :
import akka.actor.{Actor, ActorLogging}
class EchoActor extends Actor with ActorLogging {
log.info("Actor created {}", self.path.name)
def receive = {
case message: Message =>
log.info("Received message {} in actor {}", message.consistentHashKey(), self.path.name)
case _ => log.error("Received unsupported message");
}
}
Q2 Actor 可以管理邮箱以外的状态吗?
Actor 状态只能通过它们之间发送的消息进行更改。
如果您将初始化包含对经典 java/spring/.. bean 的引用的 actor,它将能够与非 actor 世界/状态进行交互,例如。 dao 层,但这种类型的集成应该尽可能受到限制,并被视为反模式。
Q3 有没有办法使用防碰撞的配置?
作为 API 使用者,您需要自己定义防碰撞
模型,但 Akka 再次提供了执行此操作所需的基础设施。
在大多数情况下, key 将是域的一部分,例如。拍卖id、客户id
如果需要按需生成 key ,您可以使用 ClusterSingleton 与 Persistence扩展名。
Generator
可以是负责生成唯一ID的Actor
,其他actor可以使用ask
模式获取新的id。
ClusterSingleton
使用ClusterSingletonManager
初始化并使用ClusterSingletonProxy
获取
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Generator]),
singletonName = "gnerator",
terminationMessage = End,
role = Some("generator")),
name = "singleton")
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/generator",
role = Some("generator")),
name = "generatorProxy")
关于java - Java消息调度系统设计,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27101934/