我有一个类似这个的脚本
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
it = line_generator()
for l in it:
print(l)
哪些输出值到标准输出。我想用 spark Streaming api“捕获”这些值以将它们存储在 Parquet 文件中,并应用一些由 HiveQL 编写的推理代码。我不是 Scala 的人:/所以,如果可能的话,我更愿意在 PySpark 中找到一个解决方案,但我对任何建议都很满意。
例如,我知道可以读取来自 Kafka 流的数据流,是否有类似的方法来读取发送到标准输出的数据或连续写入文件的数据?
预先感谢您的帮助
最佳答案
我不会使用 stdout,因为 spark 通常用于具有多个节点的集群。更好的方法是 kafka(它还允许您临时存储数据并且更可靠)或套接字。下面是套接字示例(基于 Daniel Hynk ):
#send your data
import socket
import json
def line_generator():
d = dict({1:1, 2:2, 3:3})
while True:
yield json.dumps(d)
hostip = '127.0.0.1'
portno = 56789
#listener need to be started before!
#try: netcat -lkp 56789
#before you start with spark streaming
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.connect((hostip, portno))
it = line_generator()
for l in it:
soc.send(l.encode('utf8'))
Spark 结构化流示例:
hostip = '127.0.0.1'
portno = '56789'
received = spark.readStream.format("socket").option("host", hostip).option("port", portno).load()
#the value column of a structured stream contains the content
values = received.select(received.value)
###
#do your stuff
###
#will listen to the specified port and write the results to memory!!! until you call query.stop()
#this allows you to see the data with select * from mystream
query = values.writeStream.queryName("mystream").outputMode("complete").format("memory").start()
当然你不想最后把它写到内存但是它大大加快了开发速度。完成程序后,只需将最后一行更改为 guide 中提到的类似下面的内容:
writeStream.format("parquet").option("path", "path/to/destination/dir").start()
关于python - 使用 Spark Streaming 读取脚本的输出输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57391589/