scala - 将 reduceByKey 从 Spark 转换到 Flink

标签 scala apache-spark apache-flink

如何将此示例 scala spark 代码转换为 apache flink?

reduceByKey( (x, y) => (x._1 + y._1, ( (x._2) ++ y._2) ) )

我意识到 reduceByKey 在 flink 中不存在,但它显示了我正在努力实现的目标。

谢谢你的帮助!

最佳答案

与Spark不同,Flink不需要键值对来执行reduce、join和coGroup操作。它可以直接在任何类型上执行它们,例如 POJO、元组或用户类型。你必须提供给 Flink 的是它必须分组的字段。这可以是提取键的函数、逻辑索引或字段名称。当您随后调用 reduce 操作时,整个对象将被提供给 reduce 函数,而不仅仅是值部分。

因此假设您有一个 input: DataSet[(K, (T, List[U]))] 并且 K 是键类型,那么您的 reduce 函数看起来像:

input.groupBy(0).reduce{
  (left: (K, (T, List[U])), right: (K, (T, List[U]))) =>
    val (key, (leftValue1, leftValue2)) = left
    val (_, (rightValue1, rightValue2)) = right

    (key, (leftValue1 + rightValue1, leftValue2 ++ rightValue2))
}

为了便于理解,我还为匿名函数提供了类型注释。但这不是必需的。

更新

这是 Humberto 特定用例的解决方案,假设输入字段由具有 3 个条目的行组成,空格分隔,第三个条目是整数:

val input = env.readCsvFile[(String, String, Int)](filePath, lineDelimiter = "\n", fieldDelimiter = " ")

val result = input
  .map (element => (element._1, element._3, Map(element._2 -> element._3)))
  .groupBy(0)
  .reduce{
    (left, right) =>
      val (key, left1, left2) = left
      val (_, right1, right2) = right

      (key, left1 + right1, left2 ++ right2)
  }

关于scala - 将 reduceByKey 从 Spark 转换到 Flink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34146800/

相关文章:

apache-flink - Apache Flink 检查点卡住

postgresql - 使用 postgres 驱动程序玩光滑的数据库配置问题

scala - 我什么时候应该(或者不应该)使用 Scala 的 @inline 注释?

scala - 如何解决 DataSet.toJSON 与结构化流不兼容的问题

apache-flink - 如何使用带有时间戳和水印分配器的 Flink 流时间窗口?

java - apache flink - 作为终止条件过滤

Scala:基于文件夹定义测试用例

java - Akka 有限状态机实例

python - 超时错误: Error with 400 StatusCode: "requirement failed: Session isn' t active.”

apache-spark - drop table 命令不会删除由 spark-sql 创建的配置单元表的路径