apache-camel - Akka 本身支持集成模式吗?

标签 apache-camel akka actor thread-synchronization enterprise-integration

我是 Akka 新手,正在尝试弄清楚它是否具有对 Enterprise Integration Patterns (EIP) 的内置支持或者我是否需要将这种类型的路由/集成委托(delegate)给像 Camel 这样的框架.

在我的用例中,我有一个从源(文件)读取二进制样本的参与者;这个参与者被称为Sampler。然后,SamplerSample 实例(消息)传递到名为 SampleProcessors 的参与者字段。每个示例处理器对给定的示例执行不同的操作。根据处理器处理 Sample 的结果,它可能需要路由到 1+ 个其他 SampleProcessor,或者可能所有处理都已结束。根据具体的 SampleProcessor 以及给定 Sample 的具体性质,Sample 可能需要多播到其他列表收件人SampleProcessors

这对我来说就像 Camel 。

所以我问:

  • Akka 是否内置对路由、广播、多播和其他 EIP 的支持(如果有,它们是什么以及它们的文档在哪里)?
  • 或者,我应该尝试将 Actor 系统与 Camel 集成,在这种情况下,会是什么样子?我知道有一个 Camel-Akka 组件,但我相信这只是为了将 Camel 总线与 Actor 系统集成(而我想要一个服务总线在我的 Actor 系统内)
  • 或者,我应该在这里做我自己的本土 EIP/actor 连接吗?

最佳答案

Akka 本身并不支持 EIP,但有多种方法可以实现它。

无论如何,如果你想要一些方便的 DSL,有一个比 EIP 更好的主意 - 因为它是用 GoF-patterns 完成的。 ,您可以用函数组合 + Functors (map) 和 Monads (flatMap) 替换(实现)大多数 EIP 模式。换句话说,您可以将输入流视为无限集合。所以,

  • 处理器成为函数
  • 管道成为仿函数,例如 val output1 = input.map(processor1).map(processor2)
  • 路由器和过滤器成为... monad(filter基于flatMap):

    val fork1 = output1.filter(routingCondition1).map(...)

    val fork2 = output1.filter(routingCondition2).map(...)

  • 分割为flatMap:input.flatMap(x => Stream(x.submsg1, x.submsg2))

  • 聚合器变成变形,又名折叠(累加器通常应该由一些存储支持)

这种基于流的工作流程已经在 Akka 中实现,它被称为 Akka Streams ,这是 Reactive Streams 的实现,另见thisthat文章。

另一种选择是按原样使用 Akka,actor 保证顺序处理,因此您可以通过创建 actor 链来实现管道:

class Processor1(next: ActorRef) extends Actor {
   def receive = {
      case x if filterCondition => 
      case x => next ! process(x)
   }
}

val processor2 = system.actorOf(Props[Processor2])
val processor1 = system.actorOf(Props[Processor1], processor2)

如果您需要路由 - 只需两个“下一个”

class Router(next1: ActorRef, next2: ActorRef) extends Actor {
   def receive = {
      case x if filterCondition => 
      case x if cond1 => next1 ! process(x)
      case x if cond2 => next2 ! process(x)
   }
}

如果您需要保证路线之间没有比赛 - 请参阅this answer 。当然,你会失去整个 DSL 的想法,直接使用 actor。

附注是的,您仍然可以使用 Camel 作为端点 - Akka 有一些 support为了那个原因。您可以使用 Akka 作为服务激活器。

关于apache-camel - Akka 本身支持集成模式吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28178055/

相关文章:

java - CamelContext camelContext = SpringCamelContext.springCamelContext(appContext, false);在 Camel 版本 2.17.3 中弃用

jpa - Camel JPA - 没有名为 camel 的 EntityManager 的持久性提供程序

java - 如何配置Camel的RedeliveryPolicy retriesExhaustedLogLevel?

distributed-computing - AMQP 和 ZeroMQ 的区别

java - 使用 Props 初始化 actor

c++ - 无法在 C++ Actor Framework 中声明模板类型的 actor

java - JMS 如何与服务总线相关

akka - 如何在 Akka 中测试 Actor 未终止

remoting - 使用 Akka 启动多个远程服务器

scala - Akka中Actorref.tell和inbox.send的区别