python - PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?

标签 python apache-spark bigdata pyspark rdd

我有一个非常大的 pyspark.sql.dataframe.DataFrame 名为 df。 我需要一些枚举记录的方法——因此,能够访问具有特定索引的记录。 (或选择具有索引范围的记录组)

在 Pandas 中,我可以做到

indexes=[2,3,6,7] 
df[indexes]

我想要类似的东西,(并且没有将数据框转换为 pandas)

我能得到的最接近的是:

  • 通过以下方式枚举原始数据框中的所有对象:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)
    
    • 使用 where() 函数搜索我需要的值。

问题:

  1. 为什么它不起作用以及如何让它起作用?如何向数据框添加一行?
  2. 以后做这样的东西行吗:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    
  3. 有没有更快更简单的处理方法?

最佳答案

它不起作用,因为:

  1. withColumn 的第二个参数应该是一个Column 而不是一个集合。 np.array 在这里不起作用
  2. 当您将 "index in indexes" 作为 SQL 表达式传递给 whereindexes 超出范围并且未解析为有效标识符

PySpark >= 1.4.0

您可以使用相应的窗口函数添加行号,并使用 Column.isin 方法或格式正确的查询字符串进行查询:

from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

看起来没有 PARTITION BY 子句调用的窗口函数将所有数据移动到单个分区,所以上面可能不是最好的解决方案。

Any faster and simpler way to deal with it?

不是真的。 Spark DataFrame 不支持随机行访问。

PairedRDD 可以使用 lookup 方法访问,如果使用 HashPartitioner 对数据进行分区,该方法相对较快。还有indexed-rdd支持高效查找的项目。

编辑:

独立于 PySpark 版本你可以尝试这样的事情:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but should work and save some work later
schema  = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd # Extract rdd
    .zipWithIndex() # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
    .toDF(schema)) # It will work without schema but will be more expensive

# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))

关于python - PySpark DataFrames - 在不转换为 Pandas 的情况下进行枚举的方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32760888/

相关文章:

python - 为什么即使使用 limit 命令访问结果,SPARK\PYSPARK 也会计算所有内容?

scala - 无法使用 Scala 在 Apache Spark 中执行用户定义的函数

hadoop - 我是否需要为 Hadoop 集群的所有主机使用相同的配置?

python - mypy AnyStr 在简单示例的赋值中给出了不兼容的类型

python - 使用 JRuby/Jython 实现 Ruby/Python 互操作性?

scala - 如何从 HDFS 检索 Avro 数据?

hadoop - nutch1.14去重失败

python - 在Python中从第n行读取大型CSV文件(不是从头开始)

python - 如何逐列迭代 pandas 数据框,一次返回一个项目

python - 如何在 post 查询中传递 python 列表?