Spring Kafka: Testing with Embedded Kafka

Tags: spring spring-boot apache-kafka spring-kafka

We are observing strange behavior in a service test with embedded Kafka.

The test is a Spock test; we use the JUnit rule KafkaEmbedded and propagate brokersAsString as follows:

@ClassRule          // JUnit rule, shared across the whole Spock specification
@Shared
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1)

@Autowired
KafkaListenerEndpointRegistry endpointRegistry

def setupSpec() {
    // Expose the embedded broker's address to the Spring environment
    System.setProperty("kafka.bootstrapServers", embeddedKafka.getBrokersAsString())
}
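For context, a minimal sketch of how the propagated property might be consumed on the Spring side. The property name kafka.bootstrapServers matches the test above; the configuration class and the remaining consumer settings are assumptions, not taken from the question.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Resolved from the system property set in setupSpec()
    @Value("${kafka.bootstrapServers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}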

Inspecting the code of KafkaEmbedded, constructing an instance with KafkaEmbedded(int count) results in one Kafka server with two partitions per topic.
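If a different partition count is needed, KafkaEmbedded (in the spring-kafka-test versions of that era) also has a constructor that takes the partition count explicitly. A sketch, with the topic names taken from the logs below:

// One broker, controlled shutdown, one partition per topic
KafkaEmbedded embeddedKafka =
        new KafkaEmbedded(1, true, 1, "moa", "staggering", "deliveryZipCode_v1")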

To deal with partition assignment and server/client synchronization in the test, we follow the strategy shown in spring-kafka's ContainerTestUtils class.

public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions)
        throws Exception {

    log.info("Waiting for " + container.getContainerProperties().getTopics() +
            " to connect to " + partitions + " partitions.")

    int n = 0
    int count = 0
    while (n++ < 600 && count < partitions) {
        count = 0
        // Check for null before iterating; the original iterated first,
        // which could throw a NullPointerException
        if (container.getAssignedPartitions() != null) {
            container.getAssignedPartitions().each { TopicPartition tp ->
                log.info(tp.topic() + ":" + tp.partition() + "; ")
            }
            count = container.getAssignedPartitions().size()
        }
        if (count < partitions) {
            Thread.sleep(100)   // poll every 100 ms, up to 60 s in total
        }
    }
}
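For reference, spring-kafka-test ships this helper as ContainerTestUtils.waitForAssignment, and KafkaEmbedded exposes the configured partition count, so the same wait can also be written as:

ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());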

Looking at the logs, we notice the following pattern:

2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.696  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.807  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.811  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.812  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.602  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:04.065  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:04.066  INFO 1160 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http)
2016-07-29 11:24:04.073  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456)
2016-07-29 11:24:04.237  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : jetty-9.2.17.v20160517
2016-07-29 11:24:04.265  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@6a8598e7{/__admin,null,AVAILABLE}
2016-07-29 11:24:04.270  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@104ea372{/,null,AVAILABLE}
2016-07-29 11:24:04.279  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@3c9b416a{HTTP/1.1}{0.0.0.0:50811}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@7c214597{SSL-http/1.1}{0.0.0.0:50812}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : Started @25813ms
2016-07-29 11:24:04.632  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting...
2016-07-29 11:24:04.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]
2016-07-29 11:24:13.740  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:16.644  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:16.666  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[staggering-0]
2016-07-29 11:24:16.750  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:23.559  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[moa-0]
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[deliveryZipCode_v1-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]

Note that [...] marks omitted lines.

We set metadata.max.age.ms to 3000 ms so that the metadata is refreshed frequently.
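A sketch of where that setting might live; only the metadata.max.age.ms value is from the question, the surrounding consumer-properties map is assumed:

Map<String, Object> props = new HashMap<>();
// Refresh topic metadata every 3 seconds instead of the default 5 minutes
props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "3000");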

What puzzles us now is this: if we wait for two partitions to be connected, the wait times out. Only when we wait for a single partition does everything eventually run successfully.

Have we misread the code, i.e. does embedded Kafka really give each topic two partitions? And is it normal that only one of them is assigned to our listener?

Best Answer

For tests, it is important to set spring.kafka.consumer.auto-offset-reset=earliest to avoid race conditions (the relative order or timing of consumer and producer); see https://docs.spring.io/spring-kafka/reference/html/#junit

Starting with version 2.5, the consumerProps method sets the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. This is because, in most cases, you want the consumer to consume any messages sent in a test case. The ConsumerConfig default is latest which means that messages already sent by a test, before the consumer starts, will not receive those records. To revert to the previous behavior, set the property to latest after calling the method.
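A minimal sketch of applying this in a test. KafkaTestUtils.consumerProps and the property key are spring-kafka-test API; the group name is a placeholder, and on versions before 2.5 the reset policy has to be overridden by hand, as the quote explains:

// Build consumer properties against the embedded broker, then pin the reset policy
Map<String, Object> props = KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");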

On the topic of Spring Kafka, testing with embedded Kafka: a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38655780/
