apache-spark - 将数据从ElasticSearch读取到Spark数据集中

标签 apache-spark elasticsearch apache-spark-dataset

使用elasticsearch-hadoop库,我想直接从ElasticSearch读取数据到Spark数据集。但是,该API返回RDD [(String,Map [String,Any])],其中元组的第一个元素是文档名称,第二个( map )是数据本身。
我想将其转换为Dataset [T],其中T是某种案例类,以使返回的数据更易于使用。我会考虑使用其他一些库(找不到任何库)或简洁的代码解决方案。

最佳答案

我写了一个函数stringMapRddToDataset 来做到这一点。我觉得应该有一个整体上更好的方法来执行此操作……还担心此解决方案的效率,但是我尚未在大量数据上进行过测试。

  private def mapToSparkRow(map: collection.Map[String, Any], orderedFields: List[StructField]): Row = {
    val orderedValues = orderedFields.map { field =>
      val columnValue = map.getOrElse(field.name, null)
      field.dataType match {
        case nestedField: StructType =>
          mapToSparkRow(columnValue.asInstanceOf[Map[String, Any]], nestedField.toList)
        case notNested => columnValue
      }
    }
    Row(orderedValues: _*)
  }

  def stringMapRddToDataset[T: Encoder](rdd: RDD[collection.Map[String, Any]])(
      implicit spark: SparkSession): Dataset[T] = {
    val encoder             = implicitly[Encoder[T]]
    val rddOfRows: RDD[Row] = rdd.map(mapToSparkRow(_, encoder.schema.toList))
    val df                  = spark.createDataFrame(rddOfRows, encoder.schema)
    df.as[T]
  }

关于apache-spark - 将数据从ElasticSearch读取到Spark数据集中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62951744/

相关文章:

Elasticsearch - 'Day of Week' 的日期时间映射

apache-spark - 与RDD相比,DataSet的性能优势

apache-spark - Spark DataFrame RangePartitioner

performance - 如何将 Iterable[String, String, String] 转换为 DataFrame?

amazon-web-services - 不清楚在 aws cloudformation yaml 模板中的何处添加 --conf spark.jars.packages=org.apache.spark :spark-avro_2. 11 :2. 4.4

多个字段的 ElasticSearch 排序顺序

scala - scala 的 "collect"采用偏函数的 Spark 数据集等价物

scala - 使用来自另一列的键从 MapType 列查找值

scala - Spark Windowsspec 滞后函数计算累积分数

elasticsearch - Elasticsearch-按工作日和小时进行的嵌套聚合