scala - Spark 结构化流媒体 avro 到 avro 和自定义 Sink

标签 scala apache-kafka avro spark-structured-streaming

有人可以给我推荐一个在 S3 或任何文件系统中编写 avro 的好例子吗?我正在使用自定义 Sink,但我想通过 SinkProvider 的构造函数传递一些属性 Map,我猜这些属性可以进一步传递给 Sink?

更新的代码:

val query = df.mapPartitions { itr =>
  itr.map { row =>
    val rowInBytes = row.getAs[Array[Byte]]("value")
    MyUtils.deserializeAvro[GenericRecord](rowInBytes).toString
  }
}.writeStream
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ" )
  .trigger(ProcessingTime("10 seconds"))
  .option("checkpointLocation", "my_checkpoint_dir")
  .start()

query.awaitTermination()

接收器提供者:

class MyStreamingSinkProvider extends StreamSinkProvider {

  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    new MyStreamingSink
  }
}

水槽:

class MyStreamingSink extends Sink with Serializable {

  final val log: Logger = LoggerFactory.getLogger(classOf[MyStreamingSink])

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    //For saving as text doc
    data.rdd.saveAsTextFile("path")

    log.warn(s"Total records processed: ${data.count()}")
    log.warn("Data saved.")
  }
}

最佳答案

您应该能够通过 writeStream.option(key, value) 将参数传递到自定义接收器:

DataStreamWriter writer = dataset.writeStream()
  .format("com.test.MyStreamingSinkProvider")
  .outputMode(OutputMode.Append())
  .queryName("testQ" )
  .trigger(ProcessingTime("10 seconds"))
  .option("key_1", "value_1")
  .option("key_2", "value_2")
  .start()

在这种情况下,方法MyStreamingSinkProvider.createSink(...)中的参数将包含key_1key_2

关于scala - Spark 结构化流媒体 avro 到 avro 和自定义 Sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49339469/

相关文章:

java - 无法在kafka流中加载类 "org.slf4j.impl.StaticLoggerBinder"?

python - 如何使用 kafka-python 计算主题中的记录数(消息)

hadoop - Parquet API 没有Keys 的概念?

apache - 如何以图形方式表示和操作 apache avro 架构

performance - Protocol Buffers 和 Avro 中 ZigZag 编码背后的原因是什么?

scala - 在 spark-shell 中加载伴随对象

scala - 为什么启动StreamingContext失败并显示“IllegalArgumentException:要求失败:未注册任何输出操作,所以没有任何执行”?

scala - 无法找到 IntelliJ Idea 12 的 SBT 插件

scala - 经销商(服务器)到经销商( worker )不工作

python - 反序列化 Avro 消息