apache-spark - 如何将流数据集写入Kafka?

标签 apache-spark apache-kafka spark-structured-streaming

我正在尝试对主题数据进行一些丰富。因此,使用 Spark 结构化流从 Kafka sink 读取回 Kafka。

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)].map(record => enrich(record._1,
      record._2, record._3)

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

但我有一个异常(exception):
Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

任何解决方法?

最佳答案

Spark 2.1(目前是 Spark 的最新版本)没有它。下一个版本 - 2.2 - 将拥有 Kafka Writer,see this commit .

Kafka Sink 与 Kafka Writer 相同。

关于apache-spark - 如何将流数据集写入Kafka?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42996293/

相关文章:

python - 从 kafka 导入 KafkaClient ImportError : No module named kafka

apache-kafka - Apache Kafka使用者组的偏移量如何过期?

spring-boot - spring boot消费者不消费任何消息

apache-spark - 如何解决向 Hive 表发送大文件时的连接问题?

java - 连接spark master java的安全异常

apache-spark - 如何使 Spark 驱动程序对 Master 重启具有弹性?

python - 如何在Spark中使用ElasticSearch在脚本文档中更新或部分更新?

scala - 如何在RDD中展平列表?

scala - 生成用于查找的单行数据框

mongodb - 如何将spark结构流写入mongodb集合?