我正在尝试在Flink-Kafka
集成中实现Exactly-Once
语义。我的生产者模块如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart
val myProducer = new FlinkKafkaProducer[String](
"topic_name", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config
消费者模块:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
我正在生成一些记录并将其推送给该制作人。记录格式如下:
1
2
3
4
5
6
..
..
等等。因此,假设在推送此数据时,生产者能够将数据推送到第 4 条记录,并且由于某些故障而发生故障,那么当它再次启动并运行时,它会从第 5 条记录开始推送记录吗?我的属性(property)足够吗?
我将按照 this link 在消费者端添加一个属性第一个用户提到的。我是否也应该在生产者端添加幂等
属性?
我的 Flink
版本是 1.13.5
、Scala 2.11.12
,我使用的是 Flink Kafka 连接器 2.11
>.
我认为我无法使用EXACTLY_ONCE
提交事务,因为检查点未写入上述路径。附上 Web UI 的屏幕截图:
我需要为此设置任何属性吗?
最佳答案
对于消费者端,Flink Kafka Consumer 会记录分布式检查点中的当前偏移量,如果消费者任务失败,它将从最新的检查点重新启动,并从检查点中记录的偏移量重新发出。例如,假设最新的检查点记录了偏移量 3,之后 flink 继续发出 4、5,然后进行故障转移,那么 Flink 将继续发出从 4 开始的记录。注意,这不会导致重复,因为所有运算符的状态都是也回退到处理记录3后的状态。
对于生产者端,Flink 使用两阶段提交[1]来实现恰好一次。大致上Flink Producer会依赖Kafka的事务来写入数据,只有事务提交后才正式提交数据。用户可以使用 Semantics.EXACTLY_ONCE
来启用此功能。
[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
关于apache-kafka - flink kafka 生产者和消费者中恰好一次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70622321/