python-2.7 - PySpark - 从文本文件创建数据框

标签 python-2.7 apache-spark apache-spark-sql spark-dataframe pyspark-sql

我有一个简单的文本文件,其中包含“交易”。

第一行是列名,例如“START_TIME”、“END_TIME”、“SIZE”……大约 100 个列名。

文件中的列名没有引号。

我想使用 Spark,将此文件转换为带有列名的数据框,

然后从文件中删除所有列,但删除一些特定的列。

我在将文本文件转换为数据框时遇到了一些麻烦。

到目前为止,这是我的代码:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Load relevant objects
sc = SparkContext('local')
log_txt = sc.textFile("/path/to/text/file.txt")
sqlContext = SQLContext(sc)

# Construct fields with names from the header, for creating a DataFrame
header = log_txt.first()
fields = [StructField(field_name, StringType(), True)
      for field_name in header.split(',')]

# Only columns\fields 2,3,13,92 are relevant. set them to relevant types
fields[2].dataType = TimestampType()    # START_TIME in yyyymmddhhmmss format
fields[3].dataType = TimestampType()    # END_TIME in yyyymmddhhmmss
fields[13].dataType = IntegerType()     # DOWNSTREAM_SIZE, in bytes
fields[92].dataType = BooleanType()     # IS_CELL_CONGESTED, 0 or 1
schema = StructType(fields)             # Create a schema object

# Build the DataFrame
log_txt = log_txt.filter(lambda line: line != header) # Remove header from the txt file
temp_var = log_txt.map(lambda k: k.split("\t"))

log_df = sqlContext.createDataFrame(temp_var, schema) # PROBLEMATIC LINE

我的问题是最后一行,我担心在最后一步之前我错过了一些步骤。

你能帮我确定缺少哪些步骤吗?

最后一行代码产生了很多错误。
如果需要,将在帖子中更新它们。

文件格式为(2 行示例)
TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME,.... <more names>
http://www.google.com<\t seperator>0<\t seperator>20160609182001<\t seperator>20160609182500.... <more values>
http://www.cnet.com<\t seperator>0<\t seperator>20160609192001<\t seperator>20160609192500.... <more values>

另外,有人可以帮我从数据框中删除不需要的列吗?

谢谢

最佳答案

我觉得你有点想多了。
想象一下我们有一些不那么复杂的东西,下面的例子

`cat sample_data.txt`
field1\tfield2\tfield3\tfield4
0\tdog\t20160906182001\tgoogle.com
1\tcat\t20151231120504\tamazon.com

打开 pyspark
sc.setLogLevel("WARN")
#setup the same way you have it
log_txt=sc.textFile("/path/to/data/sample_data.txt")
header = log_txt.first()

#filter out the header, make sure the rest looks correct
log_txt = log_txt.filter(lambda line: line != header)
log_txt.take(10)
  [u'0\\tdog\\t20160906182001\\tgoogle.com', u'1\\tcat\\t20151231120504\\tamazon.com']

temp_var = log_txt.map(lambda k: k.split("\\t"))

#here's where the changes take place
#this creates a dataframe using whatever pyspark feels like using (I think string is the default). the header.split is providing the names of the columns
log_df=temp_var.toDF(header.split("\\t"))
log_df.show()
+------+------+--------------+----------+
|field1|field2|        field3|    field4|
+------+------+--------------+----------+
|     0|   dog|20160906182001|google.com|
|     1|   cat|20151231120504|amazon.com|
+------+------+--------------+----------+
#note log_df.schema
#StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true)))

# now lets cast the columns that we actually care about to dtypes we want
log_df = log_df.withColumn("field1Int", log_df["field1"].cast(IntegerType()))
log_df = log_df.withColumn("field3TimeStamp", log_df["field1"].cast(TimestampType()))

log_df.show()
+------+------+--------------+----------+---------+---------------+
|field1|field2|        field3|    field4|field1Int|field3TimeStamp|
+------+------+--------------+----------+---------+---------------+
|     0|   dog|20160906182001|google.com|        0|           null|
|     1|   cat|20151231120504|amazon.com|        1|           null|
+------+------+--------------+----------+---------+---------------+
log_df.schema
StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true),StructField(field1Int,IntegerType,true),StructField(field3TimeStamp,TimestampType,true)))

#now let's filter out the columns we want
log_df.select(["field1Int","field3TimeStamp","field4"]).show()
+---------+---------------+----------+
|field1Int|field3TimeStamp|    field4|
+---------+---------------+----------+
|        0|           null|google.com|
|        1|           null|amazon.com|
+---------+---------------+----------+

数据框需要为它遇到的每个字段都有一个类型,是否实际使用该字段取决于您。
您必须使用 spark.SQL 函数之一将字符串日期转换为实际时间戳,但不应该太难。

希望这有帮助

PS:对于您的具体情况,要制作初始数据框,请尝试:log_df=temp_var.toDF(header.split(','))

关于python-2.7 - PySpark - 从文本文件创建数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41144218/

相关文章:

python - 将二进制转换为十进制整数输出

apache-spark - Python pyspark array_contains 不区分大小写

java - 用 Java Lambda 编写的 Spark UDF 引发 ClassCastException

scala - Spark union 因嵌套 JSON 数据帧而失败

dataframe - 逐行计算pyspark数据帧中的空数

python-2.7 - 在 Windows 8 上安装 Python 2.7

python - 字典中的键和 dict.keys() 中的键有什么区别?

python-2.7 - 如何使用 cpppo 写入 Compactlogix plc?

java - 通过 Spark 进行 Cassandra 数据聚合

python - PySpark jdbc谓词错误: Py4JError: An error occurred while calling o108. jdbc