apache-spark - Apache Spark中的持久内存数据库

标签 apache-spark spark-streaming innodb h2 derby

我有一个用于Spark流的自定义foreach编写器。对于每一行,我都写入JDBC源。我还想在执行JDBC操作之前进行某种快速查找,并在执行JDBC操作之后更新值,例如下面的示例代码中的“Step-1”和“Step-3” ...

我不想使用REDIS,MongoDB等外部数据库。我想要低足迹的东西,例如RocksDB,Derby等。

我可以为每个应用程序存储一个文件,就像checkpointing一样,我将创建一个internal-db文件夹...

我看不到任何适用于Spark的内存数据库。

def main(args: Array[String]): Unit = {

val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();

val sparkContext = sparkSession.sparkContext;
sparkContext.setLogLevel("ERROR")
val sqlContext = sparkSession.sqlContext;

val kafkaDataframe = sparkSession.readStream.format("kafka")
  .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
    "startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
  .load()

kafkaDataframe.printSchema()
kafkaDataframe.createOrReplaceTempView("kafka_view")
val sqlDataframe = sqlContext.sql("select concat ( topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")

val customForEachWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long) = {
    println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
    true
  }

  override def process(value: Row) = {
    // Step 1 ==> Lookup a key in persistent KEY-VALUE store

    // JDBC operations

    // Step 3 ==> Update the value in persistent KEY-VALUE store
  }

  override def close(errorOrNull: Throwable) = {
    println(" ************** Closed ****************** ")
  }
}

val yy = sqlDataframe
  .writeStream
  .queryName("foreachquery")
  .foreach(customForEachWriter)
  .start()

yy.awaitTermination()

sparkSession.close();

}

最佳答案

Manjesh,

您正在寻找的“将Spark和您的内存数据库作为一个无缝集群,共享一个进程空间”以及对MVCC的支持正是SnappyData提供的。使用SnappyData,要快速查找的表与运行Spark流作业的过程相同。看看here

SnappyData拥有核心产品的Apache V2许可证,并且OSS下载中提供了您所指的特定用途。

(公开:我是SnappyData的一名雇员,因此提供针对该问题的产品特定答案是有意义的,因为产品就是该问题的答案)

关于apache-spark - Apache Spark中的持久内存数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46245574/

相关文章:

scala - 如何根据 spark 数据框中值的累计和为每一行分配一个类别?

scala - 使用案例类将未知列添加为空

apache-spark - Spark 流 : long queued/active batches

MySQL:表 tbl_2_1_15 已满

MySQL 使用具有新值的索引发生死锁

csv - 如何在数据框中指定缺失值

scala - 使用 Spark Scala 将 HDFS 文件内容存储在 ArrayBuffer 中

scala - 在 Spark 流中使用过滤器转换后,如何在函数中返回两个 DStream?

scala insert to redis 给出任务不可序列化

java - mysql/jdbc : Deadlock