apache-spark - Spark Streaming 数据集 Cassandra 连接 UnsupportedOperationChecker

标签 apache-spark cassandra spark-structured-streaming spark-cassandra-connector

我正在尝试将流数据集写入 Cassandra。

我有以下类的流数据集;

case class UserSession(var id: Int,
                       var visited: List[String]
                      )

我在 Cassandra 中还有以下键空间/表。 (博客=KeySpace, session =表

CREATE KEYSPACE blog WITH REPLICATION = { 'class' : 'SimpleStrategy',    'replication_factor' : 1 };


CREATE TABLE blog.session(id int PRIMARY KEY, visited list<text>);

我选择了list<text>对于访问过的,因为我访问过的类型是 List<String>

我的foreach作家如下

class SessionCassandraForeachWriter extends ForeachWriter[UserSession] {

/*
  - on every batch, on every partition `partitionId`
    - on every "epoch" = chunk of data
      - call the open method; if false, skip this chunk
      - for each entry in this chunk, call the process method
      - call the close method either at the end of the chunk or with an error if it was thrown
 */

val keyspace = "blog"
val table = "session"
val connector = CassandraConnector(sparkSession.sparkContext.getConf)

override def open(partitionId: Long, epochId: Long): Boolean = {
  println("Open connection")
  true
}

override def process(sess: UserSession): Unit = {
  connector.withSessionDo { session =>
    session.execute(
      s"""
         |insert into $keyspace.$table("id")
         |values (${sess.id},${sess.visited})
       """.stripMargin)
  }
}

override def close(errorOrNull: Throwable): Unit = println("Closing connection")

 }

查看我的流程函数可能会有所帮助,因为这可能会引发错误。我的主要内容如下。

finishedUserSessionsStream:数据集[UserSession]

def main(args: Array[String]): Unit = {
/// make finishedUserSessionStreams.....

finishedUserSessionsStream.writeStream
      .option("checkpointLocation", "checkpoint")
      .foreach(new SessionCassandraForeachWriter)
      .start()
      .awaitTermination()

}

这给了我以下错误

org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:431)

最佳答案

对于 Spark 3.0 和 Spark Cassandra Connector 3.0.0,您不应使用 foreach - 这是 SCC < 2.5.0 的解决方法,没有对写入流数据集的 native 支持。 Starting with SCC 2.5.0 ,您可以直接将数据写入Cassandra,如下所示(这里是 full example ):

     val query = streamingCountsDF.writeStream
      .outputMode(OutputMode.Update)
      .format("org.apache.spark.sql.cassandra")
      .option("checkpointLocation", "checkpoint")
      .option("keyspace", "ks")
      .option("table", "table")
      .start()

您还需要切换到使用包含大量修复的 SCC 3.0.0-beta。

关于apache-spark - Spark Streaming 数据集 Cassandra 连接 UnsupportedOperationChecker,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63461453/

相关文章:

apache-spark - 如何调试/获取 SparkR Java 后端故障的日志?

database - Cassandra 中的映射冗余

Cassandra 。没有足够的可用副本 - Java 驱动程序行为与 CQL 控制台不同

scala - 如何将 Spark Structured Streaming 与 Kafka Direct Stream 一起使用?

scala - 将包含值的列作为列表转换为数组

apache-spark - 使用 Keras 模型作为 Apache Spark 和 Elephas 的广播变量

java - Spark 流 : Why internal processing costs are so high to handle user state of a few MB?

cassandra - Spark 与 Cassandra 配置

scala - 结构化流 - Foreach Sink

apache-spark - 如何从Kafka读取XML格式的流数据?