csv - 将唯一的连续行号添加到 pyspark 中的数据框

标签 csv dataframe pyspark rdd

我想在 pyspark 中将唯一行号添加到我的数据框中,并且不想使用 monotonicallyIncreasingId 和 partitionBy 方法。 我认为这个问题可能与之前提出的类似问题重复,无论我是否以正确的方式做,仍在寻找一些建议。 以下是我的代码片段: 我有一个包含以下输入记录集的 csv 文件:

1,VIKRANT SINGH RANA    ,NOIDA   ,10000
3,GOVIND NIMBHAL        ,DWARKA  ,92000
2,RAGHVENDRA KUMAR GUPTA,GURGAON ,50000
4,ABHIJAN SINHA         ,SAKET   ,65000
5,SUPER DEVELOPER       ,USA     ,50000
6,RAJAT TYAGI           ,UP      ,65000
7,AJAY SHARMA           ,NOIDA   ,70000
8,SIDDHARTH BASU        ,SAKET   ,72000
9,ROBERT                ,GURGAON ,70000

我已将此 csv 文件加载到数据框中。

PATH_TO_FILE="file:///u/user/vikrant/testdata/EMP_FILE.csv"

emp_df = spark.read.format("com.databricks.spark.csv") \
  .option("mode", "DROPMALFORMED") \
  .option("header", "true") \
  .option("inferschema", "true") \
  .option("delimiter", ",").load(PATH_TO_FILE)

+------+--------------------+--------+----------+
|emp_id|            emp_name|emp_city|emp_salary|
+------+--------------------+--------+----------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|
|     5|SUPER DEVELOPER  ...|USA     |     50000|
|     6|RAJAT TYAGI      ...|UP      |     65000|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|
|     9|ROBERT           ...|GURGAON |     70000|
+------+--------------------+--------+----------+

empRDD = emp_df.rdd.zipWithIndex()
newRDD=empRDD.map(lambda x: (list(x[0]) + [x[1]]))
 newRDD.take(2);
[[1, u'VIKRANT SINGH RANA    ', u'NOIDA   ', 10000, 0], [3, u'GOVIND NIMBHAL        ', u'DWARKA  ', 92000, 1]]

当我将 int 值包含到我的列表中时,我丢失了数据框架构。

newdf=newRDD.toDF(['emp_id','emp_name','emp_city','emp_salary','row_id'])
newdf.show();

+------+--------------------+--------+----------+------+
|emp_id|            emp_name|emp_city|emp_salary|row_id|
+------+--------------------+--------+----------+------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|     0|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|     1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|     2|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|     3|
|     5|SUPER DEVELOPER  ...|USA     |     50000|     4|
|     6|RAJAT TYAGI      ...|UP      |     65000|     5|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|     6|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|     7|
|     9|ROBERT           ...|GURGAON |     70000|     8|
+------+--------------------+--------+----------+------+

我做的对吗?或者有没有更好的方法在 pyspark 中添加或保留数据框的架构?

是否也可以使用 zipWithIndex 方法为大型数据框添加唯一的连续行号?我们可以使用此 row_id 重新分区数据帧以在分区之间均匀分布数据吗?

最佳答案

我找到了一个解决方案,而且非常简单。 因为我的数据框中没有在所有行中具有相同值的列,所以在将 row_number 与 partitionBy 子句一起使用时不会生成唯一的行号。

让我们向现有数据框中添加一个新列,其中包含一些默认值。

emp_df= emp_df.withColumn("new_column",lit("ABC"))

并使用该列“new_column”创建一个带有 paritionBy 的窗口函数

w = Window().partitionBy('new_column').orderBy(lit('A'))
df = emp_df.withColumn("row_num", row_number().over(w)).drop("new_column")

你会得到想要的结果:

+------+--------------------+--------+----------+-------+
|emp_id|            emp_name|emp_city|emp_salary|row_num|
+------+--------------------+--------+----------+-------+
|     1|VIKRANT SINGH RAN...|NOIDA   |     10000|      1|
|     2|RAGHVENDRA KUMAR ...|GURGAON |     50000|      2|
|     7|AJAY SHARMA      ...|NOIDA   |     70000|      3|
|     9|ROBERT           ...|GURGAON |     70000|      4|
|     4|ABHIJAN SINHA    ...|SAKET   |     65000|      5|
|     8|SIDDHARTH BASU   ...|SAKET   |     72000|      6|
|     5|SUPER DEVELOPER  ...|USA     |     50000|      7|
|     3|GOVIND NIMBHAL   ...|DWARKA  |     92000|      8|
|     6|RAJAT TYAGI      ...|UP      |     65000|      9|
+------+--------------------+--------+----------+-------+

关于csv - 将唯一的连续行号添加到 pyspark 中的数据框,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53082891/

相关文章:

pyspark - kafka到pyspark结构化流,将json解析为数据帧

javascript - 如何从 csv 文件中提取特定对象?

pandas 中的正则表达式根据另一列中的字符串查找匹配项

r - 如何将最新的 csv 文件导入 RStudio

c# - C# 中的 Deedle 数据帧按行切片

python - Pandas - Groupby 条件公式

python-3.x - Py4JJavaError : An error occurred while calling o26. Parquet 。 (阅读 Parquet 文件)

python - 如何根据列索引列表从pyspark中的csv文件中选择某些列,然后确定它们的不同长度

string - 如何从matlab中的表格单元格中提取字符串

linux - CSV Bash 循环变量问题