I'm new to Spark Streaming. I need to enrich events coming from a stream with data from a dynamic dataset. I'm having trouble creating that dynamic dataset. It should be ingested from a different stream (one with much lower throughput than the main event stream). Also, the dataset is roughly 1-3 GB in size, so a simple HashMap won't be enough (in my opinion).
In the Spark Streaming Programming Guide I found:
val dataset: RDD[(String, String)] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
along with the explanation: "In fact, you can also dynamically change the dataset you want to join against." This is the part I don't understand at all - how can an RDD change dynamically? Aren't RDDs immutable?
Below you can see my code. The idea is to add each new RDD from myStream to myDataset, but obviously this doesn't work the way I'd like.
val ssc = new StreamingContext(conf, Seconds(5))
val myDataset: RDD[String] = ssc.sparkContext.emptyRDD[String]
val myStream = ssc.socketTextStream("localhost", 9997)
myStream.foreachRDD(rdd => {myDataset.union(rdd)})
myDataset.foreach(println)
Any help or advice would be much appreciated. Regards!
Best Answer
Yes, RDDs are immutable. One problem with your code is that union() returns a new RDD; it does not change the existing myDataset RDD.
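A plain-Python analogy (not Spark - just to illustrate the new-object-vs-mutation distinction) is concatenation on an immutable tuple: the operation builds and returns a new object, and the original is untouched unless you rebind the name.

```python
# Plain-Python analogy, not Spark: like RDD.union(), tuple concatenation
# returns a NEW object rather than modifying the one it was called on.
dataset = ("a", "b")
batch = ("c",)

result = dataset + batch   # analogous to myDataset.union(rdd)
# dataset is still ("a", "b") -- in the question's code the returned
# RDD was simply discarded, so myDataset never appeared to grow

dataset = dataset + batch  # only rebinding the name makes the update visible
# dataset is now ("a", "b", "c")
```

The same reference-rebinding idea is what the answer's Python example below relies on.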
The programming guide states the following:
In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval and therefore will use the current dataset that dataset reference points to.
The first sentence might read better as:
In fact, you can also dynamically change which dataset you want to join against.
So we can change which RDD the dataset reference points to, but we cannot change the RDD itself. Here is an example of how this works (using Python):
# Run as follows:
# $ spark-submit ./match_ips_streaming_simple.py 2> err
# In another window run:
# $ nc -lk 9999
# Then enter IP addresses separated by spaces into the nc window
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import time
BATCH_INTERVAL = 2
SLEEP_INTERVAL = 8
sc = SparkContext("local[*]", "IP-Matcher")
ssc = StreamingContext(sc, BATCH_INTERVAL)
ips_rdd = sc.parallelize(set())
lines_ds = ssc.socketTextStream("localhost", 9999)
# split each line into IPs
ips_ds = lines_ds.flatMap(lambda line: line.split(" "))
pairs_ds = ips_ds.map(lambda ip: (ip, 1))
# join with the IPs RDD
matches_ds = pairs_ds.transform(lambda rdd: rdd.join(ips_rdd))
matches_ds.pprint()
ssc.start()
# alternate between two sets of IP addresses for the RDD
IP_FILES = ('ip_file1.txt', 'ip_file2.txt')
file_index = 0
while True:
    with open(IP_FILES[file_index]) as f:
        ips = f.read().splitlines()
    ips_rdd = sc.parallelize(ips).map(lambda ip: (ip, 1))
    print "using", IP_FILES[file_index]
    file_index = (file_index + 1) % len(IP_FILES)
    time.sleep(SLEEP_INTERVAL)
#ssc.awaitTermination()
In the while loop, I change the RDD that ips_rdd references every 8 seconds. The join() transformation then uses whichever RDD ips_rdd currently references.
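This works because the lambda passed to transform() looks up the name ips_rdd each time it is evaluated, not once at definition time. A minimal plain-Python sketch of that late-binding behavior (no Spark involved; the names here are illustrative):

```python
# Minimal sketch of Python's late name binding, which is what lets the
# transform() lambda pick up the reassigned ips_rdd on every batch.
current_ips = {"1.2.3.4", "10.20.30.40"}

# The lambda captures the NAME current_ips, not its value at definition time.
match = lambda seen: seen & current_ips

seen = {"1.2.3.4", "5.6.7.8"}
match(seen)           # uses the first set -> {"1.2.3.4"}

current_ips = {"5.6.7.80", "50.60.70.80"} - {"5.6.7.80"} | {"5.6.7.8"}
current_ips = {"5.6.7.8", "50.60.70.80"}  # rebind, like the while loop does
match(seen)           # the same lambda now uses the new set -> {"5.6.7.8"}
```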
$ cat ip_file1.txt
1.2.3.4
10.20.30.40
$ cat ip_file2.txt
5.6.7.8
50.60.70.80
$ spark-submit ./match_ips_streaming_simple.py 2> err
using ip_file1.txt
-------------------------------------------
Time: 2015-09-09 17:18:20
-------------------------------------------
-------------------------------------------
Time: 2015-09-09 17:18:22
-------------------------------------------
-------------------------------------------
Time: 2015-09-09 17:18:24
-------------------------------------------
('1.2.3.4', (1, 1))
('10.20.30.40', (1, 1))
using ip_file2.txt
-------------------------------------------
Time: 2015-09-09 17:18:26
-------------------------------------------
-------------------------------------------
Time: 2015-09-09 17:18:28
-------------------------------------------
('50.60.70.80', (1, 1))
('5.6.7.8', (1, 1))
...
While the above job is running, in another window:
$ nc -lk 9999
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8
<... wait for the other RDD to load ...>
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8
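The (1, 1) values in the output come from the pair-RDD join: both sides map each IP to the value 1, and join pairs up the values for matching keys. A rough dict-based sketch of that semantics for unique keys (pair_join is a hypothetical helper, not Spark's implementation - Spark's real join also handles duplicate keys by combining every matching pair):

```python
def pair_join(left, right):
    # Rough sketch of pair-RDD join semantics for unique keys:
    # keep only keys present on both sides and pair up their values.
    right_map = dict(right)
    return sorted((k, (v, right_map[k])) for k, v in left if k in right_map)

# One batch's (ip, 1) pairs joined against the current ips_rdd contents:
batch = [("1.2.3.4", 1), ("50.60.70.80", 1), ("10.20.30.40", 1), ("5.6.7.8", 1)]
ip_file1 = [("1.2.3.4", 1), ("10.20.30.40", 1)]
pair_join(batch, ip_file1)
# -> [("1.2.3.4", (1, 1)), ("10.20.30.40", (1, 1))]
```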
Regarding scala - Joining a DStream with a dynamic dataset, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30376434/