pyspark - 如何使用 Databrick 截断和/或使用通配符

标签 pyspark pyspark-sql databricks azure-databricks

我正在尝试在数据 block 中编写一个脚本,该脚本将根据文件名中的某些字符或仅根据文件中的日期戳来选择文件。

例如,以下文件如下所示:

LCMS_MRD_Delta_LoyaltyAccount_1992_2018-12-22 06-07-31

我在 Databricks 中创建了以下代码:

import datetime
now1 = datetime.datetime.now()
now = now1.strftime("%Y-%m-%d")

使用上面的代码,我尝试使用以下方法选择文件:

LCMS_MRD_Delta_LoyaltyAccount_1992_%s.csv'% now

但是,如果你仔细观察,你会发现 datestamp 和时间戳之间有一个空格,即 22 和 06 之间

LCMS_MRD_Delta_LoyaltyAccount_1992_2018-12-22 06-07-31

这是因为如果这个空间阻止我上面的代码工作。

我认为 Databricks 不支持通配符,因此以下内容不起作用:

LCMS_MRD_Delta_LoyaltyAccount_1992_%s.csv'% now

有人曾经建议截断时间戳。

有人可以让我知道如果:

A.TRUNCATING 会解决这个问题 B.我的代码 LCMS_MRD_Delta_LoyaltyAccount_1992_%s.csv'% now 有办法吗

要选择整个文件?请记住,我绝对需要根据当前日期进行选择。我只想能够使用我的代码来选择文件。

最佳答案

您可以使用 dbutils 读取文件名,并可以检查 if 语句中的模式是否匹配:if now in filname。因此,您无需直接读取具有特定模式的文件,而是获取文件列表,然后复制与所需模式匹配的具体文件。

以下代码适用于 databricks python 笔记本:

1。将三个文件写入文件系统:

data = """
{"a":1, "b":2, "c":3}
{"a":{, b:3} 
{"a":5, "b":6, "c":7}

"""

dbutils.fs.put("/mnt/adls2/demo/files/file1-2018-12-22 06-07-31.json", data, True)
dbutils.fs.put("/mnt/adls2/demo/files/file2-2018-02-03 06-07-31.json", data, True)
dbutils.fs.put("/mnt/adls2/demo/files/file3-2019-01-03 06-07-31.json", data, True)

2。以列表形式读取文件名:

files = dbutils.fs.ls("/mnt/adls2/demo/files/")

3。获取实际日期:

import datetime

now = datetime.datetime.now().strftime("%Y-%m-%d")
print(now)

输出:2019-01-03

4。复制实际文件:

for i in range (0, len(files)):
  file = files[i].name
  if now in file:  
    dbutils.fs.cp(files[i].path,'/mnt/adls2/demo/target/' + file)
    print ('copied     ' + file)
  else:
    print ('not copied ' + file)

输出:

未复制文件 1-2018-12-22 06-07-31.json

未复制文件 2-2018-02-03 06-07-31.json

复制文件3-2019-01-03 06-07-31.json

关于pyspark - 如何使用 Databrick 截断和/或使用通配符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54007074/

相关文章:

python - 在基于 Spark Dataframe 的 API 中过滤多列

python - 使用map或split(pyspark)后如何查看RDD的内容?

apache-spark - 创建DataFrame时Spark报错

azure - 在 azure devops 管道中找不到 Databricks 命令

sql - 将数据(直接查询模式)从 Databricks SQL 查询引入 Power BI

派斯帕克 : How to split pipe-separated column into multiple rows?

python - 如何根据 PySpark 中的条件修改行子集

excel - inferSchema using spark.read.format ("com.crealytics.spark.excel")正在为日期类型列推断 double

python - 与Pyspark合并

databricks - 以编程方式将库导入到 Databricks 中的工作区