apache-kafka - flink kafka 生产者和消费者中恰好一次

标签 apache-kafka apache-flink flink-streaming

我正在尝试在Flink-Kafka集成中实现Exactly-Once语义。我的生产者模块如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
    env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
    //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart

    val myProducer = new FlinkKafkaProducer[String](
      "topic_name", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config 

消费者模块:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)
  

我正在生成一些记录并将其推送给该制作人。记录格式如下:

1  
2  
3  
4  
5  
6  
..  
..  

等等。因此,假设在推送此数据时,生产者能够将数据推送到第 4 条记录,并且由于某些故障而发生故障,那么当它再次启动并运行时,它会从第 5 条记录开始推送记录吗?我的属性(property)足够吗?

我将按照 this link 在消费者端添加一个属性第一个用户提到的。我是否也应该在生产者端添加幂等属性?

我的 Flink 版本是 1.13.5Scala 2.11.12,我使用的是 Flink Kafka 连接器 2.11 >.

我认为我无法使用EXACTLY_ONCE提交事务,因为检查点未写入上述路径。附上 Web UI 的屏幕截图:

enter image description here

enter image description here

我需要为此设置任何属性吗?

最佳答案

对于消费者端,Flink Kafka Consumer 会记录分布式检查点中的当前偏移量,如果消费者任务失败,它将从最新的检查点重新启动,并从检查点中记录的偏移量重新发出。例如,假设最新的检查点记录了偏移量 3,之后 flink 继续发出 4、5,然后进行故障转移,那么 Flink 将继续发出从 4 开始的记录。注意,这不会导致重复,因为所有运算符的状态都是也回退到处理记录3后的状态。

对于生产者端,Flink 使用两阶段提交[1]来实现恰好一次。大致上Flink Producer会依赖Kafka的事务来写入数据,只有事务提交后才正式提交数据。用户可以使用 Semantics.EXACTLY_ONCE 来启用此功能。

[1] https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

[2] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#fault-tolerance

关于apache-kafka - flink kafka 生产者和消费者中恰好一次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70622321/

相关文章:

java - kafka 和 kafka-clients 有什么区别?

scala - Flink Word Count Example 缺少一个库

apache-flink - Apache Flink : Using filter() or split() to split a stream?

amazon-s3 - 无法执行HTTP请求: Timeout waiting for connection from pool in Flink

amazon-web-services - 使用 IAM 的 AWS MSK Spring Boot 应用程序示例

apache-kafka - 有没有办法在特定的偏移量处停止 Kafka 消费者?

java - 如何使用 Avro 二进制编码器对 Kafka 消息进行编码/解码?

apache-flink - NOT followBy 的 Apache Flink CEP 模式操作

scala - 弗林克 : How to write DataSet to a variable instead of to a file

scala - Apache 弗林克 : Creating a Lagged Datastream