apache-flink - 如何迭代 Flink DataStream 中的每条消息?

标签 apache-flink flink-streaming

我有一个来自 Kafka 的消息流,如下所示

DataStream<String> messageStream = env
  .addSource(new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));

如何迭代流中的每条消息并对其执行某些操作?我看到一个iterate()方法DataStream但它不会返回 Iterator<String> .

最佳答案

我认为您正在寻找MapFunction

DataStream<String> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(topic, new MsgPackDeserializer(), props));

DataStream<Y> mappedMessages = messageStream
  .map(new MapFunction<String, Y>() {
    public Y map(String message) {
      // do something with each message and return Y
    }
  });

如果您不想为每条传入消息发出恰好一条记录,请查看 FlatMapFunction

关于apache-flink - 如何迭代 Flink DataStream 中的每条消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40392632/

相关文章:

java - Flink流, 'sum'到底是做什么的?

java - 在 Apache Flink Broadcast 流中应用基于窗口的规则

hdfs - Flink RocksDB 性能问题

apache-flink - DataStream#assignAscendingTimestamps 的实际用途是什么

apache-flink - Flink 如何在 S3 中将 DataSet 写成 Parquet 文件?

apache-flink - 泛型参数的 Flink Scala API 函数

apache-flink - 如何对数据集批处理的第一个元组字段元素进行 keyBy

java - 弗林克 : Cluster Execution error of loss of Taskmanager

apache-flink - Apache Flink 中的周期性水印和标点水印有什么区别?

java - 将 TemporalTableFunction 注册为函数时出现编译器错误