scala - ApacheFlink 中的数据集联盟

标签 scala bigdata apache-flink

我正在尝试联合 Seq[DataSet(Long,Long,Double)]
DataSet[(Long,Long,Double)]在 Flink 中:

     val neighbors= graph.map(el => zKnn.neighbors(results,
      el.vector, 150, metric)).reduce(
     (a, b) => a.union(b)
      ).collect()

其中 graph 是一个常规的 scala 集合,但可以转换为 DataSet;
结果是 DataSet[Vector]并且不应该被收集并且在邻居方法中是必需的

我总是得到一个 FlinkRuntime Exeption:

cannot currently handle nodes with more than 64 outputs. org.apache.flink.optimizer.CompilerException: Cannot currently handle nodes with more than 64 outputs. at org.apache.flink.optimizer.dag.OptimizerNode.addOutgoingConnection(OptimizerNode.java:347) at org.apache.flink.optimizer.dag.SingleInputNode.setInput(SingleInputNode.java:202

最佳答案

Flink 目前不支持超过 64 个输入数据集的联合算子。

作为一种解决方法,您可以分层合并多达 64 个数据集,并在层次结构的各个级别之间注入(inject)一个身份映射器。
就像是:

DataSet level1a = data1.union(data2.union(data3...(data64))).map(new IDMapper());
DataSet level1b = data65.union(data66...(data128))).map(new IDMapper());
DataSet level2 = level1a.union(level1b)

关于scala - ApacheFlink 中的数据集联盟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31616178/

相关文章:

python - Dask 数据帧 : how to convert a column dtype from object to numeric

apache-flink - 管理具有大量内存使用的状态 - 从存储中查询

apache-flink - Apache Flink 资源规划最佳实践

amazon-s3 - Amazon EMR 在为 Apache-Flink 提交作业时遇到 Hadoop 可恢复错误

scala - saveTocassandra 找不到参数 rwf 的隐式值

python - Dask + Pandas : Returning a sequence of conditional dummies

scala - 基于级联的烫伤(旧版本)计数器

python - 如何在特定模式的 pandas/python 中加载大于 10gb 的 json 文件

scala - 特殊值的字符串表示

scala - 合格的进口,如果可能的话使用别名