apache-kafka - Flink KafkaSink 启动时间较长

标签 apache-kafka apache-flink flink-streaming

在使用 KafkaSink 时,我的 Flink 应用程序出现了令人讨厌的行为。如果我的应用程序包含一个到 Kafka 的接收器(EXACTLY_ONCE 交付),则需要很长时间才能启动,如果我删除 Kafka 接收器(留下其他接收器)或用打印替换它,则应用程序只需几秒钟即可启动。 在任务管理器日志中,我看到数千个重复的行,如下所示:

2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14788 with epoch 9
2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,829 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15843 with epoch 8
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,933 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16840 with epoch 7
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,036 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14813 with epoch 6
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,140 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16858 with epoch 5
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,245 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14832 with epoch 4
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,349 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,451 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15886 with epoch 3
2023-04-05 14:01:26,452 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,453 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16891 with epoch 2
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,556 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,659 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14860 with epoch 1
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,766 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15917 with epoch 0
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14713 with epoch 23
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,871 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15752 with epoch 22
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,975 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16751 with epoch 21
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14724 with epoch 20
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,181 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15768 with epoch 19
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,285 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16765 with epoch 18
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,388 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14737 with epoch 17
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,493 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15785 with epoch 16
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,599 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16782 with epoch 15
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,703 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15798 with epoch 14
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,816 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14759 with epoch 13
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,920 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,022 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16803 with epoch 12
2023-04-05 14:01:28,023 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,024 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15820 with epoch 11
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,127 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14782 with epoch 10
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,231 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16825 with epoch 9
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,334 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15844 with epoch 8
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,437 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,542 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14806 with epoch 7
2023-04-05 14:01:28,543 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,544 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16851 with epoch 6
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,647 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15869 with epoch 5
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,751 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16868 with epoch 4
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,854 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15887 with epoch 3
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,957 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14846 with epoch 2
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,061 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16900 with epoch 1
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15918 with epoch 0
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,269 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16748 with epoch 22
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,375 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14721 with epoch 21
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,479 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15763 with epoch 20
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,684 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16761 with epoch 19
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14734 with epoch 18
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,790 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15779 with epoch 17
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,896 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)

有时重复20分钟甚至更长时间......

我尝试移除水槽,应用程序会在几秒钟内启动,但我丢失了输出。 我还尝试将接收器传送保证更改为 AT_LEAST_ONCE,它似乎更快(约 1 分钟),但仍然有很多日志...

这是 Sink 配置(一些与身份验证相关的属性被隐藏并通过 appConfiguration 对象加载)

KafkaSink deviceDaySink = KafkaSink.<DeviceDayTimeTuple>builder()
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(appConfiguration.getString("device-day")
                        .setValueSerializationSchema(new SerializeJsonDeviceDayTime())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(appConfiguration.getKafkaProps())
                .setProperty("transactional.id", "tid")
                .setProperty("client.id", "cid")
                .setProperty("transaction.timeout.ms", "30000")
                .build();

编辑附加信息(4 月 12 日): 至于现在我们每 30 秒设置一次检查点,该主题之前已经由同一个应用程序编写过,但如果我们重新创建该主题,问题是相同的。 通过kafka的CLI脚本kafka-transactions.sh我们注意到flink应用程序在启动时打开了很多空事务:

kafka-sink-0-3394   3           18862       Empty               
kafka-sink-0-4122   3           21106       Empty               
kafka-sink-0-6148   3           21793       Empty               
kafka-sink-0-4366   3           21189       Empty               
kafka-sink-0-2584   3           18585       Empty               
kafka-sink-0-2340   3           18501       Empty 
kafka-sink-0-30     3           16458       Empty
kafka-sink-0-4816   3           21341       Empty 
kafka-sink-0-308    3           16553       Empty
kafka-sink-0-2872   3           18682       Empty 
kafka-sink-0-4654   3           21288       Empty 
kafka-sink-0-3600   3           18931       Empty               
kafka-sink-0-4898   3           21369       Empty 
kafka-sink-0-146    3           16498       Empty
kafka-sink-0-5626   3           21616       Empty               
kafka-sink-0-3844   3           21014       Empty  
kafka-sink-0-4492   3           21231       Empty 
kafka-sink-0-272    3           16541       Empty 
kafka-sink-0-5220   3           21478       Empty 
kafka-sink-0-5464   3           21561       Empty  
kafka-sink-0-3682   3           18958       Empty 

这些只是全部的一部分,transactionalId 和 ProducerID 都与日志文件中的匹配。

最佳答案

问题在于 Flink 如何管理 Kafka 事务。对于常规生产者,当它想要使用事务时,它将使用 set transactionalId 初始化事务。 Kafka 将响应 producentIdepoch (这是一个从0开始的迭代器)。

这是为了确保只有最新的生产者才能真正将数据输出到Kafka。任何具有相同trasactionalId的生产商但具有较低的epoch超过最新分配的将被隔离,并且不允许产生交易蛋白。这也意味着给定的生产者只有一笔交易(由 transactionalId 标识)。

然而,Flink 的做法有点不同。对于每个检查点,它将创建一个新的 transactionalId以及与之相关的新交易。该交易的名称将为 <transactionalIdPrefix>-<subtask id>-<checkpoint id> 。因此它将生成类似 kafka-sink-0-1 的交易, kafka-sink-0-2

Kafka中的事务信息持久化在主题中,并存储在当前Kafka事务领导者缓存中。通常,交易主题的保留期为 1 周。因此,对于单个生产者和单个实例的 1 分钟检查点,将会有超过 10k 的事务。当然,我们希望它们全部位于 Empty 中。或CompleteCommited状态。

Flink 在 Kafka 生产者启动时所做的事情是首先确保没有正在进行的事务。这是在 org.apache.flink.connector.kafka.sink.TransactionAborter#abortTransactionOfSubtask 中完成的。这段代码的作用是简单地构造 transactionalId<transactionalIdPrefix>-<subtask id>-<current checkpoint id>开始并迭代直到 Kafka 返回 epoch等于 0因为这意味着这是 Kafka 第一次看到这样的情况 transactionalId 。就可以initTransaction这使得 Kafka 用相同的 transactionalId 刷新先前的事务并将其保留在Empty中状态。

这对于常规操作来说没有问题,因为可以预期当前检查点 ID 是最新的检查点。然而,如果您从先前的检查点开始或从新的状态开始,情况就不会如此。在这种情况下,您的当前检查点 ID 将是 1所以将从 1 开始并将继续到该集群中创建的最后一个事务。

每次 Flink 启动时都会执行此操作,直到检查点 id 通过最后一个事务。

据我所知,Kafka 中没有 API 可以让您删除特定的 transationalId 。中止它只会留下 CompleteAborted 。您的选择是:

  1. 每当您想以新的 Flink 状态开始时,您都应该更改 transactionalIdPrefix 。这仅在您清理状态时才有效,因为如果您更改了 transactionalIdPrefix Flink 知道前一笔交易是什么,并且无论如何都会扫描这些交易。
  2. 清除 Kafka 事务状态主题并强制领导者更改,以便它将刷新主题缓存。
  3. 仅从状态中删除特定交易并强制领导者变更。
PS。此外,如果您针对共享 Kafka 运行 Flink 的多个本地实例,请确保每个人都有唯一的 transactionalIdPrefix .

当您的检查点失败时,也会发生此扫描,因为它会从上一个成功的检查点开始扫描。

关于apache-kafka - Flink KafkaSink 启动时间较长,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75940903/

相关文章:

bigdata - 如果 kafka 的一个副本宕机以跟上复制因子,kafka 会创建一个新的追随者吗

java - Spring Boot Embedded Kafka无法连接

scala - Flink : How to convert the deprecated fold to aggregrate?

scala - 使用 scala 在 Flink 中进行实时流预测

java - 在 Apache Flink 中将多个数据集传输到下一次迭代

java - Flink : Left joining a stream with a static list

ubuntu - 如何在Ubuntu系统启动时自动启动Kafka?

java - 如何解析来自Storm方案类中kafka主题的json数据?

java - Flink 是否提供 Java API 来向 JobManager 提交作业?

apache-flink - Apache Flink : guideliness for setting parallelism?