我在 xml 文件中有 2700 万条记录,我想将其推送到 elasticsearch 索引中
下面是用 spark scala 编写的代码片段,我将创建一个 spark 作业 jar 并在 AWS EMR 上运行
我怎样才能有效地使用 Spark 来完成这个练习?请指导。
我有一个 12.5 gb 的 gzipped xml,我正在将其加载到 spark 数据帧中。我是 Spark 的新手 ..(我应该拆分这个 gzip 文件吗?还是 spark 执行程序会处理它?)
class ReadFromXML {
def createXMLDF(): DataFrame = {
val spark: SparkSession = SparkUtils.getSparkInstance("Spark Extractor")
import spark.implicits._
val m_df: DataFrame = SparkUtils.getDataFrame(spark, "temp.xml.gz").coalesce(5)
var new_df: DataFrame = null
new_df = m_df.select($"CountryCode"(0).as("countryCode"),
$"PostalCode"(0).as("postalCode"),
$"state"(0).as("state"),
$"county"(0).as("county"),
$"city"(0).as("city"),
$"district"(0).as("district"),
$"Identity.PlaceId".as("placeid"), $"Identity._isDeleted".as("deleted"),
$"FullStreetName"(0).as("street"),
functions.explode($"Text").as("name"), $"name".getField("BaseText").getField("_VALUE")(0).as("nameVal"))
.where($"LocationList.Location._primary" === "true")
.where("(array_contains(_languageCode, 'en'))")
.where(functions.array_contains($"name".getField("BaseText").getField("_languageCode"), "en"))
new_df.drop("name")
}
}
object PushToES extends App {
val spark = SparkSession
.builder()
.appName("PushToES")
.master("local[*]")
.config("spark.es.nodes", "awsurl")
.config("spark.es.port", "port")
.config("spark.es.nodes.wan.only", "true")
.config("spark.es.net.ssl", "true")
.getOrCreate()
val extractor = new ReadFromXML()
val df = extractor.createXMLDF()
df.saveToEs("myindex/_doc")
}
更新 1:我已将每个文件拆分为 68M,读取这个文件需要 3.7 分钟
我在尝试使用 snappy 而不是 gzip 压缩编解码器
因此将 gz 文件转换为 snappy 文件并在配置中添加如下
.config("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
但它返回空数据帧df.printschema 只返回“root”
更新 2:
我已经设法以 lzo 格式运行..解压缩和加载数据帧所需的时间非常少。
迭代每个大小为 140 MB 的 lzo 压缩文件并创建数据帧是个好主意吗?
或者
我应该在数据框中加载 10 个文件集吗?
或者
我应该在单个数据帧中加载所有 200 个 lzo 压缩文件,每个 140MB 吗?如果是,那么应该为 master 分配多少内存,因为我认为这将加载到 master 上?
从 s3 存储桶读取文件时,“s3a”uri 可以提高性能吗?或“s3”uri 可以用于 EMR?
更新 3:
测试一小组 10 个 lzo 文件。我使用了以下配置。
EMR 集群总共耗时 56 分钟,其中步骤(Spark 应用程序)耗时 48 分钟处理 10 个文件
1位大师 - m5.xlarge
4 vCore,16 GiB 内存,仅 EBS 存储
EBS 存储:32 GiB
2芯 - m5.xlarge
4 vCore,16 GiB 内存,仅 EBS 存储
EBS 存储:32 GiB
使用从 https://idk.dev/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/ 学习到的以下 Spark 调谐参数
[
{
"Classification": "yarn-site",
"Properties": {
"yarn.nodemanager.vmem-check-enabled": "false",
"yarn.nodemanager.pmem-check-enabled": "false"
}
},
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "false"
}
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.network.timeout": "800s",
"spark.executor.heartbeatInterval": "60s",
"spark.dynamicAllocation.enabled": "false",
"spark.driver.memory": "10800M",
"spark.executor.memory": "10800M",
"spark.executor.cores": "2",
"spark.executor.memoryOverhead": "1200M",
"spark.driver.memoryOverhead": "1200M",
"spark.memory.fraction": "0.80",
"spark.memory.storageFraction": "0.30",
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.yarn.scheduler.reporterThread.maxFailures": "5",
"spark.storage.level": "MEMORY_AND_DISK_SER",
"spark.rdd.compress": "true",
"spark.shuffle.compress": "true",
"spark.shuffle.spill.compress": "true",
"spark.default.parallelism": "4"
}
},
{
"Classification": "mapred-site",
"Properties": {
"mapreduce.map.output.compress": "true"
}
}
]
最佳答案
不是一个完整的答案,但仍然有点长评论。我想提出一些建议。
目前尚不清楚,但我认为您担心的是执行时间。正如评论中所建议的,您可以通过向集群添加更多节点/执行程序来提高性能。如果 gzip 文件是在 spark 中没有分区的情况下加载的,那么您应该将其拆分为合理的大小。 (不要太小 - 这会使处理速度变慢。不要太大 - 执行程序将运行 OOM)。parquet
使用 Spark 时是一种很好的文件格式。如果您可以将 XML 转换为 Parquet 。它是 super 压缩和轻量级的。
阅读您的评论,coalesce
不做完全洗牌。合并算法通过将数据从某些分区移动到现有分区来更改节点数。这个算法显然不能增加分区数。使用 repartition
反而。该操作成本高,但可以增加分区数。查看更多事实:https://medium.com/@mrpowers/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4
关于scala - 高效使用Apache Spark将数据推送到elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63501883/