amazon-web-services - 同一 Kinesis 流的多个不同使用者

标签 amazon-web-services publish-subscribe amazon-kinesis

我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,对于给定的主题/流,具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。

最初,我为所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动了多个消费者,我就开始收到以下错误:

com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)



这似乎是因为消费者在使用相同的应用程序名称时与他们的检查点发生冲突。

从阅读文档来看,使用检查点进行发布/订阅的唯一方法似乎是每个消费者应用程序都有一个流,这要求每个生产者了解所有可能的消费者。这比我想要的更紧密;这真的只是一个队列。

似乎 Kafka 支持我想要的东西:给定主题/分区的任意消费,因为消费者完全可以控制他们自己的检查点。如果我想要带有检查点的发布/订阅,我唯一的选择是转移到 Kafka 还是其他一些选择?

我的 RecordProcessor 代码,在每个消费者中都是相同的:
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  for {
    record <- processRecordsInput.getRecords
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } yield subscriber ! msg
  processRecordsInput.getCheckpointer.checkpoint()
}

该代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以在 AWS Kinesis 仪表板上看到正在发送的消息,但没有读取发生,大概是因为每个应用程序都有自己的 AppName 并且没有看到任何其他消息。

最佳答案

支持您想要的模式,即从一个 Kinesis 流的一个发布者到多个消费者的模式。每个消费者不需要单独的流。

你是怎样做的?您需要为每个使用者提供不同的应用程序名称。这样,一个消费者的检查点信息不会与另一个消费者的信息发生冲突。

检查对此的第一个回复:https://forums.aws.amazon.com/message.jspa?messageID=554375

关于amazon-web-services - 同一 Kinesis 流的多个不同使用者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39167880/

相关文章:

join - CloudFormation 加入标签

session - Node.js:无法在 Redis pubsub 事件中更新 session ?

amazon-web-services - 将 AWS Lambda 数据推送到 Kinesis Stream

ios - AWS Cognito 用户池 - "Value null at ' userName' 未能满足约束 : Member must not be null"- iOS

ios - IOS从AWS S3下载图像

java - 如何在 REST API 中拥有订阅者?

events - 使用Node.js监听Redis上的按键事件

amazon-kinesis - Kinesis 消耗滞后监控

spring - 使用 Spring Cloud 函数 AWS Adapter 运行 AWS lambda 函数时,组件 bean 不会被 spring 注入(inject)( Autowiring )

amazon-web-services - Lambda 的 aws-sdk 不是最新的吗?