java - Apache Beam 获取源文件名

标签 java google-cloud-dataflow apache-beam

编辑:已解决!

我有多个来自多种语言的文本文件。我想使用 Apache Beam 为每一行添加一个语言标记。

示例:

文件 text_en: 这是一个句子。

文件 text_de:Dies ist ein Satz.

我想要的是:

en: This is a sentence.
de: Dies ist ein Satz. 

What I've tried:

I initially tried to just use one TextIO.Read.From(dataSetDirectory+"/*") and look for an option that looks something like .getSource(). However, this doesn't seem to exist.

Next I tried to read every File one by one like this:

File[] files = new File(datasetDirectory).listFiles();
PCollectionList<String> dataSet=null;
for (File f: files) {
   String language = f.getName();
   logger.debug(language);
   PCollection<String> newPCollection = p.apply(
            TextIO.Read.from(f.getAbsolutePath()))
               .apply(ParDo.of(new LanguageTagAdder(language)));

   if (dataSet==null) {
       dataSet=PCollectionList.of(newPCollection);
   } else {
       dataSet.and(newPCollection);
   }
}
PCollection<String> completeDataset= dataSet.apply(Flatten.<String>pCollections())

以这种方式读取文件非常好,但是我的 DoFn LanguageTagAdder 仅使用第一种语言进行了初始化,因此所有文件都具有相同的添加语言。

LanguageTagAdder 看起来像这样:

public class LanguageTagAdder
            extends DoFn<String,String> {

        private String language;
        public LanguageTagAdder(String language) {
            this.language=language;
        }
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(language+c.element());
        }
    }

我意识到这种行为是有意且需要的,以便可以并行化数据,但我将如何解决我的问题?有没有 Beam 方法来解决它?

PS:我在第二次创建 new LanguageTagAdder 时(使用第二种语言)收到以下警告:

DEBUG   2016-12-05 17:09:55,070 [main] de.kdld16.hpi.FusionDataset  - en
DEBUG   2016-12-05 17:09:56,216 [main] de.kdld16.hpi.FusionDataset  - de
WARN    2016-12-05 17:09:56,219 [main] org.apache.beam.sdk.Pipeline  - Transform TextIO.Read2 does not have a stable unique name. This will prevent updating of pipelines.

编辑: 问题是线路

dataSet.and(newPCollection);

需要重写为:

dataSet=dataSet.and(newPCollection);

原来,数据集只包含第一个文件....难怪他们都有相同的语言标签!

最佳答案

问题是线

dataSet.and(newPCollection);

需要重写为:

dataSet=dataSet.and(newPCollection);

原来,数据集只包含第一个文件。

关于java - Apache Beam 获取源文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41029628/

相关文章:

java - Spring 4+ hibernate 4 : Could not obtain transaction-synchronized Session for current thread

google-cloud-dataflow - 如何集成测试写入 Bigtable 的 Dataflow 管道?

python - 数据流管道中的外部 Python 依赖项

java - 从文件构建数组中的 NullPointerException

java - 用于自定义库的 JNA

java - JSON 加载/获取正确但无法显示

java - 用于最终确定 CheckpointMark 的 Beam 模型合约

java - 使用 CombineFn 从所有节点累积数据后,合并每个键的所有值

python - 修改单个 BigQuery 列并写入新表

python - Apache Beam + 大查询表读取