python - 在 Pyspark rdd 中更改 saveAsTextFile 选项中的分隔符

标签 python hadoop apache-spark pyspark rdd

我的数据集在 HDFS 中可用。我正在阅读它并执行过滤操作。

dir = sc.textFile('/datasets/DelayedFlights.csv').filter(lambda x: 
int(x.split(',')[24]) == 1).map(lambda y: y.split(','))
The output of above operation is
[u'1763', u'2008', u'1', u'3', u'4', u'922.0', u'915', u'', u'1050', u'WN', 
u'1069', u'N630WN', u'', u'95.0', u'', u'', u'7.0', u'SAN', u'SMF', u'480', 
u'', u'12.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']
[u'1911', u'2008', u'1', u'3', u'4', u'2325.0', u'1900', u'', u'2030', 
u'WN', u'2092', u'N302SW', u'', u'90.0', u'', u'', u'265.0', u'SFO', u'SAN', 
u'447', u'', u'11.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']
[u'2651', u'2008', u'1', u'4', u'5', u'1949.0', u'1905', u'', u'1910', 
u'WN', u'1403', u'N504SW', u'', u'65.0', u'', u'', u'44.0', u'BOI', u'RNO', 
u'335', u'', u'11.0', u'0', u'N', u'1', u'', u'', u'', u'', u'']

我想使用带有制表符分隔符的 saveAsTextFile 将上述文件保存到 HDFS 路径 谁能告诉我如何在 python 中将分隔符从逗号更改为制表符

最佳答案

实现此目的的一种方法是将 RDD 转换为数据帧,并使用 csv 格式保存数据帧,并将分隔符选项设置为制表符,如下所示。

rdd = spark.sparkContext.parallelize([['1763', '2008', '1', '3', '4', '922.0'], ['1763', '2008', '1', '3', '4', '922.0'], ['1763', '2008', '1', '3', '4', '922.0']])
df = spark.createDataFrame(rdd.map(lambda x: tuple(x)))
df.write.format('com.databricks.spark.csv').option("delimiter", '\t').save('/path/to/csv/file/')

如果您不想将 rdd 转换为数据框,请遵循以下代码段。

rdd.map(lambda x: '\t'.join(x)).saveAsTextFile('test_dir/output')

与上述方法相比,推荐使用 DataFrame 方法。

关于python - 在 Pyspark rdd 中更改 saveAsTextFile 选项中的分隔符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44356885/

相关文章:

Python 通过 SSH 身份验证克隆私有(private) GitHub 存储库,无需访问 ssh 二进制文件(在 Azure Functions 下)

scala - 在 spark-streaming 上下文中将 RDD 写入 HDFS

scala - 如何将 Spark 行(StructType)转换为 scala 案例类

java - 如何在触发时在java代码中为hadoop作业设置优先级?

hadoop - 从HDFS将Jar文件添加到Hive

apache-spark - 如何将 Spark 指标发送到独立集群上的 Graphite?

python - PySpark 将 DataFrame 保存到实际的 JSON 文件

python - pandas dataframe groupby 索引并将行值转换为列

javascript - 将向量绘制为直线

python - 在 recordlinkage 库中检索匹配的记录 ID