java - Spring Kinesis 消费者太慢

标签 java spring spring-cloud amazon-kinesis spring-cloud-stream

在 Windows 7、Java 8 上运行的消费者。

消费者在 1-5 秒内阅读 1 篇信息。我的设置有什么问题?

消费者:

@EnableBinding({Sink.class})
@SpringBootApplication
public class SpringCloudStreamKinesisConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKinesisConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void logger(String payload) {
        System.out.println("consumer received: " + payload);
    }
}

消费者应用程序.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test_stream
          content-type: application/json
          consumer:
            idleBetweenPolls: 250

消费者项目是同一项目中的一个模块,它继承自以下pom:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kinesis</artifactId>
    <version>1.0.0.M1</version>
</dependency>

Kinesis Binder 忽略这些设置:

enter image description here

我在调试时手动更改了此参数,但消费者在 10 秒 +- 内收到 1-2 个元素。有什么问题?

最佳答案

这确实是个问题,@Stav Alfi。

在为 KinesisExtendedBindingProperties 注入(inject)修复了 KinesisMessageChannelBinder 之后,我看到了正确的属性填充。

因此配置应该是这样的:

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          input:
            consumer:
              idleBetweenPolls: 250

我会尽快修复上述问题。希望我们今天能发布 M2。

感谢您的耐心等待!

关于缓慢的消费速度:不要忘记您向 Kinesis 流发送消息的速度有多快,并且还要牢记生产者和消费者之间确实存在一些延迟。换句话说,AWS Kinesis 上存在一些合理的延迟以使记录可供使用。

参见 AWS FAQ了解更多信息:

You can continuously add various types of data such as clickstreams, application logs, and social media to an Amazon Kinesis data stream from hundreds of thousands of sources. Within seconds, the data will be available for your Amazon Kinesis Applications to read and process from the stream.

关于java - Spring Kinesis 消费者太慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49229500/

相关文章:

spring-cloud - `spring-cloud-starter-eureka-server` 和 `spring-cloud-starter-netflix-eureka-server` 之间的区别

java - 在 Java 中从线程创建进程有意义吗?

java - 如何将 JSF 消息编码设置为 UTF-8?

java - 如何处理 MaxUploadSizeExceededException

java - org.springframework.web.servlet.DispatcherServlet noHandlerFound 404错误响应

java - spring cloud配置客户端配置刷新不起作用

spring-boot - 在Spring Cloud Config中,curl客户端如何工作?

java - 使用 Selenium 和 JUnit 解析 HTML 文档中的链接

java - Proguard 只保留类名以及类中的所有成员和方法

java - Spring 附加数据到当前事务