python - 使用 Spark Streaming 读取脚本的输出输出

标签 python apache-spark pyspark spark-structured-streaming

我有一个类似这个的脚本

import json 
def line_generator():
    d = dict({1:1, 2:2, 3:3})
    while True:
        yield json.dumps(d)

it = line_generator()
for l in it:
    print(l)

哪些输出值到标准输出。我想用 spark Streaming api“捕获”这些值以将它们存储在 Parquet 文件中,并应用一些由 HiveQL 编写的推理代码。我不是 Scala 的人:/所以,如果可能的话,我更愿意在 PySpark 中找到一个解决方案,但我对任何建议都很满意。

例如,我知道可以读取来自 Kafka 流的数据流,是否有类似的方法来读取发送到标准输出的数据或连续写入文件的数据?
预先感谢您的帮助

最佳答案

我不会使用 stdout,因为 spark 通常用于具有多个节点的集群。更好的方法是 kafka(它还允许您临时存储数据并且更可靠)或套接字。下面是套接字示例(基于 Daniel Hynk ):

#send your data
import socket
import json          

def line_generator():
    d = dict({1:1, 2:2, 3:3}) 
    while True:
        yield json.dumps(d)

hostip = '127.0.0.1'
portno = 56789 

#listener need to be started before!
#try: netcat -lkp 56789
#before you start with spark streaming

soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
soc.connect((hostip, portno))

it = line_generator()
for l in it:
    soc.send(l.encode('utf8'))

Spark 结构化流示例:

hostip = '127.0.0.1'
portno = '56789'

received = spark.readStream.format("socket").option("host", hostip).option("port", portno).load()

#the value column of a structured stream contains the content
values = received.select(received.value)

###
#do your stuff
###

#will listen to the specified port and write the results to memory!!! until you call query.stop()
#this allows you to see the data with select * from mystream
query = values.writeStream.queryName("mystream").outputMode("complete").format("memory").start()

当然你不想最后把它写到内存但是它大大加快了开发速度。完成程序后,只需将最后一行更改为 guide 中提到的类似下面的内容:

writeStream.format("parquet").option("path", "path/to/destination/dir").start()

关于python - 使用 Spark Streaming 读取脚本的输出输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57391589/

相关文章:

python - 如何让 nosetests 按顺序运行测试?

python - PySpark - 如何根据坐标矩阵中表示的相似性获取 top-k id?

python-3.x - 有没有办法在 Databricks 上测试我的 Pyspark 笔记本

apache-spark - 获取 StructType 格式的 Parquet 文件的架构

python - 如何获得 Python 编程的概述和方法

python - 通过创建可重用模板来最大限度地减少代码重复

Python - 获取根项目结构的路径

scala - 通过spark从kafka到hdfs

apache-spark - Spark 结构化流可扩展性和重复问题

python - 让消费者从发布/订阅主题中读取消息并将其写入 NoSQL 数据库的最常见方法是什么?