我想在来自 Google Cloud Storage 的多个输入上运行 Dataflow 作业,但我想传递给作业的路径不能仅使用 *
glob 运算符指定。
考虑这些路径:
gs://bucket/some/path/20160208/input1
gs://bucket/some/path/20160208/input2
gs://bucket/some/path/20160209/input1
gs://bucket/some/path/20160209/input2
gs://bucket/some/path/20160210/input1
gs://bucket/some/path/20160210/input2
gs://bucket/some/path/20160211/input1
gs://bucket/some/path/20160211/input2
gs://bucket/some/path/20160212/input1
gs://bucket/some/path/20160212/input2
我希望我的工作处理 20160209
、20160210
和 20160211
目录中的文件,但不处理 20160208
(第一个)和 20160212
(最后一个)。实际上还有更多的日期,我希望能够为我的工作指定一个任意范围的日期。
Standard Java Filesystem glob patterns ("*", "?", "[..]") are supported.
但我无法让它工作。有一个链接到 Java Filesystem glob patterns ,它又链接到 getPathMatcher(String) ,它列出了所有通配符选项。其中之一是 {a,b,c}
,它看起来完全符合我的需要,但是,如果我通过 gs://bucket/some/path/201602{09,10 ,11}/*
到 TextIO.Read#from
我得到“无法扩展文件模式”。
也许文档意味着仅 *
、?
和[…]
受支持,如果那在这种情况下,我如何构建一个 Dataflow 将接受并且可以匹配任意日期范围(如我上面描述的日期范围)的 glob?
更新:我发现我可以编写一大块代码,这样我就可以将路径前缀作为逗号分隔列表传递,从每个和使用 这导致了很多其他问题,我会尝试让它工作,但感觉就像死胡同因为最初的重写。Flatten
转换,但这似乎是一种非常低效的方法。看起来第一步读取所有输入文件并立即将它们再次写出到 GCS 上的临时位置。只有在读取和写入所有输入后,实际处理才开始。这一步在我正在写的工作中是完全没有必要的。我希望作业读取第一个文件,开始处理它并读取下一个文件,依此类推。
最佳答案
确实,文档确实意味着仅支持 *
、?
和 [...]
。这意味着按字母顺序或数字顺序排列的任意子集或范围不能表示为单个 glob。
以下是一些可能适合您的方法:
- 如果文件路径中表示的日期也存在于文件中的记录中,那么最简单的解决方案是读取它们并使用
Filter
转换来选择您感兴趣的日期范围在. - 您在单独的
TextIO.Read
中尝试过多次读取的方法,转换并展平它们对于小型文件集是可行的;我们的tf-idf example做这个。您可以用少量的 glob 表达任意数值范围,因此这不需要每个文件读取一次(例如,两个字符范围“23 到 67”是2[3-]
加上[3-5][0-9]
加上6[0-7]
) - 如果文件子集更加随意,那么 globs/文件名的数量可能会超过最大图形大小,最后的建议是将文件列表放入
PCollection
并使用ParDo
转换为读取每个文件并发出其内容。
希望对您有所帮助!
关于java - 如何为 Dataflow 作业指定多个输入路径,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35740610/