apache-spark - 如何理解apache spark中的queueStream API?

标签 apache-spark

pyspark 有一个 api queueStream 用于从一系列 rdd 构造 dstream。

queueStream(rdds, oneAtATime=True, default=None)
Create an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.

NOTE: changes to the queue after the stream is created will not be recognized.

Parameters: 
rdds – Queue of RDDs
oneAtATime – pick one rdd each time or pick all of them once.
default – The default rdd if no more in rdds

问题 1:

在分布式环境中,如果我定义了一个队列对象 q1.我做q1.add(RDD)之类的操作。 q1 对象是否会转移到所有工作节点? q1.add(RDD)这个对象复制到其他节点上会不会有问题?

问题2:

在我运行 dstream = queueStream(q1) 之后。
如果我继续将 RDD 放入队列中。这些 RDDS 会添加到 dstream 中吗?

最佳答案

我相信以下注意事项:

changes to the queue after the stream is created will not be recognized.



几乎可以回答这个问题,但要了解为什么会出现这种情况,您必须在 PySpark 代码中这样做,尤其是 following line :
queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])

如果这还不够,您可以查看 the corresponding Scala code 以查看它需要一个静态列表:
def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]])

并将其转换为队列:
val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
rdds.asScala.foreach(queue.add)

因此,Python 端的任何更改根本无法反射(reflect)在流中。

关于第一个问题,答案是否定的。队列不会被分发,因为 RDD 在 Driver 上下文之外根本没有意义。

注意 :

需要明确的是,Scala queueStream 将反射(reflect)添加到队列中的内容。 There is even an example in the Spark source

关于apache-spark - 如何理解apache spark中的queueStream API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36324991/

相关文章:

apache-spark - 如何获取Pyspark中RDD的大小?

scala - 更改 Spark 的 Hadoop 版本

apache-spark - 以编程方式将 Databricks spark-csv 添加到 Spark 1.6.2 客户端

json - Apache spark 解析带有拆分记录的 json

docker - 如何使用 Docker 创建分布式 spark 集群

bash - 如何从Kubernetes访问Spark Shell?

hadoop - Apache Spark JavaSchemaRDD 是空的,即使它的输入 RDD 有数据

hadoop - 如何处理(遍历)hadoop/Spark 集群上的大型 JSON 文件?

hadoop - Spark 独立版和 HDFS 的数据局部性

java - 在 spark 提交中覆盖 spark 的库