我正在尝试使用 Spark Streaming 1.2.0。在某些时候,我按键对流数据进行分组,然后对它们应用一些操作。
以下是一段测试代码:
...
JavaPairDStream<Integer, Iterable<Integer>> grouped = mapped.groupByKey();
JavaPairDStream<Integer, Integer> results = grouped.mapToPair(
new PairFunction<Tuple2<Integer, Iterable<Integer>>, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Iterable<Integer>> tp) throws Exception {
TaskContext tc = TaskContext.get();
String ip = InetAddress.getLocalHost().getHostAddress();
int key = tp._1();
System.out.println(ip + ": Partition: " + tc.partitionId() + "\tKey: " + key);
return new Tuple2<>(key, 1);
}
});
results.print();
mapped 是一个 JavaPairDStream,它包装了一个每秒存储一个整数数组的虚拟接收器。
我在一个有两个从站的集群上运行这个应用程序,每个从站有 2 个核心。 当我检查打印输出时,似乎分区没有永久分配给节点(或以“粘性”方式)。他们经常在两个节点之间移动。这给我带来了一个问题。
在我的实际应用程序中,我需要为每个分区加载相当大量的地理数据。这些地理数据将用于处理流中的数据。我只能加载每个分区的部分地理数据集。如果分区在节点之间移动,我也必须移动地理数据,这可能非常昂贵。
有没有办法让分区保持粘性,即分区 0,1,2,3 保留在节点 0 上,分区 4,5,6,7 保留在节点 1 上?
我尝试将spark.locality.wait设置为一个很大的数字,例如1000000。但它不起作用。
谢谢。
最佳答案
我找到了解决方法。 我可以将我的辅助数据作为 RDD。对其进行分区并缓存。 稍后,我可以将其与其他 RDD 组合在一起,Spark 将尝试将缓存的 RDD 分区保留在原处,而不是对其进行混洗。例如
...
JavaPairRDD<Integer, GeoData> geoRDD =
geoRDD1.partitionBy(new HashPartitioner(num)).cache();
稍后再执行此操作
JavaPairRDD<Integer, Integer> someOtherRDD = ...
JavaPairRDD<Integer, Tuple2<Iterator<GeoData>>, Iterator<Integer>>> grp =
geoRDD.cogroup(someOtherRDD);
然后,您可以在 cogroupped rdd 上使用 foreach 来处理带有地理数据的输入数据。
关于streaming - 如何使 Spark 分区具有粘性,即与节点保持一致?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28079037/