java - Kafka Streams application dies with "StreamsException: Could not create internal topics."

Tags: java apache-kafka apache-kafka-streams

I am evaluating Kafka Streams. I built a simple application and left it running overnight, on 2 instances with 1 stream thread each, against a 2-broker Kafka cluster.

Streams configuration:

private Map<String, Object> settings() {

    Map<String, Object> settings = new HashMap<>();
    settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "fare_tracker");
    settings.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverAddress + ":" + serverPort);
    settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
    settings.put(StreamsConfig.STATE_DIR_CONFIG, directoryName);
    settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, AvroTimeStampExtractor.class);
    settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
    settings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);

    settings.put("schema.registry.url", "http://zookeeper1:8081");

    settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.COMPRESSION_TYPE_CONFIG), "snappy");
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 3);
    settings.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 500);

    return settings;
}

It died roughly 12 hours after startup with the following stack trace:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create internal topics.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:81)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:628)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:382)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:343)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:501)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:451)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:433)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

I found some related warning and error logs.

{
  "@timestamp": "2017-06-07T05:44:26.996+05:30",
  "@version": 1,
  "message": "Got error produce response with correlation id 198191 on topic-partition fare_tracker-small_window-changelog-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION",
  "logger_name": "org.apache.kafka.clients.producer.internals.Sender",
  "thread_name": "kafka-producer-network-thread | fare_tracker-9e0a04f4-c1cc-4b61-8ca5-8bf25f18549f-StreamThread-1-producer",
  "level": "WARN",
  "level_value": 30000
}

^ This looks like an ordinary transient network issue, and I had configured the producer with 3 retries.

Application 1 died with the following logs:

{
  "@timestamp": "2017-06-07T06:20:35.122+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Failed to commit StreamTask 2_61 state: ",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000,
  "stack_trace": org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:702)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:581)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
}

{
  "@timestamp": "2017-06-07T06:20:35.236+05:30",
  "@version": 1,
  "message": "Bootstrap broker kafka2:9092 disconnected",
  "logger_name": "org.apache.kafka.clients.NetworkClient",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:36.100+05:30",
  "@version": 1,
  "message": "Could not create internal topics: Found only 1 brokers,  but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
  "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:36.914+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}

Application 2 died with the following logs:

{
  "@timestamp": "2017-06-07T06:20:06.254+05:30",
  "@version": 1,
  "message": "Could not create internal topics: Found only 1 brokers,  but replication factor is 2. Decrease replication factor for internal topics via StreamsConfig parameter \"replication.factor\" or add more brokers to your cluster. Retry #4",
  "logger_name": "org.apache.kafka.streams.processor.internals.InternalTopicManager",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}
{
  "@timestamp": "2017-06-07T06:20:07.041+05:30",
  "@version": 1,
  "message": "stream-thread [StreamThread-1] Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING.",
  "logger_name": "org.apache.kafka.streams.processor.internals.StreamThread",
  "thread_name": "StreamThread-1",
  "level": "WARN",
  "level_value": 30000
}

I checked my other applications; they kept running fine, but around the same time as above I saw the following log repeatedly.

    {
      "@timestamp": "2017-06-07T06:10:34.962+05:30",
      "@version": 1,
      "message": "Publishing to kafka failed ",
      "thread_name": "kafka-producer-network-thread | producer-1",
      "level": "ERROR",
      "level_value": 40000,
      "stack_trace": org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:255)
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
    at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time

org.apache.kafka.common.errors.TimeoutException: Expiring 30 record(s) for fareAlertDispatch-36 due to 424387 ms has passed since batch creation plus linger time
    }

Best Answer

You say you have 2 Kafka brokers, but one of the error messages contains this:

Could not create internal topics: Found only 1 brokers, but replication factor is 2.

There appears to be a network connectivity problem between your application and the Kafka brokers (and possibly between the brokers themselves): at the moment of the failure, the Streams instance could only reach one of the two brokers, so it could not create internal topics with a replication factor of 2. If such a network problem persists long enough, any application trying to communicate with the brokers will eventually fail, depending on its settings.
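If you want the application to ride out short outages like this instead of dying, you can make the retry and commit paths more tolerant. Below is a minimal sketch in the style of the question's `settings()` method; it uses plain string keys rather than the `StreamsConfig`/`ConsumerConfig` constants (the keys mirror those constants, e.g. `StreamsConfig.producerPrefix(...)` simply prepends `"producer."`), and the concrete values are assumptions you would tune for your own environment:

```java
import java.util.HashMap;
import java.util.Map;

public class ResilientStreamsSettings {

    // Sketch only: string keys correspond to StreamsConfig/ConsumerConfig
    // constants ("replication.factor" == StreamsConfig.REPLICATION_FACTOR_CONFIG,
    // the "producer."/"consumer." prefixes are what producerPrefix()/
    // consumerPrefix() produce). Values are illustrative, not recommendations.
    static Map<String, Object> settings() {
        Map<String, Object> settings = new HashMap<>();

        // With 2 brokers and replication factor 2, losing contact with one
        // broker makes internal-topic creation fail, as in the logs above.
        // Keep the factor no larger than the broker count you can rely on.
        settings.put("replication.factor", 2);

        // Give transient network hiccups more room: more producer retries
        // with a longer backoff than the original 3 x 500 ms.
        settings.put("producer.retries", 10);
        settings.put("producer.retry.backoff.ms", 1000);

        // Per the CommitFailedException hint: allow longer gaps between
        // poll() calls and fewer records per poll.
        settings.put("consumer.max.poll.interval.ms", 600_000);
        settings.put("consumer.max.poll.records", 100);

        return settings;
    }

    public static void main(String[] args) {
        System.out.println(settings());
    }
}
```

Note that more retries only buy time; if the partition between the application and one of the brokers lasts longer than the total retry window, the thread will still die, so the underlying network instability is what ultimately needs fixing.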

A similar question about "java - Kafka Streams application dies with StreamsException: Could not create internal topics." can be found on Stack Overflow: https://stackoverflow.com/questions/44405726/
