我使用以下组件构建了一个 Spring Boot Kinesis Consumer:
- Spring Boot(版本 - 2.1.2.RELEASE)
- Spring Cloud(版本 - Greenwich.RELEASE)
- Spring Cloud Stream Kinesis Binder(版本 - 1.1.0.RELEASE)
我使用来自 具有 1 个分片的 kinesis 流的事件。此外,这个 Spring Boot 消费者应用程序正在 Pivotal Cloud Foundry Platform 中运行。
在发布这个问题之前,我在本地(使用kinesalite)和PCF(使用kinesalite)尝试了该场景。您能确认一下我的理解是否正确吗?我浏览了 Spring Cloud Stream 文档( https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ 和 https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc )。尽管文档很详尽,但没有详细解释并发性和高可用性。
假设我有 3 个消费者实例部署到 PCF(通过在 cf 推送期间使用的 manifest.yml 文件中将实例属性设置为 3)。
所有 3 个实例都具有以下属性:
spring.cloud.stream.bindings..consumer.concurrency=5
spring.cloud.stream.bindings..group=my-consumer-group
spring.cloud.stream.kinesis.binder.checkpoint.table=my-metadata-dynamodb-table
spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table
假设事件由生产者按此顺序发送到 kinesis
event5(流中的最新事件)- event4 - event3 - event2 - event1(流中的第一个事件)
对于这样的配置,我在下面解释了我的理解。您能确认一下这是否正确吗?
- 在给定时间点只有一个消费者实例处于 Activity 状态,它将处理发送到 kinesis 流的所有事件(因为该流只有一个分片)。仅当主实例关闭时,其他 2 个实例之一才会接管控制权。此配置是为了确保高可用性并保留消息的顺序。
- 由于实例数量是在 PCF 的 Manifest.yml 中设置的,因此我无需担心设置 spring.cloud.stream.instanceCount 或 spring.cloud.stream.bindings..consumer.instanceCount 属性。
- 当 Spring Boot 消费者启动/启动时,5 个消费者线程处于 Activity 状态(因为并发设置为 5)。现在事件按照上面解释的顺序被消耗。线程 1 获取事件 1。当线程 1 仍在积极处理事件 1 时,另一个线程只是选择并开始处理流中的下一个事件(线程 2 处理事件 2 等等...)。虽然在这种情况下事件的顺序被保留(事件 1 总是在事件 2 之前被拾取,依此类推......),但不能保证线程 1 将在线程 2 之前完成事件 1 的处理。
- 当所有 5 个线程都忙于处理流中的 5 个事件时,如果有新事件说 event6 和 event7 进入,则使用者必须等待线程变得可用。假设,线程 3 已处理完事件 3,而其他线程仍在忙于处理事件,线程 3 将拾取事件 6 并开始处理,但事件 7 仍未拾取,因为没有可用线程。
- 默认情况下,并发度设置为 1。如果您的业务要求要求您在处理下一个事件之前完成第一个事件的处理,则并发度应为 1。在这种情况下,您会影响吞吐量。您一次只能消费一个事件。但是,如果吞吐量很重要并且您希望在给定时间点处理多个事件,则应将并发性设置为所需的值。增加分片数量也是一种选择,但作为消费者,如果您不能要求增加分片数量,那么这是实现并行性/吞吐量的最佳选择。
最佳答案
请参阅 KinesisMessageDrivenChannelAdapter
中的并发
选项 JavaDocs:
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
因此,由于该流中只有一个分片,因此将只有一个 Activity 线程在该单个分片上迭代 ShardIterator
。
关键是我们总是必须在单个线程中处理来自单个分片的记录。这样我们保证了正确的顺序,并且针对最高序列号完成了检查点。
请详细了解什么是 AWS Kinesis 及其工作原理。
关于java - Spring Cloud Stream Kinesis Binder - 并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56572653/