python - Pyspark 读取 csv 并组合日期和时间列并基于它进行过滤

标签 python csv apache-spark filter pyspark

我有大约 10,000 个 csv 文件,每个文件包含 14 列。它们包含有关金融组织、交易值(value)、日期和时间的数据。

一些 csv 文件只是标题,其中没有数据。我设法加载本地 hadoop 文件系统上的所有 csv 文件。我想要实现的是过滤数据以包含仅发生在上午 9 点到下午 6 点之间的记录。

我该如何实现这一目标?我对 lambda 和过滤器感到非常困惑,所有这些东西都存在于 Spark-Python 中。

您能告诉我如何过滤此数据并使用过滤后的数据进行其他分析吗?

P.S,还需要考虑冬季时间和夏季时间,我想我应该有一些功能来将时间更改为 UTC 格式?

由于我关心的是根据 csv 文件中的“时间”列过滤数据,因此我简化了 csv。让我们说:

CSV 1:(过滤器.csv)

  • ISIN、货币、日期、时间
  • "1","欧元",2018-05-08,07:00
  • "2","欧元",2018-05-08,17:00
  • "3","欧元",2018-05-08,06:59
  • "4","欧元",2018-05-08,17:01

CSV 2:(NoFilter.csv)

  • ISIN、货币、日期、时间
  • "1","欧元",2018-05-08,07:01
  • "2","欧元",2018-05-08,16:59
  • "3","欧元",2018-05-08,10:59
  • "4","欧元",2018-05-08,15:01

我的代码是:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

sqlc = SQLContext(sc)

ehsanLocationFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/Filter.csv'
ehsanLocationNonFiltered = 'hdfs://localhost:54310/user/oxclo/ehsanDbs/NoFilter.csv'

df = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationNonFiltered)

dfFilter = sqlContext.read.format('com.databricks.spark.csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered)

data = df.rdd
dataFilter = dfFilter.rdd

data.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')
dataFilter.filter(lambda row: row.Time > '07:00' and row.Time < '17:00')

print data.count()
print dataFilter.count()

我期望看到 data.count 返回 4,因为所有时间都符合范围,而 dataFilter.count 返回 0,因为没有匹配时间。

谢谢!

最佳答案

在您的代码中,您只能使用'csv'作为格式

from pyspark import SparkContext, SparkConf
ehsanLocationFiltered = '/FileStore/tables/stackoverflow.csv'
df = sqlContext.read.format('csv')\
.options(header='true', inferschema='true')\
.load(ehsanLocationFiltered).rdd
result=data.map(lambda row: row.Time > '07:00' and row.Time < '17:00')
result.count()

关于python - Pyspark 读取 csv 并组合日期和时间列并基于它进行过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52111009/

相关文章:

python - 将屏幕与 kivy 中的 GridLayout 类相关联

Python 闭包函数丢失外部变量访问

python - SPSS/Python - 访问变量标签

csv - AWK 拆分带有标题的大型 CSV 文件并根据列值打印输出文件

scala - 如何在 Apache Spark 中编码分类特征

python - 如何将函数(BigramCollocationFinder)应用于 Pandas DataFrame

regex - 使用 Notepad++ 正则表达式格式化电话号码

r - R 中 read.csv() 和 read.csv2() 之间的区别

java - 从数据框中的列中删除特殊字符

java - 调整 Yarn 中的 Spark 作业