java - Kafka Streams - 处理超时

标签 java apache-kafka apache-kafka-streams

我正在尝试使用 <KStream>.process()TimeWindows.of("name", 30000)批处理一些 KTable 值并发送它们。似乎 30 秒超过了消费者超时间隔,在此之后 Kafka 认为该消费者已失效并释放分区。

我已经尝试提高轮询提交间隔的频率来避免这种情况:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

不幸的是,这些错误仍在发生:

(很多)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

接下来是这些:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

显然,我需要更频繁地将心跳发送回服务器。怎么办?

我的拓扑是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

KTable 每 30 秒按键对值进行分组。在 Processor.init() 我调用 context.schedule(30000) .

DBProcessorSupplier 提供了一个 DBProcessor 实例。这是一个 AbstractProcessor 的实现,其中提供了所有覆盖。他们所做的只是记录,所以我知道每个人何时被击中。

这是一个非常简单的拓扑,但很明显我在某处遗漏了一步。


编辑:

我知道我可以在服务器端进行调整,但我希望有一个客户端解决方案。我喜欢分区在客户端退出/死亡时很快可用的概念。


编辑:

为了简化问题,我从图中删除了聚合步骤。现在只是消费者->处理器()。 (如果我将消费者直接发送到 .print(),它会很快工作,所以我知道没问题)。 (同样,如果我通过 .print() 输出聚合(KTable),似乎也可以)。

我发现 .process() - 应该调用 .punctuate()每 30 秒实际上会阻塞不同的时间长度并随机输出(如果有的话)。

进一步:

我将调试级别设置为“调试”并重新运行。我看到很多消息:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

但是.punctuate()中的一个断点功能没有受到打击。所以它做了很多工作,但没有给我机会使用它。

最佳答案

一些说明:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG 是提交间隔的下限,即提交后,下一次提交不会在此时间之前发生。基本上,Kafka Stream 会尝试在这段时间过后尽快提交,但无法保证下一次提交实际需要多长时间。
  • StreamsConfig.POLL_MS_CONFIG 用于内部KafkaConsumer#poll() 调用,指定poll() 的最大阻塞时间打电话。

因此,这两个值都无助于更频繁地心跳。

Kafka Streams 在处理记录时遵循“深度优先”策略。这意味着,在对每条记录执行 poll() 之后,将执行拓扑的所有运算符。假设您有三个连续的映射,那么在处理下一个/第二个记录之前,将为第一个记录调用所有三个映射。

因此,在第一个 poll() 的所有记录得到完全处理后,将进行下一个 poll() 调用。如果您想更频繁地进行心跳,则需要确保单个 poll() 调用获取较少的记录,以便处理所有记录花费更少的时间,并且下一个 poll() 会提前触发。

您可以使用 KafkaConsumer 的配置参数,您可以通过 StreamsConfig 指定它来完成此操作(参见 https://kafka.apache.org/documentation.html#consumerconfigs ):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records:如果您减小此值,将轮询较少的记录
  • session.timeout.ms:如果你增加这个值,会有更多的时间来处理数据(添加这个是为了完整性,因为它实际上是一个客户端设置而不是服务器/代理端配置 - - 即使您知道此解决方案但不喜欢它 :))

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

再补充一点:这个问题中描述的场景是一个已知问题,并且已经有KIP-62。这为发送心跳的 KafkaConsumer 引入了一个后台线程,从而将心跳与 poll() 调用分离。 Kafka Streams 将在即将发布的版本中利用这一新功能。

关于java - Kafka Streams - 处理超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39232395/

相关文章:

c# - 为什么要用容器来存放观察者?

docker - 运行Confluent docker 时记录

apache-kafka - 有没有办法确定 Kafka 主题中消息的来源?

java - 使用 Kafka Streams 进行自定义转换

java - 将 Kafka 流输入打印到控制台?

java - 如果在 Java 中的局部变量范围之外使用,列表将显示为 null

java - 使用 Apache HttpClient 4 的抢先式基本身份验证

apache-kafka-streams - Kafka Stream 0.10.2.0 状态存储在存储值时获取异常

java - Sqoop:错误 manager.SqlManager:从数据库读取时出错:java.sql.SQLException:

apache-kafka - Kafka 忽略生产者的 `transaction.timeout.ms`