apache-spark - Azure DataBricks Stream foreach 因 NotSerializableException 而失败

标签 apache-spark redis apache-kafka databricks azure-databricks

我想不断详细说明数据集流的行(最初由 Kafka 发起):基于我想更新 Radis 哈希的条件。这是我的代码片段(lastContacts 是前一个命令的结果,它是这种类型的流:org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long]。这扩展为 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]):

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()

我收到一个巨大的堆栈跟踪,相关部分(我认为)是这样的:java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter

谁能解释为什么会出现这种异常以及如何避免?谢谢!

这个问题与以下两个有关:

最佳答案

Spark 上下文不可序列化。

ForeachWriter 的任何实现都必须是可序列化的,因为每个任务都将获得所提供对象的新序列化-反序列化副本。因此,强烈建议在调用 open(...) 方法后完成写入数据的任何初始化(例如打开连接或启动事务),这表示任务已准备好生成数据。

在您的代码中,您试图在 process 方法中使用 spark 上下文,

override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    *sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
  }

要向redis发送数据,需要自己创建连接并在open方法中打开,然后在process方法中使用。

看看如何创建redis连接池。 https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

关于apache-spark - Azure DataBricks Stream foreach 因 NotSerializableException 而失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55362469/

相关文章:

apache-spark - 在 Spark 中压缩序列文件?

scala - Spark 数据帧 : Accessing next record in map function

redis - RedissonClient.getSet ("my-set").size() 返回一个数字后,为什么该集合在下次运行之前显示为空?

java - 相当于python中的objectmapper

apache-spark - 获取 Spark Dataframe 中特定单元格的值

java - Spark 2.4.0 的 shuffle block 大小仍然有 2GB 限制?

ruby-on-rails - rails 和 node.js 之间的共享身份验证与 redis 存储

redis - 部署 Redis Broker,然后将 Redis 作为服务部署到 Cloud Foundry

apache-kafka - HDF 模式注册表和 Confluence 模式注册表之间的主要区别是什么?

apache-kafka - 为微服务扩展 Kafka