scala - 使用 Redis 进行 Spark 结构化流动态查找

标签 scala apache-spark redis streaming lookup

我是 Spark 新手。 我们目前正在 build 一条管道:

  1. 读取 Kafka 主题中的事件
  2. 借助 Redis-Lookup 丰富这些数据
  3. 将事件写入新的 Kafka 主题

所以,我的问题是,当我想使用 Spark-redis 库时,它的性能非常好,但数据在我的流处理作业中保持静态。

虽然数据在 Redis 中刷新,但它没有反射(reflect)到我的数据帧中。 Spark 首先读取数据,然后从不更新它。 另外,我首先从 REDIS 数据中读取,总数据约为 1mio key-val 字符串。

我可以采取什么样的方法/方法,我想使用Redis作为内存动态查找。 查找表的变化时间接近 1 小时。

谢谢。

使用的库: Spark-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar

这是代码部分:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 


val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))


import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*") 
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )



val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")



import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()

最佳答案

一种选择是不使用spark-redis,而是直接在Redis中查找。这可以通过 df.mapPartitions 函数来实现。您可以在此处找到 Spark DStreams 的一些示例 https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/ 。结构流的想法是相似的。注意正确处理Redis连接。

关于scala - 使用 Redis 进行 Spark 结构化流动态查找,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64592973/

相关文章:

json - Play 2.1特质的Json序列化?

apache-spark - 在本地模式下启动 spark 时如何更改主 web ui 端口

redis - 如何在python中获取redis(codis版本)中的所有键

php - 脚本/队列不断耗尽内存

django - 如何使用 Pytest 在 Django 中测试缓存存储(Redis)?

scala - 如何等待多个 future ?

scala - 为什么我不能重写 Scala 中采用值类作为参数的方法?

scala - 我如何从 finagle 客户端发出带有参数的请求?

apache-spark - 使用 kotlin 编程 Apache spark 2.0

scala - Dataproc 返回 StatusRuntimeException 找不到集群