scala - 使用 Akka、SQS 和 Camel 的消费者投票率

标签 scala apache-camel akka amazon-sqs

我正在做的一个项目需要从 SQS 读取消息,我决定使用 Akka 来分发这些消息的处理。

由于 Camel 支持 SQS,并且在 Consumer 类中内置了供 Akka 使用的功能,我认为最好以这种方式实现端点和读取消息,尽管我没有看到很多人这样做的例子。

我的问题是我无法足够快地轮询我的队列以保持我的队列为空或接近空。我最初的想法是我可以让消费者以 X/s 的速度通过 Camel 从 SQS 接收消息。从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度。

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

如图,我设置了delay=1以及 &maxMessagesPerPoll=10以提高消息率,但我无法使用相同的端点生成多个消费者。

我在文档中读到 By default endpoints are assumed not to support multiple consumers.我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者,在运行系统一分钟后,输出消息是 Count for actor: x而不是其他输出 Count for actor: 0 .

如果这有用的话;使用当前在单个消费者上的实现,我能够每秒读取大约 33 条消息。

这是从 Akka 中的 SQS 队列读取消息的正确方法吗?如果是这样,有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒的水平?

最佳答案

遗憾的是,Camel 目前不支持在 SQS 上并行使用消息。

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

为了解决这个问题,我编写了自己的 Actor 来使用 aws-java-sdk 轮询批处理消息 SQS。

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

虽然我不确定这是否是最好的实现,而且它当然可以改进,以便请求不会不断进入空队列,但它确实适合我当前能够从同一队列轮询消息的需求。

用这个从 SQS 获得大约 133(消息/秒)/ Actor 。

关于scala - 使用 Akka、SQS 和 Camel 的消费者投票率,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19771021/

相关文章:

scala - 记录 Play!2 Scala 应用程序的请求时间

java - 像 Scala Controller 一样在 Java Controller 中呈现 JSON 响应字符串

spring - Camel , Spring ,OSGI : Is there a way to specify the stop method?

java - Camel 邮件 IMAP 不跳过失败的邮件

java - CamelContext 在初始化 PooledConnectionFactory 时关闭

scala - 如何在 actorSelection 中查找命中数

Scala - 迭代器和可迭代的惰性 - 内存消耗

scala - Play 异常: '{' expected but 'import' found

scala - 如何在 scala 中为 https 请求创建 akka-http cachedHostConnectionPool?

scala - Scala 中的多个 Actor 实现有何不同?