redis - 我们如何将Spark结构化流连接到Redis?

标签 redis apache-spark-sql spark-structured-streaming

我的目标是从Redis和进程中获取流数据。如何通过Spark结构化流连接和处理数据?

最佳答案

要从Spark中的Redis Streams中读取数据,我们需要确定如何连接到Redis,以及Redis Streams中数据的架构结构。

要连接到Redis,我们必须创建一个带有Redis连接参数的新SparkSession:

import com.redislabs.provider.redis._
import redis.clients.jedis.Jedis

object Samj45 {
    def main(args: Array[String]): Unit = {
         val spark = SparkSession
                     .builder()
                     .appName("redis-example")
                     .master("local[*]")
                     .config("spark.redis.host", "localhost")
                     .config("spark.redis.port", "6379")
                     .getOrCreate()

         val data_from_redis = spark
                     .readStream
                     .format("redis")
                     .option("stream.keys","data_clicks")
                     .schema(StructType(Array(
                           StructField("asset", StringType),
                           StructField("cost", LongType)
                      )))
                      .load()

对于编写,您可以使用ForeachWriter。让我知道是否有帮助。

关于redis - 我们如何将Spark结构化流连接到Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61838264/

相关文章:

symfony - 失去与 Redis 服务的连接

scala - 在 Scala Spark 中加入不同的 Dataframe 时动态选择多个列

scala - 为什么 Spark 应用程序以 “ClassNotFoundException: Failed to find data source: kafka” 作为带有 sbt 程序集的 uber-jar 失败?

apache-spark - 如何使用 PySpark 转换结构化流?

performance - 执行redis操作的Go代码中缺少毫秒

mysql - Magento2 With Redis,错误:MySQL适配器:缺少必需的配置选项 'host'(有时)

spring - 使用Lettuce+SpringDataRedis进行Redis哨兵认证

hadoop - Spark : Hive Insert overwrite throws ClassNotFoundException

PySpark:如何将行转换为向量?

scala - Spark Structured Streaming DataFrame 上的排序操作