hadoop - map 转换性能 spark dataframe 与 RDD

标签 hadoop apache-spark pyspark mapr

我有一个四节点 hadoop 集群 (mapr),每个集群有 40GB 内存。我需要在大数据集(5 亿行)的其中一个字段上“应用”一个函数。我的代码流程是,我从配置单元表中读取数据作为 spark 数据帧,并在其中一列上应用所需的函数,如下所示:

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)])
udfCos = udf(lambda row: function_call(row), schema)
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument))

类似的 RDD 版本可能如下所示:

result = sparkRDD.map(lambda row: function_call(row))

我想提高这段代码的性能,确保代码以最大的并行度和降低的吞吐量运行——我需要帮助使用 spark 概念,例如“重新分区”、“SparkConf 中的并行度值”或其他方法,在我的问题的背景下。任何帮助表示赞赏。

我的spark启动参数:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150"

最佳答案

为了调整您的应用程序,您需要了解一些事情

1) 您需要监控您的应用程序,无论您的集群是否未得到充分利用,您创建的应用程序使用了多少资源

可以使用各种工具进行监控,例如。 Ganglia 在 Ganglia 中,您可以找到 CPU、内存和网络使用情况。

2) 根据对 CPU 和内存使用情况的观察,您可以更好地了解您的应用程序需要什么样的调优

形成你的 Spark 点

在 spark-defaults.conf 中

您可以指定需要什么样的序列化,您的应用程序需要多少驱动程序内存和执行程序内存,甚至您可以更改垃圾收集算法。

下面是几个例子,你可以根据你的要求调整这个参数

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions  -XX:MaxPermSize=2G -XX:+UseG1GC
spark.driver.extraJavaOptions    -XX:MaxPermSize=6G -XX:+UseG1GC

有关更多详细信息,请参阅 http://spark.apache.org/docs/latest/tuning.html

关于hadoop - map 转换性能 spark dataframe 与 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39126117/

相关文章:

python - Spark mllib 线性回归给出非常糟糕的结果

hadoop - Hadoop DB 的 ORM 支持

hadoop - 如何将 tar.gz 中的多个文件加载到 Pig 中

apache-spark - Spark 结构化流恰好一次 - 未实现 - 重复事件

azure - 从 Python Notebook 使用 Markdown 调用 SQL Notebook 会导致 "Parse Exception"

apache-spark - 在循环内使用 sparkDF.write.saveAsTable() 会导致作业之间的延迟呈指数增长

hadoop fs -text 返回 InvocationTargetException

hadoop - Hive 脚本运行时间过长

scala - Spark : split only one column in dataframe and keep remaining columns as it is

python - 在pySpark中处理空数组(可选的二进制元素(UTF8)不是一个组)