这可能是一个愚蠢的问题,但我似乎找不到任何用纯英语澄清这一点的文档(好吧,夸张了),并且在阅读了官方文档和一些博客之后,我仍然对 driver 和执行者工作。
这是我目前的理解:
1) 驱动程序定义转换/计算。
2)一旦我们调用SparkContext.start()
,驱动程序会将定义的转换/计算发送给所有执行器,以便每个执行器知道如何处理传入的RDD流数据。
好的,这是我的一些令人困惑的问题:
1) 驱动程序是否仅一次且全部将定义的转换/计算发送给所有执行程序?
如果是这种情况,我们就没有机会重新定义/更改计算,对吧?
例如,我做了类似于此的字数统计工作 one ,但是我的工作有点复杂,我只想计算前 60 秒以字母 J
开头的单词,然后仅计算接下来的 60 秒以字母 K
开头的单词,然后只有以 ...... 开头的单词,这样下去。
那么我应该如何在驱动程序中实现这个流作业?
2)或者驱动程序是否在每批数据完成后重新启动/重新调度所有执行器?
跟进
为了解决1)的问题,我想我可以利用一些外部存储介质,比如redis
,我的意思是我可以在中实现一个处理函数count_fn
driver,每次调用这个count_fn
时,都会从redis中读取起始字母,然后在RDD流中计数,这样正确吗?
最佳答案
Does the driver send the defined transformation/computation to all executors only ONCE AND FOR ALL?
不,每个任务都会被序列化并在每个批处理迭代中发送给所有工作人员。想想当您有一个在转换中使用的类的实例时会发生什么,Spark 必须能够将同一个实例及其所有状态发送给每个执行器进行操作。
If this is the case, we wouldn't have any chance to redefine/change the computation, right?
转换定义中的逻辑是不变的,但这并不意味着您不能查询存储影响转换中数据的信息的第三方。
例如,假设您有一些外部源来指示您应该过滤哪些字母。然后,您可以在 DStream 上调用 transform
来从驱动程序中获取有关按哪个字母进行过滤的数据。
Or does the driver restart/reschedule all the executors after each batch of data is done?
它不会重新启动,它只是在每个批处理间隔启动一个新作业。如果您将 StreamingContext
批处理持续时间定义为 60 秒,则每 60 秒就会有一个新作业(微批处理)开始处理数据。
根据您的后续行动,是的,我就是这样做的。
关于python - 想要了解 Spark Streaming 的工作原理吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43138379/