我知道我们可以将spark与kafka集成,并将数据帧以key和value的格式写入kafka队列,如下所示
df - 数据框
df.withColumnRenamed("Column_1", "key")
.withColumnRenamed("Column_2", "value")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
但是如何将第 3、4、5 列等写入 kafka 队列呢? 如何一次性将整行写入 kafka 队列?
如有任何建议,我们将不胜感激
最佳答案
Kafka 只获取 (key, value) 形式的消息。因此,您必须将列聚合为一个值(例如 JSON)。这是示例
这应该有效:(构造适当的value_fields
)
import org.apache.spark.sql.functions._
val value_fields = df.columns.filter(_ != "Column_1")
df
.withColumnRenamed("Column_1", "key")
.withColumn("value", to_json(struct(value_fields.map(col(_)):_*)))
.select("key", "value")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
关于java - 如何将spark数据帧中的多列写入kafka队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56332202/