scala - Apache Flink - 无法使用 Int 或 Long 泛型参数编译 KeyedOneInputStreamOperatorTestHarness

标签 scala apache-flink

当我想使用 KeyedOneInputStreamOperatorTestHarness[K, IN, OUT] 并以 Int 或 Long 作为关键 (K) 参数时,下面提到的 Apache Flink 测试代码无法在 Scala 中编译。可以使用 STRING、INTEGER 或 java.lang.Long 类型的泛型参数来编译相同的代码:

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test

case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelector[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

class RideTests {
  private def setupHarness(function: KeyedProcessFunction[Int, TaxiRide, Int]): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)

    val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

    testHarness.setup()
    testHarness.open()

    testHarness
  }
}

构建错误:

type mismatch;
 found   : org.example.RideKeySelector
 required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

我从以下 Maven 原型(prototype)创建了该项目:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-quickstart-scala</artifactId>
    <version>1.14.2</version>
</dependency>

我在 IntelliJ IDEA 中向 pom.xml 添加了以下测试依赖项:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>    
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
</dependencies>

我尝试用 lambda 替换键选择器:

val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

构建错误更改为:

overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
 cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
     testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

看起来它不能接受Scala类型,为什么会这样?

最佳答案

参数化类型与一个名为 type variance 的概念相关联。 .

当你有 Wrapper它采用类型参数 A ,然后是你的Wrapper可以是 invariant , covariantcontravariant分别输入 A .

invariant::类型 Wrapper[A]Wrapper[B]根本不关心 A 之间的类型关系和BWrapper[A] 之间根本没有关系。和Wrapper[B] .

covariant::给定类型 B这是另一种类型的子类型 A (读作 B extends A ),然后 Wrapper[B]也将是 Wrapper[A] 的子类型(读作 Wrapper[B] extends Wrapper[A] )。

contravariant::给定类型 B这是另一种类型的子类型 A (读作 B extends A ),然后 Wrapper[B]将是 Wrapper[A] 的父类(super class)型(读作 Wrapper[A] extends Wrapper[B] )。

Java 只允许 invariant输入参数,而 Scala 允许您选择 variance根据需要使用Wrapper[A]对于 invariant , Wrapper[+A]对于 covariantWrapper[-A]对于 contravariant关系。

因此,对于任何参数化类型 Wrapper[T]用 Java 实现,Wrapper[A]Wrapper[B]无论 A 之间是否存在任何关系,都没有关系和B .

KeyedOneInputStreamOperatorTestHarness用Java实现的,是invariant关于 RideKeySelector 之间的关系和KeySelector[TaxiRide, Int] .

虽然你不需要担心variance在许多情况下,当您处理invariant时在covariant处输入参数地方和事物只是巧合地工作,但在处理 invariant 时它们几乎从不工作在contravariant处输入参数地点。

但是当它起作用时,你必须记住,该类型在任何时候都没有被使用只是一个巧合 contravariant地方,你不应该依赖它的工作,因为开发人员从来没有想过它。它可能会因额外添加一个 contravariant 而改变。习惯了 future 版本中的类(class)。

这里的情况正是如此,第二个类型参数InKeyedOneInputStreamOperatorTestHarness<K, IN, OUT>自然使用于contravariant类(class)名额KeyedOneInputStreamOperatorTestHarness .

至于重载方法错误,那是因为KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>的构造函数造成的想要一个KeySelector<IN, K> 。因此KeyedOneInputStreamOperatorTestHarness<Int, TaxiRide, Int>想要一个KeySelector<TaxiRide, Int>而不是scala.Function1[TaxiRide, Int]正如您在下一行中提供的那样,

val testHarness = 
  new KeyedOneInputStreamOperatorTestHarness(
    operator,
   (ride: TaxiRide) => ride.rideId,
   Types.INT
)

现在,您只需提供正确的实例即可。

val rideKeySelector = new KeySelector[TaxiRide, Int] {
  @throws(classOf[Exception])
  override def getKey(value: TaxiRide): Int = value.rideId
}

val testHarness = 
  new KeyedOneInputStreamOperatorTestHarness(
    operator,
   rideKeySelector,
   Types.INT
)

我另外建议您放弃使用 Int (并且只需使用 java.lang.Integer )在处理类型密集型 Java 库时。 Scala 之间的交互 ​​Int和 Java 的 intInteger有足够的复杂性来写一本完整的书。

关于scala - Apache Flink - 无法使用 Int 或 Long 泛型参数编译 KeyedOneInputStreamOperatorTestHarness,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70543330/

相关文章:

scala - Scala中 'HashSet'和 'Set'之间的区别?

scala - Play 2.0 + Bootstrap3 : Showing active navigation item

java - Flink 从集群 GUI 向作业提交 args 的正确方法是什么?

java - Apache Flink 连续 split 奇怪的行为

java - Flink RichMapFunction 中未调用 open 方法

java - 为了在 Java 中整合 + 一个条件

google-app-engine - Java GAE maven配置问题?只是另一个验证错误

java - 小写 "month"在Scala中是什么意思?

java - 一个 Flink 作业中有两个数据流

apache-flink - Flink CSV 文件读取器无法将 LongType 转换为 PojoType