python - 使用 pyspark 将 zip 压缩的 csv 转换为 parquet

标签 python csv apache-spark pyspark parquet

我在 S3 上存储了一个 zip 压缩的 csv。我想将此文件转换为 parquet 格式,并在 csv 中的特定列上分区。当我尝试以下操作时(使用 Python 3.6.5 和 Pyspark 2.7.14):

from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder.appName("Python Spark SQL basic example").config('spark.hadoop.fs.s3a.access.key','<my access key>').config('spark.hadoop.fs.s3a.secret.key','<my secret key>').getOrCreate()

df = spark.read.csv("s3a://mybucket/path/myfile.zip")
df.show(n=10)

这是输出:

+--------------------+                                                          
|                 _c0|
+--------------------+
|PK-    4*PM<ȏ...|
|����W����lq��...|
|jk�ulE����
           Uձ�...|
|U횵Сc�=t�kd�0z...|
|T�;t��gն>t�:�y...|
|ݵK!뼠PT���DЉ*�...|
|�}�B��h)t����H!k?...|
|              ��y�B֧|
|��� �1�NTȞB(�...|
+--------------------+
only showing top 10 rows

当我使用以下方法转换为 Parquet 时:

df.write.partitionBy("column").parquet("s3a://otherbucket/path/myfile_partitioned",mode="overwrite")

S3 中的结果与源文件中的实际列值不匹配。

我也尝试过使用:

sqlctx = SQLContext(spark)
df = sqlctx.read.csv("s3a://cta-ridership/seeds/nm45dayall_2.zip")

但是结果是一样的。我的 csv 有问题吗?我是 Pyspark 的新手,所以我可能会错过一些基本的东西。

更新:根据@Prazy 的帮助,我已将代码更新为:

spark = SparkSession.builder.appName("Python Spark SQL basic example").config('spark.hadoop.fs.s3a.access.key','<my key>').config('spark.hadoop.fs.s3a.secret.key','<my secret key>').getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("s3a://mybucket/mypath/myfile.zip")
print(rdd.take(10))

但这仍然返回:

['PK\x03\x04-\x00\x00\x00\t\x004*PM<ȏ\x1f��������\x13\x00\x14\x00nm45dayall_2017.csv\x01\x00\x10\x00� �\x07\x00\x00\x00�_J�\x00\x00\x00\x00����㺳���;�7Q�;�"%R�a�{;ܶg��3��9���\x0b\x17I��<Y��of��ڿU��\x19�\x01A\x10\x04\x12�?��\x7f�\x1f������/����������\x7f��\x7f�����?���������?��\x7f�������\x7f����?�����\x7f���������������������\x1f��\x7f����_����\x7f�\x7f��n\x7f������?�����?��������_�\x7f\x1b����\x7f���������g�\\�i�]���\x7f�����3���������ǟ��<_����}���_�������?�\x7f�n�1h��t��.5�Z\x05ͺk��1Zߎ��\x16���ڿE�A��\x1fE�<UAs\x11���z�\\�n��s��\x1ei�XiĐ\x1ej\x0cݪ������`�yH�5��8Rh-+�K�\x11z�N�)[�v̪}', "���\x10�W�\x07���\x12l\x10q��� �qq,i�6ni'��\x10\x14�h\x08��V\x04]��[P�!h�ڢ���GwF\x04=�.���@��>����h", 'jk�\x1culE\x15����\x0cUձ\x7f���#\x1d��\x10Tu���o����\x0eݎ\x16�E\x0f\x11r�q\x08Ce[�\x0c\x0e�s\x10z�?Th\x1aj��O\x1f�\x0f�\x10A��X�<�HC�Y�=~;���!', 'U횵Сc�=t�k\x15d�0\x14z\x16\x1d��R\x05M��', 'T�;t��\x10\x11gն>t�\x01:�y:�c�U��\x1d\x7ff�Т�a', 'ݵ\x19K!뼠PT�\x11��DЉ*\x10\u2d2e�<d� Й��\x08AQ\x03\x04AQ�� {��P����\x1e��Z\x7f���AG�3�b\x19T�E�%;�"ޡ�El�rס�}��qg���qg|������7�8�k\x1e:j�\x7f���c�Bv���\\t�[�ܚ�nz��PU���(\x14��\x08�����CϢc�=|\x14���Ⱥ', ')d]�\x10Z�o\x0e:�v����\x0er�oѣj��\x06DA%b�>', '�}�B��h)t����H!k?R�zf)���5k�B��h?�h���Ao}�S��\x17i\x14�\x1eU', '��y�B֧', '��\x16� �1�NT\x1b1ȞB(�\x16�k\x7f�B!�d��m\x0c:�\x03��˵\x1f�����ޥa�\x16@� ���V"Ա�k']

更新 再次感谢 Prazy 的帮助。我正在尝试使用以下方法将 RDD 转换为数据帧:

spark = SparkSession.builder.appName("Python Spark SQL basic example").config('spark.hadoop.fs.s3a.access.key','<mykey>').config('spark.hadoop.fs.s3a.secret.key','<myotherkey>').getOrCreate()

sc = spark.sparkContext
schema = StructType([
StructField("YYYYMMDD", IntegerType(), True),
StructField("ENTRANCE_ID", IntegerType(), True),
StructField("FARE_MEDIA_TYPE", IntegerType(), True),
StructField("TRANS_EVENT", IntegerType(), True),
StructField("HALFHOUR", FloatType(), True),
StructField("RIDES", IntegerType(), True)])

rdd = sc.textFile("s3a://mybucket/path/myfile.zip")
df = sqlctx.createDataFrame(rdd, schema)

df.show(n=10)

最佳答案

不直接支持 zip 文件。您可以点击链接herehere尝试解决方法。如果可能,使用 gzip 和其他支持的格式

关于python - 使用 pyspark 将 zip 压缩的 csv 转换为 parquet,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52879669/

相关文章:

sql - 使用Spark SQL从SQL Server读取数据

python - 如何使用 SparkSession 从列表创建数据框?

python - 如何在给定的上下文(本地和全局)中执行可调用?

Python3 XML 获取标签之间的文本

python - 将列表组合成一个列表,其中每个元素都是各个元素的总和

c - 如何在 C 中编辑 .csv 文件

python - optparse csv.reader

python - 在 python 中创建文件,指示整数是否在另一个列表中

java - 使用与时间相对应的 Csv 文件获取奇怪的输出

hadoop - Spark SQL saveAsTable 返回空结果