java - 如何仅在处理完 RDD 中的所有分区后才在 Spark Streaming 中接收输入?

标签 java apache-spark parallel-processing spark-streaming

假设我有一个 JavaDStreamReceiver,它每秒从 Spark Streaming 中的 TCP/IP 套接字连接接收一个整数。 然后我将其存储在一个列表中,直到有 100 个整数。 之后,我想将该 RDD 分为 4 个分区,我的电脑中每个核心一个分区,并并行映射这些分区。所以像这样:

 public final class sparkstreaminggetjson {
 private static final Pattern SPACE = Pattern.compile(" ");
 private static Integer N=100;
 private static List<Integer> allInputValues= new List<Integer>();

 public static void main(String[] args) throws Exception {

  SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");


  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
        args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

  JavaDStream<List<Integer>> storeValuesInList=receivedStream.map( // if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value );

  JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);


  JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)

  JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)

...

finalStream.print();

这是我的问题。我想实现一个 FILO 模型,在该模型中,我收到一个新输入,将其放置在 RDD 的第一个分区中,并从 RDD 的最后一个分区中删除最后一个元素。所以基本上我从列表中放入和轮询整数,保持原始大小。之后,我像往常一样并行处理每个分区。

这是我的问题:每当我的分区处理完成时,应用程序都会返回到 receivedStream,而不是 partitionedList。也就是说,我为每个分区获得一个新输入,这不是我想要的。我希望处理每个分区,然后才返回到 receiveStream 来获取新输入。

我该怎么做?我应该用其他方法替换 receivedStream 之后的 map() 来分隔阶段吗?

非常感谢。

最佳答案

据我所知,您可以使用窗口:每秒 1 个整数意味着您可以使用

JavaDstream integers = your stream;
JavaDstream hundredInt = integers.window(Seconds(100));

这样每个 RDD 就有 100 个整数。

根据缓冲:newInt ->[1...25][26...50][51...75][76...100] ->lastInt< br/> 这就是我的理解,所以如果你想保留最后的计算,你可以用 rdd.cache() 获取新的 100 个整数并从中进行详细说明。或者rdd.checkpoint

关于java - 如何仅在处理完 RDD 中的所有分区后才在 Spark Streaming 中接收输入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37767126/

相关文章:

scala - 将 Spark-kafka InputDStream 转换为 Array[Bytes]

apache-spark - 从 Kafka 流式传输的 PySpark 问题

multithreading - 连通分量的并行算法

c# - .Net中的Dictionary在并行读写时是否有可能导致死锁?

Java XML Dom 内存密集型

java - 在 Java 中将字符串转换为 SomeType

java - 如何在 Windows 7 上安装 Apache Ant

python - Pyspark DataFrame - 如何使用变量进行连接?

调用并行模块时 Python 计时代码不正确 - 我的代码或模块?

java - Xstream 创建类并导致痛苦的永久集合