python - PySpark - 逐行转换为 JSON

标签 python json pyspark apache-spark-sql

我有一个非常大的 pyspark 数据框。我需要将每一行的数据框转换为 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用以下代码。

for message in df.toJSON().collect():
        kafkaClient.send(message) 

但是数据帧非常大,因此在尝试 collect() 时失败。

我正在考虑使用 UDF,因为它会逐行处理它。

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()

但是我得到一个错误,因为列被输入到函数而不是行。

为了便于说明,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])

每行的 JSON 字符串:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'

最佳答案

你不能像这样使用select。使用foreach/foreachPartition:

import json

def send(part):
    kafkaClient = ...
    for r in part:
        kafkaClient.send(json.dumps(r.asDict()))

如果您需要诊断信息,只需使用Accumulator

在当前版本中,我会直接使用 Kafka 源代码(2.0 及更高版本):

from pyspark.sql.functions import to_json, struct

(df.select(to_json(struct([df[x] for x in df.columns])).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("topic", topic)
    .save())

例如,您需要 Kafka SQL 包:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1

关于python - PySpark - 逐行转换为 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48542089/

相关文章:

python - 如何使用 scapy 取消设置 TCP 数据包中的 EOL 选项

python - PySpark 广播变量连接

apache-spark - PySpark 计数在 RDD 中按组区分

python - Pyspark:从 blob 存储加载 zip 文件

python - 了解错误 : 'str' object is not callable

python - 在 Reddit API 中访问列表结果的下一页

python - 如何使用 SQLAlchemy 定义没有主键的表?

c++ - 使用 boost::asio 读取 JSON 流,获取完整字符串?

javascript - AJAX打开不接受Servlet

javascript - JSON 返回 url 两次