当我想使用 KeyedOneInputStreamOperatorTestHarness[K, IN, OUT] 并以 Int 或 Long 作为关键 (K) 参数时,下面提到的 Apache Flink 测试代码无法在 Scala 中编译。可以使用 STRING、INTEGER 或 java.lang.Long 类型的泛型参数来编译相同的代码:
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test
case class TaxiRide(rideId: Int)
class RideKeySelector extends KeySelector[TaxiRide, Int] {
override def getKey(in: TaxiRide): Int = in.rideId
}
class RideTests {
private def setupHarness(function: KeyedProcessFunction[Int, TaxiRide, Int]): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)
val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
testHarness.setup()
testHarness.open()
testHarness
}
}
构建错误:
type mismatch;
found : org.example.RideKeySelector
required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
我从以下 Maven 原型(prototype)创建了该项目:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-quickstart-scala</artifactId>
<version>1.14.2</version>
</dependency>
我在 IntelliJ IDEA 中向 pom.xml 添加了以下测试依赖项:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>1.14.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>1.14.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.14.2</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.14.2</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
</dependencies>
我尝试用 lambda 替换键选择器:
val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
构建错误更改为:
overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
看起来它不能接受Scala类型,为什么会这样?
最佳答案
参数化类型与一个名为 type variance
的概念相关联。 .
当你有 Wrapper
它采用类型参数 A
,然后是你的Wrapper
可以是 invariant
, covariant
或contravariant
分别输入 A
.
invariant
::类型 Wrapper[A]
和Wrapper[B]
根本不关心 A
之间的类型关系和B
。 Wrapper[A]
之间根本没有关系。和Wrapper[B]
.
covariant
::给定类型 B
这是另一种类型的子类型 A
(读作 B extends A
),然后 Wrapper[B]
也将是 Wrapper[A]
的子类型(读作 Wrapper[B] extends Wrapper[A]
)。
contravariant
::给定类型 B
这是另一种类型的子类型 A
(读作 B extends A
),然后 Wrapper[B]
将是 Wrapper[A]
的父类(super class)型(读作 Wrapper[A] extends Wrapper[B]
)。
Java 只允许 invariant
输入参数,而 Scala 允许您选择 variance
根据需要使用Wrapper[A]
对于 invariant
, Wrapper[+A]
对于 covariant
和Wrapper[-A]
对于 contravariant
关系。
因此,对于任何参数化类型 Wrapper[T]
用 Java 实现,Wrapper[A]
和Wrapper[B]
无论 A
之间是否存在任何关系,都没有关系和B
.
如KeyedOneInputStreamOperatorTestHarness
用Java实现的,是invariant
关于 RideKeySelector
之间的关系和KeySelector[TaxiRide, Int]
.
虽然你不需要担心variance
在许多情况下,当您处理invariant
时在covariant
处输入参数地方和事物只是巧合地工作,但在处理 invariant
时它们几乎从不工作在contravariant
处输入参数地点。
但是当它起作用时,你必须记住,该类型在任何时候都没有被使用只是一个巧合 contravariant
地方,你不应该依赖它的工作,因为开发人员从来没有想过它。它可能会因额外添加一个 contravariant
而改变。习惯了 future 版本中的类(class)。
这里的情况正是如此,第二个类型参数In
的KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
自然使用于contravariant
类(class)名额KeyedOneInputStreamOperatorTestHarness
.
至于重载方法错误,那是因为KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
的构造函数造成的想要一个KeySelector<IN, K>
。因此KeyedOneInputStreamOperatorTestHarness<Int, TaxiRide, Int>
想要一个KeySelector<TaxiRide, Int>
而不是scala.Function1[TaxiRide, Int]
正如您在下一行中提供的那样,
val testHarness =
new KeyedOneInputStreamOperatorTestHarness(
operator,
(ride: TaxiRide) => ride.rideId,
Types.INT
)
现在,您只需提供正确的实例即可。
val rideKeySelector = new KeySelector[TaxiRide, Int] {
@throws(classOf[Exception])
override def getKey(value: TaxiRide): Int = value.rideId
}
val testHarness =
new KeyedOneInputStreamOperatorTestHarness(
operator,
rideKeySelector,
Types.INT
)
我另外建议您放弃使用 Int
(并且只需使用 java.lang.Integer
)在处理类型密集型 Java 库时。 Scala 之间的交互 Int
和 Java 的 int
和Integer
有足够的复杂性来写一本完整的书。
关于scala - Apache Flink - 无法使用 Int 或 Long 泛型参数编译 KeyedOneInputStreamOperatorTestHarness,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70543330/