java - 如何将spark数据帧中的多列写入kafka队列

标签 java apache-spark dataframe apache-kafka

我知道我们可以将spark与kafka集成,并将数据帧以key和value的格式写入kafka队列,如下所示

df - 数据框

 df.withColumnRenamed("Column_1", "key")
 .withColumnRenamed("Column_2", "value")
 .write()
 .format("kafka")
 .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
 .save()

但是如何将第 3、4、5 列等写入 kafka 队列呢? 如何一次性将整行写入 kafka 队列?

如有任何建议,我们将不胜感激

最佳答案

Kafka 只获取 (key, value) 形式的消息。因此,您必须将列聚合为一个值(例如 JSON)。这是示例

这应该有效:(构造适当的value_fields)

import org.apache.spark.sql.functions._

val value_fields = df.columns.filter(_ != "Column_1") 

df
.withColumnRenamed("Column_1", "key")
.withColumn("value", to_json(struct(value_fields.map(col(_)):_*)))
.select("key", "value")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()

关于java - 如何将spark数据帧中的多列写入kafka队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56332202/

相关文章:

r - 从矩阵创建数据框

java - 集成 .net 和 java 应用程序的架构建议

apache-spark - PySpark - 将所有数据框列字符串拆分为数组

apache-spark - 在 kafka 流上使用 Spark 流作业

python - 删除 Pandas 数据框中的特殊字符

python - 如何根据另一个数据帧上的列对数据帧的行进行分类?

java - 两个不同的对象打印相同的属性值

java - 将 log4j.properties 移出 jar 导致 log4j 警告(不生成日志)

java - 使用 Intent 强制关闭从主类切换到 ListActivity

python - 从发送到 spark-submit 的外部 __main__ 文件修改 SparkContext