scala - 将 DStream 与动态数据集连接

标签 scala apache-spark spark-streaming rdd

我是 Spark Streaming 的新手。我需要使用动态数据集中的数据来丰富来自流的事件。我在创建动态数据集时遇到问题。该数据集应该由来自不同流的数据摄取(但该流的吞吐量比主要事件流低得多)。此外,数据集的大小约为 1-3 GB,因此使用简单的 HashMap 是不够的(在我看来)。

在 Spark Streaming 编程指南中我发现:

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

和解释:“事实上,您还可以动态更改要加入的数据集。”这部分我完全不明白 - RDD 如何动态更改?不是一成不变的吗?

下面你可以看到我的代码。重点是将每个新的 RDD 从 myStream 添加到 myDataset,但显然这并不像我希望的那样工作。

val ssc = new StreamingContext(conf, Seconds(5))
val myDataset: RDD[String] = ssc.sparkContext.emptyRDD[String]
val myStream = ssc.socketTextStream("localhost", 9997)
lines7.foreachRDD(rdd => {myDataset.union(rdd)})
myDataset.foreach(println)

如果有任何帮助或建议,我将不胜感激。 问候!

最佳答案

是的,RDD 是不可变的。您的代码的一个问题是 union() 返回一个新的 RDD,它不会更改现有的 myDataset RDD。

编程指南说明如下:

In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that dataset reference points to.

第一句话可能读起来更好:

In fact, you can also dynamically change which dataset you want to join against.

因此我们可以更改dataset引用的RDD,但不能更改RDD本身。以下是其工作原理的示例(使用 Python):

# Run as follows:
# $ spark-submit ./match_ips_streaming_simple.py.py 2> err
# In another window run:
# $ nc -lk 9999
# Then enter IP addresses separated by spaces into the nc window
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time

BATCH_INTERVAL = 2
SLEEP_INTERVAL = 8
sc       = SparkContext("local[*]", "IP-Matcher")
ssc      = StreamingContext(sc, BATCH_INTERVAL)
ips_rdd  = sc.parallelize(set())
lines_ds = ssc.socketTextStream("localhost", 9999)
# split each line into IPs
ips_ds   = lines_ds.flatMap(lambda line: line.split(" "))
pairs_ds = ips_ds.map(lambda ip: (ip, 1))
# join with the IPs RDD
matches_ds = pairs_ds.transform(lambda rdd: rdd.join(ips_rdd))
matches_ds.pprint()
ssc.start()

# alternate between two sets of IP addresses for the RDD
IP_FILES   = ('ip_file1.txt', 'ip_file2.txt')
file_index = 0
while True:
        with open(IP_FILES[file_index]) as f:
                ips = f.read().splitlines()
        ips_rdd = sc.parallelize(ips).map(lambda ip: (ip, 1))
        print "using", IP_FILES[file_index]
        file_index = (file_index + 1) % len(IP_FILES)
        time.sleep(SLEEP_INTERVAL)
#ssc.awaitTermination()

while 循环中,我每 8 秒更改一次 ips_rdd 引用的 RDD。 join() 转换将使用 ips_rdd 当前引用的任何 RDD。

$ cat ip_file1.txt
1.2.3.4
10.20.30.40
$ cat ip_file2.txt
5.6.7.8
50.60.70.80

$ spark-submit ./match_ips_streaming_simple.py  2> err
using ip_file1.txt
-------------------------------------------
Time: 2015-09-09 17:18:20
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:22
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:24
-------------------------------------------
('1.2.3.4', (1, 1))
('10.20.30.40', (1, 1))

using ip_file2.txt
-------------------------------------------
Time: 2015-09-09 17:18:26
-------------------------------------------

-------------------------------------------
Time: 2015-09-09 17:18:28
-------------------------------------------
('50.60.70.80', (1, 1))
('5.6.7.8', (1, 1))
...

当上述作业正在运行时,在另一个窗口中:

$ nc -lk 9999
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8
<... wait for the other RDD to load ...>
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8

关于scala - 将 DStream 与动态数据集连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30376434/

相关文章:

scala - 驯服 Scala 类型系统

scala - 良好的Scala介绍性文章/视频可激发食欲

scala - 配置 Scalaxb SBT 插件生成的源代码位置

java - 从Spark Workers将数据从SparkStreaming保存到Cassandra是否可行

apache-spark - 如何在 Yarn 上配置应用程序驱动程序的自动重启

bash - 如何执行solr zookeeper CLI上传solr配置

scala - 如何获取 RDD 的子集?

apache-spark - 获取 Pyspark Dataframe 的 nlargest 值的更有效方法

apache-spark - 如何控制每个任务/阶段/工作尝试的 Spark 应用程序?

python - 如何将 Spark Streaming 数据转换为 Spark DataFrame