pyspark 有一个 api queueStream 用于从一系列 rdd 构造 dstream。
queueStream(rdds, oneAtATime=True, default=None)
Create an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.
NOTE: changes to the queue after the stream is created will not be recognized.
Parameters:
rdds – Queue of RDDs
oneAtATime – pick one rdd each time or pick all of them once.
default – The default rdd if no more in rdds
问题 1:
在分布式环境中,如果我定义了一个队列对象 q1.我做q1.add(RDD)之类的操作。 q1 对象是否会转移到所有工作节点? q1.add(RDD)这个对象复制到其他节点上会不会有问题?
问题2:
在我运行 dstream = queueStream(q1) 之后。
如果我继续将 RDD 放入队列中。这些 RDDS 会添加到 dstream 中吗?
最佳答案
我相信以下注意事项:
changes to the queue after the stream is created will not be recognized.
几乎可以回答这个问题,但要了解为什么会出现这种情况,您必须在 PySpark 代码中这样做,尤其是 following line :
queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
如果这还不够,您可以查看 the corresponding Scala code 以查看它需要一个静态列表:
def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]])
并将其转换为队列:
val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
rdds.asScala.foreach(queue.add)
因此,Python 端的任何更改根本无法反射(reflect)在流中。
关于第一个问题,答案是否定的。队列不会被分发,因为 RDD 在
Driver
上下文之外根本没有意义。注意 :
需要明确的是,Scala
queueStream
将反射(reflect)添加到队列中的内容。 There is even an example in the Spark source
关于apache-spark - 如何理解apache spark中的queueStream API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36324991/