scala - 高效使用Apache Spark将数据推送到elasticsearch

标签 scala apache-spark elasticsearch apache-spark-sql

我在 xml 文件中有 2700 万条记录,我想将其推送到 elasticsearch 索引中
下面是用 spark scala 编写的代码片段,我将创建一个 spark 作业 jar 并在 AWS EMR 上运行
我怎样才能有效地使用 Spark 来完成这个练习?请指导。
我有一个 12.5 gb 的 gzipped xml,我正在将其加载到 spark 数据帧中。我是 Spark 的新手 ..(我应该拆分这个 gzip 文件吗?还是 spark 执行程序会处理它?)

class ReadFromXML {

  def createXMLDF(): DataFrame = {
    val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
    import spark.implicits._
    val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)

    var new_df: DataFrame = null
      
      new_df = m_df.select($"CountryCode"(0).as("countryCode"),
        $"PostalCode"(0).as("postalCode"),
        $"state"(0).as("state"),
        $"county"(0).as("county"),
        $"city"(0).as("city"),
        $"district"(0).as("district"),
        $"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
        $"FullStreetName"(0).as("street"),
        functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
        .where($"LocationList.Location._primary" === "true")
        .where("(array_contains(_languageCode, 'en'))")
        .where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
    

    new_df.drop("name")
  }
}

object PushToES extends App {
  val spark = SparkSession
    .builder()
    .appName("PushToES")
    .master("local[*]")
    .config("spark.es.nodes", "awsurl")
    .config("spark.es.port", "port")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.net.ssl", "true")
    .getOrCreate()

  val extractor = new ReadFromXML()

  val df = extractor.createXMLDF()
  df.saveToEs("myindex/_doc")
}
更新 1:
我已将每个文件拆分为 68M,读取这个文件需要 3.7 分钟
我在尝试使用 snappy 而不是 gzip 压缩编解码器
因此将 gz 文件转换为 snappy 文件并在配置中添加如下
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
但它返回空数据帧
df.printschema 只返回“root”
更新 2:
我已经设法以 lzo 格式运行..解压缩和加载数据帧所需的时间非常少。
迭代每个大小为 140 MB 的 lzo 压缩文件并创建数据帧是个好主意吗?
或者
我应该在数据框中加载 10 个文件集吗?
或者
我应该在单个数据帧中加载所有 200 个 lzo 压缩文件,每个 140MB 吗?如果是,那么应该为 master 分配多少内存,因为我认为这将加载到 master 上?
从 s3 存储桶读取文件时,“s3a”uri 可以提高性能吗?或“s3”uri 可以用于 EMR?
更新 3:
测试一小组 10 个 lzo 文件。我使用了以下配置。
EMR 集群总共耗时 56 分钟,其中步骤(Spark 应用程序)耗时 48 分钟处理 10 个文件
1位大师 - m5.xlarge
4 vCore,16 GiB 内存,仅 EBS 存储
EBS 存储:32 GiB
2芯 - m5.xlarge
4 vCore,16 GiB 内存,仅 EBS 存储
EBS 存储:32 GiB
使用从 https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ 学习到的以下 Spark 调谐参数
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  },
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.dynamicAllocation.enabled": "false",
      "spark.driver.memory": "10800M",
      "spark.executor.memory": "10800M",
      "spark.executor.cores": "2",
      "spark.executor.memoryOverhead": "1200M",
      "spark.driver.memoryOverhead": "1200M",
      "spark.memory.fraction": "0.80",
      "spark.memory.storageFraction": "0.30",
      "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.storage.level": "MEMORY_AND_DISK_SER",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.default.parallelism": "4"
    }
  },
  {
    "Classification": "mapred-site",
    "Properties": {
      "mapreduce.map.output.compress": "true"
    }
  }
]

最佳答案

不是一个完整的答案,但仍然有点长评论。我想提出一些建议。
目前尚不清楚,但我认为您担心的是执行时间。正如评论中所建议的,您可以通过向集群添加更多节点/执行程序来提高性能。如果 gzip 文件是在 spark 中没有分区的情况下加载的,那么您应该将其拆分为合理的大小。 (不要太小 - 这会使处理速度变慢。不要太大 - 执行程序将运行 OOM)。parquet使用 Spark 时是一种很好的文件格式。如果您可以将 XML 转换为 Parquet 。它是 super 压缩和轻量级的。
阅读您的评论,coalesce不做完全洗牌。合并算法通过将数据从某些分区移动到现有分区来更改节点数。这个算法显然不能增加分区数。使用 repartition反而。该操作成本高,但可以增加分区数。查看更多事实:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

关于scala - 高效使用Apache Spark将数据推送到elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63501883/

相关文章:

java - Spark 1.5.2 和 SLF4J StaticLoggerBinder

python - pyspark中导入多个py文件

apache-spark - SAP Vora 2.1 是否需要 Hadoop/Spark 集群?并且可以使用 PySpark 吗?

c# - 利用NEST for ES FunctionScore具有可变数量的功能

elasticsearch - Elasticsearch -查询值

scala - 值(value) ||不是字符串的成员 - scala

scala - Scala 中如何检查字符串是否为十进制数

scala - def fn[String] 似乎破坏了 Scala/java.lang.String 兼容性

scala - Spark Scala TF-IDF 值排序向量

java - 在调用 ElasticSearch API 之前预处理输入文本