apache-spark - 如何使用 pyspark 和自定义 python 函数处理 eventhub 流

标签 apache-spark pyspark azure-eventhub

我当前的设置是:

  • Spark 2.3.0 和 pyspark 2.2.1
  • 使用 Azure IOTHub/EventHub 的流媒体服务
  • 一些基于pandas、matplotlib等的自定义python函数

我正在使用 https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md作为如何读取数据的示例,但是:

  • 不能使用 foreach sink,因为它没有在 python 中实现
  • 当我尝试调用 .rdd、.map 或 .flatMap 时出现异常:“必须使用 writeStream.start() 执行带有流源的查询”

获取流的每个元素并通过 python 函数传递它的正确方法是什么?

谢谢,

埃德

最佳答案

在第一步中,您定义一个数据帧,从您的 EventHub 或 IoT-Hub 中读取数据作为流:

from pyspark.sql.functions import *

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

数据以二进制形式存储在 body 属性中。要获取正文的元素,您必须定义结构:

from pyspark.sql.types import *

Schema = StructType([StructField("name", StringType(), True),
                      StructField("dt", LongType(), True),
                      StructField("main", StructType( 
                          [StructField("temp", DoubleType()), 
                           StructField("pressure", DoubleType())])),
                      StructField("coord", StructType( 
                          [StructField("lon", DoubleType()), 
                           StructField("lat", DoubleType())]))
                    ])

并将架构应用于转换为字符串的主体:

from pyspark.sql.functions import *

rawData = df. \
  selectExpr("cast(Body as string) as json"). \
  select(from_json("json", Schema).alias("data")). \
  select("data.*")

在生成的数据框中,您可以应用函数,例如。 G。在“名称”列上调用自定义函数 u_make_hash:

 parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))  

关于apache-spark - 如何使用 pyspark 和自定义 python 函数处理 eventhub 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49365852/

相关文章:

python - PySpark 应用程序在 Yarn 集群模式下提交错误

apache-spark - pyspark:CrossValidator 不起作用

c# - 将消息定向给消费者

azure - 一些流在此命令完成之前终止!结构化流媒体

java - 如何使用 java 对象将两个 spark 数据集连接到一个数据集?

scala - 这是在 RDD 上实现惰性 `take` 的合适方法吗?

arrays - 用 PySpark 中的对应元素替换数组中的元素

pyspark - 基于另一个数据帧 Pyspark 1.6.1 中匹配值的子集数据帧

spark-structured-streaming - Azure 事件中心流 : Does Checkpointing override setStartingPosition?

java - Spark Streaming 单键并行