我有一个当前在我的桌面上运行的 python 脚本。它需要一个包含大约 2500 万行(可能有 15 列左右)的 csv 文件,并逐行执行操作。
对于每一行输入,都会产生多条输出行。然后将结果逐行输出到 csv 文件中,最终输出约为 1 亿行。
代码看起来像这样:
with open(outputfile,"a") as outputcsv:
with open(inputfile,"r") as input csv:
headerlist=next(csv.reader(csvfile)
for row in csv.reader(csvfile):
variable1 = row[headerlist.index("VAR1")]
variableN = row[headerlist.index("VARN")]
while calculations not complete:
do stuff #Some complex calculations are done at this point
outputcsv.write(stuff)
我们现在正在尝试使用 pyspark 将脚本转换为通过 Hadoop 运行。
我什至不知道如何开始。我正在尝试解决如何遍历 RDD 对象,但不认为它可以完成。
这样的逐行计算是否适合分布式处理?
最佳答案
如果你想直接运行脚本,你可以通过 spark-submit 来实现:
spark-submit master local[*]/yarn other_parameters path_to_your_script.py
但我建议使用 spark API,因为它们易于使用。它将降低编码开销。
首先,您必须创建一个 spark session 变量,以便您可以访问所有 spark 函数:
spark = SparkSession
.builder()
.appName("SparkSessionZipsExample")
.config("parameters", "value")
.getOrCreate()
接下来,如果要加载 csv 文件:
file = spark.read.csv("path to file")
您可以指定可选参数,例如 header 、推断模式等:
file=spark.read.option("header","true").csv("path to your file")
'file' 现在将是一个 pyspark 数据框。
您现在可以像这样编写最终输出:
file.write.csv("output_path")
请引用文档:spark documentation用于转换和其他信息。
关于python - 将 Python 脚本转换为能够在 Spark/Hadoop 中运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50253560/