scala - Flink : No operators defined in streaming topology.无法执行

标签 scala apache-flink flink-streaming

我正在尝试设置一个非常基本的flink作业。当我尝试运行时,出现以下错误:

Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)

该错误是由以下代码引起的:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")

streamExecutionEnvironment.execute("Test Job")

当我在流的末尾添加print()调用时,错误消失了:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()

streamExecutionEnvironment.execute("Test Job")

我对print()为什么可以解决此问题感到困惑。在引入接收器之前,流拓扑不会处理其任何运算符的想法吗? print()在这里充当接收器吗?任何帮助,将不胜感激。谢谢。

最佳答案

在编程语言理论中,惰性求值或按需调用是一种评估策略,它可以延迟对表达式的求值,直到需要其值为止,并且还避免了重复求值。懒惰评估的反义词是急切评估,有时也称为严格评估。
惰性评估的好处包括:

  • 将控制流(结构)定义为抽象的能力
    而不是原始。
  • 定义潜在无限数据结构的能力。这个
    允许更直接地实现某些算法。
  • 通过避免不必要的计算并避免不必要的计算来提高性能
    计算复合表达式时的错误条件。

  • 延迟评估可以减少内存占用,因为在需要时会创建值。但是,由于操作顺序变得不确定,因此惰性评估很难与诸如异常处理和输入/输出之类的命令性功能结合使用。

    通常,Flink将操作分为两类:转换操作和接收操作。就像您猜到的那样,Flink转换是惰性的,这意味着直到调用接收器操作才执行它们。

    Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Collections are initially created from sources (e.g., by reading from files, Kafka topics, or from local, in-memory collections). Results are returned via sinks, which may, for example, write the data to (distributed) files, or to standard output (for example, the command line terminal).

    关于scala - Flink : No operators defined in streaming topology.无法执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54977290/

    相关文章:

    apache-flink - Flink CEP 状态存储

    arrays - 在 Scala 中为数组添加值

    scala - Apache Flink 上的 zipWithIndex

    apache-flink - 从 IDE 运行 Flink 时如何启动 Flink 作业管理器 Web 界面

    python - Flink Slots/Parallelism 与最大 CPU 能力

    java - Apache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念

    kubernetes - 在 kubernetes 上持续部署有状态的 apache flink 应用程序

    scala - 使用跳过或跨步在范围内触发聚合函数

    scala - 使用 DataFrame API 时,自联接无法按预期工作

    scala - Neo4j - Cypher 与 Scala-Neo4j API