java - 在 Spark Streaming 中调用 updateStateByKey 时出错

标签 java scala apache-spark

我在 Scala 中有这个通用方法

def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S],
Optional[S]])   : JavaPairDStream[K, S] = { ... }

当我用 Java 调用它时,这两个都无法编译:

1

JavaPairDStream<String, Integer> stateDstream =
pairs.<Integer>updateStateByKey(...);

2

JavaPairDStream<String, Integer> stateDstream =
pairs.updateStateByKey(...);

如何正确调用该方法?

错误消息:

The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>,
int) in the type JavaPairDStream<String,Integer> is not applicable for
the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>,
HashPartitioner, JavaPairRDD<String,Integer>)

编辑: 整个函数调用(Java 8):

final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    (values, state) -> {
      Integer newSum = state.or(0);
      for (Integer value : values) {
        newSum += value;
      }
      return Optional.of(newSum);
    };



JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(
    updateFunction
    ,
    new HashPartitioner(context.defaultParallelism()), initialRDD);

编辑: 事实证明,泛型不是问题,而是参数与方法签名不匹配。

最佳答案

问题是您传入了一个 initialRDD,而方法 updateStateByKey没有它作为参数。

最接近的签名是:

updateStateByKey[S](updateFunc: Function2[List[V], Optional[S], Optional[S]], 
  partitioner: Partitioner): JavaPairDStream[K, S] 

关于java - 在 Spark Streaming 中调用 updateStateByKey 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29129195/

相关文章:

java - 在 Log4j 中将任意对象传递给 ThreadContext.put()

java - Java 中抽象属性的解决方法

scala - 在 Scala 中在运行时对协变和逆变类进行有效的类型转换

scala - 编写一个不返回任何内容的 Scala 方法

java - 警告 TaskSetManager : Lost Task xxx: java. lang.ArrayIndexOutOfBoundsException: 1 - Scala

dataframe - 如何将具有多个分隔符的文件转换为数据帧

java - 连接到 LDAP 服务器,无需硬编码凭据

java - 从对象内的对象调用方法?(Java)

java - 我想减少时间。如何在不花时间的情况下进行下一个 Activity 。我有 12 个 j 儿子在异步任务中运行

java - 如何从 Spark 结构化流获取 Kafka 输出中的批处理 ID