python - How to take the first row as the header when reading a file in PySpark and converting it to a Pandas DataFrame

Tags: python pandas apache-spark pyspark apache-spark-sql

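I am reading a file in PySpark and forming an rdd from it. I then convert it to a normal dataframe and then to a pandas dataframe. The problem is that my input file has a header row, and I want to use it as the header of the dataframe columns as well, but it is read in as an extra row rather than as the header. Here is my current code: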

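def extract(line):
    return line


# zipWithIndex() yields (line, rownum) pairs; Python 3 lambdas cannot unpack tuples
# in their parameter list, so the pair is indexed instead.
# rownum >= 0 is true for every row, so the header (rownum 0) is kept.
input_file = (sc.textFile('file1.txt')
    .zipWithIndex()
    .filter(lambda pair: pair[1] >= 0)
    .map(lambda pair: pair[0]))

input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >= 0)  # always true: nothing is filtered out
    .map(extract))  # identity map; each row stays a list of strings

df_normal = input_data.toDF()
df = df_normal.toPandas()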

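Now, when I look at df, the header row of the text file has become the first row of the dataframe, and df has additional headers 0, 1, 2... as column names. How can I make the first row the header?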

Best answer

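There are a couple of ways to do this, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a data file, nyctaxisub.csv, that you can download.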

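If your file is in csv format, you should use the relevant spark-csv package provided by Databricks. There is no need to download it explicitly; just run pyspark as follows: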

$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0

and then

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

>>> df.count()
249999

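The file has 250,000 lines including the header, so 249,999 is the correct number of actual records. Here is the schema automatically inferred by the package: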

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]
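To make concrete what header='true' and inferSchema='true' do, here is a minimal pure-Python sketch. It is a simplified stand-in, not spark-csv's actual implementation: the first line supplies the column names and is not counted as a record, and each column is typed as int, else double, else string. The sample rows are the first records of nyctaxisub.csv trimmed to four columns; infer_type and read_with_header are hypothetical helper names. (On Spark 2.0 and later, CSV reading is built in, e.g. spark.read.csv(path, header=True, inferSchema=True).)

```python
import csv
import io

# First records of nyctaxisub.csv, trimmed to four columns for brevity.
SAMPLE = '''"_id","dropoff_latitude","passenger_count","vendor_id"
"29b3f4a30dea6688d4c289c9672cb996",+4.07033460000000E+001,2,"VTS"
"2a80cfaa425dcec0861e02ae44354500",+4.08190960000000E+001,1,"VTS"
'''


def infer_type(values):
    """Simplified inference: 'int' if every non-empty value parses as int,
    else 'double' if every one parses as float, else 'string'."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "string"
    for type_name, parse in (("int", int), ("double", float)):
        try:
            for v in non_empty:
                parse(v)
            return type_name
        except ValueError:
            continue
    return "string"


def read_with_header(text):
    """Emulate header='true' plus inferSchema='true' on an in-memory CSV string."""
    rows = list(csv.reader(io.StringIO(text)))
    header, records = rows[0], rows[1:]  # first line -> column names, not a record
    columns = list(zip(*records))        # transpose records into per-column tuples
    dtypes = [(name, infer_type(col)) for name, col in zip(header, columns)]
    return dtypes, records


dtypes, records = read_with_header(SAMPLE)
print(len(records))  # 2: the header line is not counted as a record
print(dtypes)
```

The column types it reports for these four columns agree with the dtypes listed above.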

You can see more details in my relevant blog post.

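If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first line from the data and then use it to construct your schema. Here is the general idea; again, you can find a full example with code details in another blog post of mine: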

>>> from pyspark.sql.types import StructType, StructField, StringType
>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']

# Construct the schema from the header 
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
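>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxiFields = taxiNoHeader.map(lambda l: [f.strip('"') for f in l.split(',')])  # split each line into its 16 fields and drop the quotes (assumes no commas inside quoted values)
>>> taxi_df = taxiFields.toDF(schema)  # Spark dataframe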
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe 
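The header can also be dropped by position rather than by content, which is what the question's zipWithIndex() pipeline was already set up to do: its filter only needs rownum > 0 instead of rownum >= 0. Below is a minimal sketch of that logic, with plain Python lists standing in for the RDD; drop_header_by_index and to_columns_and_rows are hypothetical helper names. In PySpark the equivalent would be roughly rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0]), followed by splitting the lines and calling toDF(columns). Unlike subtract(), this keeps the original line order and does not rely on the header containing the text "_id".

```python
import csv


def drop_header_by_index(lines):
    """Split lines into (header, data) by position, mimicking
    rdd.zipWithIndex().filter(lambda x: x[1] > 0).map(lambda x: x[0])."""
    # zipWithIndex() yields (element, index) pairs; build the same shape here
    indexed = [(line, idx) for idx, line in enumerate(lines)]
    header = indexed[0][0]
    # '> 0' drops the header at index 0; the question's '>= 0' keeps it
    data = [line for line, idx in indexed if idx > 0]
    return header, data


def to_columns_and_rows(lines, sep=","):
    """Return (column_names, rows) with the first line used as the header."""
    header, data = drop_header_by_index(lines)
    # csv.reader splits on sep and strips the surrounding double quotes
    columns = next(csv.reader([header], delimiter=sep))
    rows = list(csv.reader(data, delimiter=sep))
    return columns, rows


# The question's file is ';'-separated; these lines are made-up sample data.
sample = ["name;age", "Ann;30", "Bob;25"]
columns, rows = to_columns_and_rows(sample, sep=";")
print(columns)  # ['name', 'age']
print(rows)     # [['Ann', '30'], ['Bob', '25']]
```

With pandas, pd.DataFrame(rows, columns=columns) then yields a frame whose column names come from the file's first line.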

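For brevity, all columns here end up as type string, but in the blog post I show and explain in detail how to further refine the desired data types (and names) for specific fields.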
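As a rough illustration of that refinement step (not the blog post's code), the sketch below casts selected string fields using an explicit type map and renames one column. FIELD_TYPES, RENAMES, refine and the new name trip_time_s are hypothetical; the chosen types follow the dtypes that spark-csv inferred earlier. In PySpark itself this is usually done with Column.cast and DataFrame.withColumnRenamed.

```python
# Hypothetical target types and new names; the types follow the dtypes that
# spark-csv inferred for these columns.
FIELD_TYPES = {
    "passenger_count": int,
    "trip_distance": float,
    "trip_time_in_secs": int,
}
RENAMES = {"trip_time_in_secs": "trip_time_s"}


def refine(columns, rows):
    """Cast string fields per FIELD_TYPES (others stay str) and rename columns.
    Empty strings, such as the blank store_and_fwd_flag values, become None."""
    converters = [FIELD_TYPES.get(name, str) for name in columns]
    new_columns = [RENAMES.get(name, name) for name in columns]
    new_rows = [
        [None if value == "" else convert(value) for convert, value in zip(converters, row)]
        for row in rows
    ]
    return new_columns, new_rows


columns = ["trip_distance", "trip_time_in_secs", "store_and_fwd_flag", "vendor_id"]
rows = [["+4.08000000000000E+000", "900", "", "VTS"]]
print(refine(columns, rows))
# (['trip_distance', 'trip_time_s', 'store_and_fwd_flag', 'vendor_id'], [[4.08, 900, None, 'VTS']])
```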

For python - How to take the first row as the header when reading a file in PySpark and converting it to a Pandas DataFrame, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34832312/
