streaming - 如何使 Spark 分区具有粘性,即与节点保持一致?

标签 streaming apache-spark

我正在尝试使用 Spark Streaming 1.2.0。在某些时候,我按键对流数据进行分组,然后对它们应用一些操作。

以下是一段测试代码:

...        
JavaPairDStream<Integer, Iterable<Integer>> grouped = mapped.groupByKey();
JavaPairDStream<Integer, Integer> results = grouped.mapToPair(
            new PairFunction<Tuple2<Integer, Iterable<Integer>>, Integer, Integer>() {
                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Iterable<Integer>> tp) throws Exception {
                    TaskContext tc = TaskContext.get();
                    String ip = InetAddress.getLocalHost().getHostAddress();
                    int key = tp._1();
                    System.out.println(ip + ": Partition: " + tc.partitionId() + "\tKey: " + key);
                    return new Tuple2<>(key, 1);
                }
            });
results.print();

mapped 是一个 JavaPairDStream,它包装了一个每秒存储一个整数数组的虚拟接收器。

我在一个有两个从站的集群上运行这个应用程序,每个从站有 2 个核心。 当我检查打印输出时,似乎分区没有永久分配给节点(或以“粘性”方式)。他们经常在两个节点之间移动。这给我带来了一个问题。

在我的实际应用程序中,我需要为每个分区加载相当大量的地理数据。这些地理数据将用于处理流中的数据。我只能加载每个分区的部分地理数据集。如果分区在节点之间移动,我也必须移动地理数据,这可能非常昂贵。

有没有办法让分区保持粘性,即分区 0,1,2,3 保留在节点 0 上,分区 4,5,6,7 保留在节点 1 上?

我尝试将spark.locality.wait设置为一个很大的数字,例如1000000。但它不起作用。

谢谢。

最佳答案

我找到了解决方法。 我可以将我的辅助数据作为 RDD。对其进行分区并缓存。 稍后,我可以将其与其他 RDD 组合在一起,Spark 将尝试将缓存的 RDD 分区保留在原处,而不是对其进行混洗。例如

...
JavaPairRDD<Integer, GeoData> geoRDD = 
    geoRDD1.partitionBy(new HashPartitioner(num)).cache();

稍后再执行此操作

JavaPairRDD<Integer, Integer> someOtherRDD = ...
JavaPairRDD<Integer, Tuple2<Iterator<GeoData>>, Iterator<Integer>>> grp =
    geoRDD.cogroup(someOtherRDD);

然后,您可以在 cogroupped rdd 上使用 foreach 来处理带有地理数据的输入数据。

关于streaming - 如何使 Spark 分区具有粘性,即与节点保持一致?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28079037/

相关文章:

hadoop - 如何分别从tarball安装Spark和Hadoop [Cloudera]

ios - RESTful WCF 将视频流式传输到 iOS 设备

apache-spark - 如何配置spark sql Thrift服务器

hadoop - 如何使用 Spark Streaming 确保事件的有序处理?

python - RDD 沿袭/Spark 操作图的良好输出

scala - NLineInputFormat 在 Spark 中不起作用

c# - C# 中的 Windows 应用程序?

json - 如何在循环中将结构序列化为 io::Write

JAVA : Transferring files through socket

java - Spring Boot + 视频错误 "video playback aborted due to a network error"