scala - 在Databricks中将数据从CSV格式传输到Redis哈希格式

标签 scala apache-spark redis databricks azure-databricks

我有一个分为三部分的Azure系统:

  • 我有一些csv文件的Azure Data Lake存储。
  • 我需要进行一些处理的Azure Databricks-正是将csv文件转换为Redis哈希格式。
  • Azure Redis缓存,我应该将转换后的数据放在其中。

  • 将存储装入databricks文件系统后,需要处理一些数据。
    如何将位于databricks文件系统中的csv数据转换为redisHash格式,并正确地将其放入Redis?
    具体来说,我不确定如何通过以下代码进行正确的映射。或者,也许有一些我无法找到的额外转移到SQL表的方法。
    这是我在scala上编写的代码示例:
    import com.redislabs.provider.redis._
    
    val redisServerDnsAddress = "HOST"
    val redisPortNumber = 6379
    val redisPassword = "Password"
    val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
    
    
    val data = spark.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/mnt/staging/data/file.csv")
    
    // What is the right way of mapping?
    val ds = table("data").select("Prop1", "Prop2", "Prop3", "Prop4", "Prop5" ).distinct.na.drop().map{x =>
      (x.getString(0), x.getString(1), x.getString(2), x.getString(3), x.getString(4))
    }
    
    sc.toRedisHASH(ds, "data")
    
    错误:
    error: type mismatch;
     found   : org.apache.spark.sql.Dataset[(String, String)]
     required: org.apache.spark.rdd.RDD[(String, String)]
    sc.toRedisHASH(ds, "data")
    
    如果我这样写最后一个代码字符串:
    sc.toRedisHASH(ds.rdd, "data")
    
    错误:
    org.apache.spark.sql.AnalysisException: Table or view not found: data;
    

    最佳答案

    准备一些样本数据以模拟从CSV文件加载的数据。

        val rdd = spark.sparkContext.parallelize(Seq(Row("1", "2", "3", "4", "5", "6", "7")))
        val structType = StructType(
          Seq(
            StructField("Prop1", StringType),
            StructField("Prop2", StringType),
            StructField("Prop3", StringType),
            StructField("Prop4", StringType),
            StructField("Prop5", StringType),
            StructField("Prop6", StringType),
            StructField("Prop7", StringType)
          )
        )
        val data = spark.createDataFrame(rdd, structType)
    
    转型:
    val transformedData = data.select("Prop1", "Prop2", "Prop3", "Prop4", "Prop5").distinct.na.drop()
    
    将数据帧写入Redis,使用Prop1作为键,并使用data作为Redis表名。参见docs
        transformedData
          .write
          .format("org.apache.spark.sql.redis")
          .option("key.column", "Prop1")
          .option("table", "data")
          .mode(SaveMode.Overwrite)
          .save()
    
    检查Redis中的数据:
    127.0.0.1:6379> keys data:*
    1) "data:1"
    
    127.0.0.1:6379> hgetall data:1
    1) "Prop5"
    2) "5"
    3) "Prop2"
    4) "2"
    5) "Prop4"
    6) "4"
    7) "Prop3"
    8) "3"
    

    关于scala - 在Databricks中将数据从CSV格式传输到Redis哈希格式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64759315/

    相关文章:

    scala - 明确的隐含

    Scala:如何定义 "generic"函数参数?

    node.js - 使用node序列化redis数据

    python - Redis:查找 SINTER 结果的 SCARD,而不存储中间集

    redis - 我如何使用 phusion/baseimage-docker "dockerize"一个 redis 服务

    events - Scala Swing 事件中使用的反引号

    scala - Scala Play 中的状态是如何管理的! 2.0 网络套接字?

    apache-spark - Spark可以用于实时交互应用吗?

    python - Spark DataFrame 聚合和分组多个列,同时保留顺序

    scala - 使用 Spark 和 Scala 将数据插入 Hive 表时出现问题