我当前的设置是:
- Spark 2.3.0 和 pyspark 2.2.1
- 使用 Azure IOTHub/EventHub 的流媒体服务
- 一些基于pandas、matplotlib等的自定义python函数
我正在使用 https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md作为如何读取数据的示例,但是:
- 不能使用 foreach sink,因为它没有在 python 中实现
- 当我尝试调用 .rdd、.map 或 .flatMap 时出现异常:“必须使用 writeStream.start() 执行带有流源的查询”
获取流的每个元素并通过 python 函数传递它的正确方法是什么?
谢谢,
埃德
最佳答案
在第一步中,您定义一个数据帧,从您的 EventHub 或 IoT-Hub 中读取数据作为流:
from pyspark.sql.functions import *
df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()
数据以二进制形式存储在 body 属性中。要获取正文的元素,您必须定义结构:
from pyspark.sql.types import *
Schema = StructType([StructField("name", StringType(), True),
StructField("dt", LongType(), True),
StructField("main", StructType(
[StructField("temp", DoubleType()),
StructField("pressure", DoubleType())])),
StructField("coord", StructType(
[StructField("lon", DoubleType()),
StructField("lat", DoubleType())]))
])
并将架构应用于转换为字符串的主体:
from pyspark.sql.functions import *
rawData = df. \
selectExpr("cast(Body as string) as json"). \
select(from_json("json", Schema).alias("data")). \
select("data.*")
在生成的数据框中,您可以应用函数,例如。 G。在“名称”列上调用自定义函数 u_make_hash:
parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))
关于apache-spark - 如何使用 pyspark 和自定义 python 函数处理 eventhub 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49365852/