我正在 Databricks 工作,我正在尝试从按序列排序的 S3 实例中获取 Parquet 数据,但序列的某些部分丢失了。因此文件系统可能如下所示:
's3a://databricks-data/STAGING/18',
's3a://databricks-data/STAGING/17',
's3a://databricks-data/STAGING/16',
's3a://databricks-data/STAGING/15',
's3a://databricks-data/STAGING/14',
's3a://databricks-data/STAGING/13',
's3a://databricks-data/STAGING/12',
's3a://databricks-data/STAGING/10',
's3a://databricks-data/STAGING/09',
's3a://databricks-data/STAGING/08',
's3a://databricks-data/STAGING/07'
您会注意到 11
丢失了,这就是问题所在。偶尔会有一些文件夹丢失,并且它不是系统性的或可预测的。因此,来自 Python 背景的我想我可以根据序列创建一个文件夹列表,然后测试这样的文件夹是否存在,如果存在,则读取数据,如果不存在,则跳过。像这样的事情:
paths = ["s3a://databricks-data/STAGING/" + str(ii) for ii in range(100)]
paths = [p for p in paths if p.exists()] #**this check -- "p.exists()" -- is what I'm looking for**
df = spark.read.parquet(*paths)
有谁知道如何检查 Databricks 中是否存在文件夹/目录?我尝试过使用 dbutils,但要么我正在使用不同的版本,要么我不知道我在做什么(也许两者都是),因为没有任何效果。我对 Databricks 还很陌生,所以请告诉我是否可以澄清这个问题。
最佳答案
您可以使用dbutils.fs.ls
来实现这样的功能:
def path_accessible(path):
try:
dbutils.fs.ls(path)
return True
except:
pass
return False
然后使用它:
paths = [p for p in paths if path_accessible(p)]
请注意,它会检查路径是否可访问 - 您可能由于凭据无效等而无法访问 - 您可以通过添加相应的 except
子句来改进它。
附注如果您这样做是为了加载数据增量,我建议考虑使用 Auto Loader 进行结构化流处理(即使在 Trigger.Once 模式下)。 - 如果目录中有很多文件,从性能角度来看可能会更好。
关于amazon-s3 - 检查 Databricks 笔记本中是否存在 S3 目录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66429888/