apache-kafka - Scala: Cannot resolve overloaded methods (Flink WatermarkStrategy)

Tags: apache-kafka apache-flink flink-streaming

I am following the Flink documentation on how to use a WatermarkStrategy with the Kafka consumer. The code looks like this:

val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20)))

val stream: DataStream[MyType] = env.addSource(kafkaSource)
Whenever I try to compile the code above, I get the following error message:
error: overloaded method value assignTimestampsAndWatermarks with alternatives:
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String] <and>
[ERROR]   (x$1: org.apache.flink.api.common.eventtime.WatermarkStrategy[String])org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase[String]
[ERROR]  cannot be applied to (org.apache.flink.api.common.eventtime.WatermarkStrategy[Nothing])
[ERROR]         consumer.assignTimestampsAndWatermarks(

Best Answer

The code below returns a WatermarkStrategy[Nothing] instead of a WatermarkStrategy[String]: the Duration argument carries no information about the stream's element type and there is no expected type at the call site, so Scala infers the type parameter as Nothing.

  WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20))
I solved the problem by using this code:
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
// The explicit type annotation gives the compiler the expected element type
val watermarkStrategy: WatermarkStrategy[MyType] =
  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20))
kafkaSource.assignTimestampsAndWatermarks(watermarkStrategy)
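
An equivalent fix, if you do not want a separate val, is to supply the type argument to forBoundedOutOfOrderness explicitly. A minimal sketch, assuming MyType, schema and props are defined as in the question:

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// Passing the type argument explicitly also keeps the compiler from
// falling back to WatermarkStrategy[Nothing]
kafkaSource.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[MyType](Duration.ofSeconds(20)))

Either way, the point is that forBoundedOutOfOrderness ends up with a concrete element type, so the call resolves to the WatermarkStrategy overload of assignTimestampsAndWatermarks.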

Regarding apache-kafka - Scala: Cannot resolve overloaded methods (Flink WatermarkStrategy), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64974237/
