apache-spark - Spark 加载数据并将文件名添加为数据框列

标签 apache-spark pyspark apache-spark-sql

我正在使用包装函数将一些数据加载到 Spark 中:

def load_data( filename ):
    df = sqlContext.read.format("com.databricks.spark.csv")\
        .option("delimiter", "\t")\
        .option("header", "false")\
        .option("mode", "DROPMALFORMED")\
        .load(filename)
    # add the filename base as hostname
    ( hostname, _ ) = os.path.splitext( os.path.basename(filename) )
    ( hostname, _ ) = os.path.splitext( hostname )
    df = df.withColumn('hostname', lit(hostname))
    return df

具体来说,我使用 glob 一次加载一堆文件:

df = load_data( '/scratch/*.txt.gz' )

这些文件是:

/scratch/host1.txt.gz
/scratch/host2.txt.gz
...

我希望“主机名”列实际上包含正在加载的文件的真实名称而不是全局名称(即 host1host2 等,而不是 * )。

我怎样才能做到这一点?

最佳答案

您可以使用 input_file_name哪一个:

Creates a string column for the file name of the current Spark task.



from  pyspark.sql.functions import input_file_name

df.withColumn("filename", input_file_name())

在 Scala 中同样的事情:

import org.apache.spark.sql.functions.input_file_name

df.withColumn("filename", input_file_name)

关于apache-spark - Spark 加载数据并将文件名添加为数据框列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39868263/

相关文章:

scala - 从数据帧列读取路径并从数据帧添加另一列

apache-spark - 使用 Dataframes 从 Informix 到 Spark 的 JDBC

apache-spark - 是作为执行引擎还是应用程序?

python - PySpark DataFrames - 使用不同类型的列之间的比较进行过滤

apache-spark - PySpark-列中的to_date格式

java - 重新加载的 Spark 模型似乎不起作用

java - 在 Spark 1.6.0 上,获取与 spark.driver.maxResultSize 相关的 org.apache.spark.SparkException

apache-spark - 无法将考拉系列指定为考拉中的新列

python - Dataframe 加入空安全条件使用

python - 如何在不更改 log4j.properties 的情况下关闭 PySpark 中日志的信息?