Scala WindowFunction 无法编译

标签 scala apache-flink flink-streaming

我一直在使用 Apache Flink 编写原型(prototype)应用程序。在此过程中,我选择将 org.apache.flink.streaming.api.functions.windowing.WindowFunction 用于特定用例。然而,在编写 apply() 函数的主体时,我遇到了这个错误(下面的代码不是来 self 正在编写的应用程序 - 我的数据类型不同 - 它来自 Flink 文档站点中提供的示例代码):

import scala.collection.Iterable
import scala.collection.Map
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow}
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {

  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window $window count: $count")
  }
}

编译器提示:

    Error:(16, 7) class MyWindowFunction needs to be abstract, since method apply in trait WindowFunction of type 
(x$1: String, x$2: org.apache.flink.streaming.api.windowing.windows.TimeWindow, 
x$3: Iterable[(String, Long)], 
x$4: org.apache.flink.util.Collector[String])Unit is not defined
    class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {

我已经检查了apply()中参数的顺序;他们似乎是正确的。

出于某种原因,我无法找到错误的确切来源。有人可以帮我找到解决方案吗?

最佳答案

我已经找到这个错误的原因了。

我不清楚的是 Apache Flink 的 API 需要 java.lang.Iterable,而不是其 Scala 等效项:

class MyWindowFunction extends 
      WindowFunction[(String, Long), String, String, TimeWindow] {

  override 
  def apply(
      key: String, 
      w: TimeWindow, 
      iterable: Iterable[(String, Long)],  // from java.lang.Iterable
      collector: Collector[String]): Unit = {

      // ....
  }
}

所以,我必须适本地导入:

import java.lang.Iterable   // From Java
import java.util.Map        // From Java

import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._  // Implicit conversions

 class MyWindowFunction 
   extends WindowFunction[(String, Long), String, String, TimeWindow] {

   override 
   def apply(
       key: String, 
       w: TimeWindow, 
       iterable: Iterable[(String, Long)], 
       collector: Collector[String]): Unit = {

     // ....

  }
}

一切都很好!

关于Scala WindowFunction 无法编译,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39405724/

相关文章:

scala - Scala中什么时候需要新的

Scala Def Macros - 如何获取符号的参数化类型成员?

scala - 定义供 java 使用的 scala function1 类型错误

scala - Apache Flink 上的 zipWithIndex

apache-flink - 弗林克 : Stateful stream processing by key

scala - 无法使用 SBT 导入 kamon-play-26

apache-flink - Apache Flink 中是否可以延迟事件流?

apache-spark - 有什么理由选择 Flink 而不是 Spark 2.3 Structured Streaming?

java - 使用 RichAggregateFunction 时出现 Flink 错误

apache-flink - Flink taskmanager内存不足及内存配置