apache-kafka - How to fix a Kafka Streams issue related to "Group coordinator is unavailable or invalid, will attempt rediscovery"

Tags: apache-kafka apache-kafka-streams

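I have an issue with my Kafka Streams application when I run it with PROCESSING_GUARANTEE_CONFIG set to exactly-once semantics. With other settings, for example at-least-once semantics, it works fine.

I noticed in the logs that something is going wrong. I found some suggestions here for fixing it, but unfortunately they did not help me :(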

03:35:28.627 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:28.627 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Group coordinator kafka:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
03:35:28.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=to-transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:28.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Group coordinator kafka:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
03:35:48.628 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Discovered group coordinator kafka:9093 (id: 2147483646 rack: null)
03:35:48.630 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Found no committed offset for partition topic-0
03:35:48.631 INFO  o.a.k.c.c.KafkaConsumer - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
03:35:48.631 INFO  o.a.k.s.p.i.StreamThread - stream-thread [transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
03:35:48.631 INFO  o.a.k.s.KafkaStreams - stream-client [transform-f8268b2b-4673-49ac-9396-6a2b86d45697] State transition from REBALANCING to RUNNING
03:35:48.632 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Attempt to heartbeat failed for since member id transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer-6aacbde6-4553-43ee-bc2f-2b5718e55acf is not valid.
03:35:48.632 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Found no committed offset for partition topic-0
03:35:48.633 INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Resetting offset for partition topic-0 to offset 0.
03:35:48.634 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
03:35:48.634 INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1-consumer, groupId=transform] Lost previously assigned partitions topic-0
03:35:48.634 INFO  o.a.k.s.p.i.StreamThread - stream-thread [transform-f8268b2b-4673-49ac-9396-6a2b86d45697-StreamThread-1] at state RUNNING: partitions [topic-0] lost due to missed rebalance.

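The first suggestion was that, when running only a single Kafka broker node, the partition and replication settings must be set to 1. The second was to restart the Kafka broker. Neither made any difference. Here is my broker configuration: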

  kafka:
    image: wurstmeister/kafka:2.12-2.4.1
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper 
    links:
      - zookeeper:zk
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENERS: OUTSIDE://kafka:9092,INSIDE://kafka:9093
      KAFKA_ADVERTISED_LISTENERS: OUTSIDE://localhost:9092,INSIDE://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_LOG_RETENTION_HOURS: 1
      KAFKA_MESSAGE_MAX_BYTES: 1048576
      KAFKA_REPLICA_FETCH_MAX_BYTES: 1048576
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 30000
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_NUM_PARTITIONS: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_DELETE_RETENTION_MS: 86400000
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: topic:1:1, transform:1:1

Thanks for your help. Kind regards, Victor

Best answer

There could be many reasons for the observed issue. In general, exactly-once is more expensive and puts a higher load on the brokers and on the Kafka Streams application.

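Also note that if you really want exactly-once processing, you should run at least 3 brokers (with topics configured with a replication factor of 3 and a min-isr of 2). Otherwise, EOS cannot really be guaranteed.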
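A minimal sketch of what those replication settings could look like on the Kafka Streams side, assuming a cluster with at least three brokers. The class and method names are hypothetical, and plain string keys are used so the snippet runs without the Kafka client on the classpath; in a real application the equivalents would normally be `StreamsConfig.REPLICATION_FACTOR_CONFIG` and `StreamsConfig.topicPrefix(...)`.

```java
import java.util.Properties;

public class EosReplicationSketch {

    // Hypothetical helper: replication-related Streams settings for a >= 3 broker cluster.
    static Properties replicationSettings() {
        Properties props = new Properties();
        // Replication factor for the internal (changelog/repartition) topics Streams creates.
        props.put("replication.factor", "3");
        // The "topic." prefix forwards a topic-level config to those internal topics.
        props.put("topic.min.insync.replicas", "2");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; these would be merged into the application's Streams properties.
        replicationSettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These keys only cover topics that Kafka Streams creates itself. Input and output topics, and the broker-side `offsets.topic.replication.factor`, `transaction.state.log.replication.factor` and `transaction.state.log.min.isr` settings, would have to be raised separately on the cluster.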

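Increasing commit.interval.ms may help to mitigate the issue. Note that with EOS, a longer commit interval leads to higher processing latency (this is why the default commit interval is reduced to 100 ms when EOS is enabled). If you can accept higher latency, you may want to increase it to, for example, 1 second.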
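As an illustration of the commit-interval suggestion, the following sketch builds the two relevant settings with a 1-second interval. The class and method names are hypothetical, and string keys stand in for `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` and `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` so that the snippet compiles without Kafka dependencies. The remaining required settings (application id, bootstrap servers, serdes) are assumed to be configured elsewhere.

```java
import java.util.Properties;

public class EosCommitIntervalSketch {

    // Hypothetical helper: EOS settings with an explicit commit interval in milliseconds.
    static Properties eosSettings(long commitIntervalMs) {
        Properties props = new Properties();
        // Enable exactly-once processing (the mode that triggers the issue in the question).
        props.put("processing.guarantee", "exactly_once");
        // Override the 100 ms EOS default; a larger value trades latency for fewer commits.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // 1000 ms is the example value from the answer, not a tuned recommendation.
        Properties props = eosSettings(1000L);
        System.out.println("commit.interval.ms=" + props.getProperty("commit.interval.ms"));
    }
}
```

With EOS, each commit also commits a transaction, so a longer interval means fewer transactions per second at the cost of downstream consumers seeing results later.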

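Furthermore, a lot of work has gone into EOS, and newer releases contain many improvements. If possible, you may want to upgrade to the upcoming 2.6 release and test the new "eos_beta" processing mode (config value `exactly_once_beta`; requires brokers 2.5 or newer).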

Source: the original question on Stack Overflow: https://stackoverflow.com/questions/63095098/
