我的代码首先使用正则表达式提取数据并将该数据写入文本文件(字符串格式)。 然后,我尝试根据文本文件中的内容创建一个数据框,以便我可以拥有单独的列,从而导致错误。 (将其写入 csv 文件会将整个内容写入一列)。
with open("C:\\Sample logs\\dataframe.txt",'a') as f:
f.write(str(time))
f.write(" ")
f.write(qtype)
f.write(" ")
f.write(rtype)
f.write(" ")
f.write(domain)
f.write("\n")
new = sc.textFile("C:\\Sample logs\\dataframe.txt").cache() # cause df requires an rdd
lines1 = new.map(lambda x: (x, ))
df = sqlContext.createDataFrame(lines1)
但是我收到以下错误:
TypeError: Can not infer schema for type: type 'unicode'
我尝试了一些其他方法,但没有帮助。我想要做的就是在执行写入操作后,我想创建一个具有单独列的数据帧以便使用 groupBy()。
文本文件中的输入:
1472128348.0 HTTP - tr.vwt.gsf.asfh
1472237494.63 HTTP - tr.sdf.sff.sdfg
1473297794.26 HTTP - tr.asfr.gdfg.sdf
1474589345.0 HTTP - tr.sdgf.gdfg.gdfg
1472038475.0 HTTP - tr.sdf.csgn.sdf
预期的 csv 格式输出:
The same thing as above but separated into columns so i can perform groupby operations.
最佳答案
为了将“空格分隔的单词”替换为您需要替换的单词列表:
lines1 = new.map(lambda x: (x, ))
与
lines1 = new.map(lambda line: line.split(' '))
我在我的机器上尝试过,执行以下命令后
df = sqlContext.createDataFrame(lines1)
创建了一个新的 DF:
df.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
|-- _4: string (nullable = true)
df.show()
+-------------+----+---+-----------------+
| _1| _2| _3| _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
您可以执行groupBy:
>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>>
为了使用架构,您需要首先定义它: 请参阅:https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
下面可以找到架构示例(您需要添加字段、更新名称、键入以便将其应用到您的案例中)
from pyspark.sql.types import *
schema = StructType([
StructField("F1", StringType(), True),
StructField("F2", StringType(), True),
StructField("F3", StringType(), True),
StructField("F4", StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
之后您将能够使用架构运行它:
df = sqlContext.createDataFrame(lines1,schema)
现在,您将获得字段的名称:
df.show()
+-------------+----+---+-----------------+
| F1| F2| F3| F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
为了将其保存为 CSV,您需要使用“to_pandas()”和“to_csv()” (Python pandas 的一部分)
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
df.toPandas().to_csv('mycsv.csv')
csv文件的内容:
cat mycsv.csv
,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf
请注意,您可以使用“.cast()”来转换列,例如将 F1 转换为 float 类型 - 添加一个 float 类型的新列,并删除旧列)
df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
关于python - 如何将spark数据输出到具有单独列的csv文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39270584/