假设我有一个 JavaDStreamReceiver,它每秒从 Spark Streaming 中的 TCP/IP 套接字连接接收一个整数。 然后我将其存储在一个列表中,直到有 100 个整数。 之后,我想将该 RDD 分为 4 个分区,我的电脑中每个核心一个分区,并并行映射这些分区。所以像这样:
public final class sparkstreaminggetjson {
private static final Pattern SPACE = Pattern.compile(" ");
private static Integer N=100;
private static List<Integer> allInputValues= new List<Integer>();
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<List<Integer>> storeValuesInList=receivedStream.map( // if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value );
JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);
JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)
JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)
...
finalStream.print();
这是我的问题。我想实现一个 FILO 模型,在该模型中,我收到一个新输入,将其放置在 RDD 的第一个分区中,并从 RDD 的最后一个分区中删除最后一个元素。所以基本上我从列表中放入和轮询整数,保持原始大小。之后,我像往常一样并行处理每个分区。
这是我的问题:每当我的分区处理完成时,应用程序都会返回到 receivedStream
,而不是 partitionedList
。也就是说,我为每个分区获得一个新输入,这不是我想要的。我希望处理每个分区,然后才返回到 receiveStream 来获取新输入。
我该怎么做?我应该用其他方法替换 receivedStream
之后的 map()
来分隔阶段吗?
非常感谢。
最佳答案
据我所知,您可以使用窗口:每秒 1 个整数意味着您可以使用
JavaDstream integers = your stream;
JavaDstream hundredInt = integers.window(Seconds(100));
这样每个 RDD 就有 100 个整数。
根据缓冲:newInt ->[1...25][26...50][51...75][76...100] ->lastInt
< br/>
这就是我的理解,所以如果你想保留最后的计算,你可以用 rdd.cache() 获取新的 100 个整数并从中进行详细说明。或者rdd.checkpoint
。
关于java - 如何仅在处理完 RDD 中的所有分区后才在 Spark Streaming 中接收输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37767126/