scala - 将 Cassandra 查询数据组合/更新到从 Kafka 接收的结构化流

标签 scala apache-spark cassandra spark-structured-streaming

我正在创建一个 Spark 结构化流应用程序,它将每 10 秒计算一次从 Kafka 接收的数据。

为了能够进行一些计算,我需要查找有关 Cassandra 数据库中的传感器和放置情况的一些信息

我有点困惑如何保持 Cassandra 数据在整个集群中可用,并以某种方式不时更新数据,以防我们对数据库表进行了一些更改。

目前,我使用 Datastax Spark-Cassandra-connector 在本地启动 Spark 后立即查询数据库

val cassandraSensorDf = spark
  .read
  .cassandraFormat("specifications", "sensors")
  .load

从这里开始,我可以通过将这个 cassandraSensorDs 与我的结构化流数据集连接来使用它。

.join(
   cassandraSensorDs ,
   sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)

在运行结构化流时,如何执行其他查询来更新此 Cassandra 数据? 如何使查询的数据在集群设置中可用?

最佳答案

使用广播变量,您可以编写一个包装器来定期从 Cassandra 获取数据并更新广播变量。使用广播变量在流上执行映射端连接。我还没有测试过这种方法,我认为这可能是一种矫枉过正,具体取决于您的用例(吞吐量)。

How can I update a broadcast variable in spark streaming?

另一种方法是查询 Cassandra 以获取流中的每个项目,为了优化连接,您应该确保使用连接池并为 JVM/分区仅创建一个连接。这种方法更简单,您不必担心定期加热 Cassandra 数据。

spark-streaming and connection pool implementation

关于scala - 将 Cassandra 查询数据组合/更新到从 Kafka 接收的结构化流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49863343/

相关文章:

scala - 在 Scala IDE 中运行简单的 Spark 代码

scala - 如何在命令行中从 build.sbt 中提取数据

python - 断言错误 : all exprs should be Column

apache-spark - PySpark - 有没有办法水平连接两个数据帧,以便第一个 df 中的每一行都具有第二个 df 中的所有行

java - DefaultEntityManager 不允许我使用 <UUID, String> 的 ColumnFamily

node.js - cassandra 在执行时继续运行

scala - sbt 查找所请求依赖项的另一个版本

python - Spark 流 : read CSV string from kafka, 写入 Parquet

java - 现有应用程序从 Cassandra DB 迁移到 Cosmos DB

scala - 减少 Scala 程序中 Spark 的日志输出