java - 数据流中的动态目标问题

标签 java google-cloud-storage google-cloud-dataflow apache-beam

我有一个数据流作业,它从 pubsub 读取数据,并根据时间和文件名将内容写入 GCS,其中文件夹路径基于 YYYY/MM/DD。这允许根据日期在文件夹中生成文件,并使用 apache beam 的 FileIODynamic Destinations

大约两周前,我注意到未确认消息异常堆积。重新启动 df 作业后,错误消失,新文件正在 GCS 中写入。

过了几天,写又停了,只是这次有报错说处理卡住了。经过一些可靠的 SO 研究,我发现这可能是 caused由于 Beam 2.90 之前的死锁问题,因为它使用 Conscrypt 库作为默认安全提供程序。所以,我从 Beam 2.8 升级到 Beam 2.11。

再一次,它起作用了,直到它不起作用。我更仔细地查看了错误并注意到它有一个 SimpleDateFormat 对象的问题,它不是线程安全的。所以,我转而使用 Java.time 和 DateTimeFormatter,它们是线程安全的。它一直有效,直到它没有。然而,这一次,错误略有不同,并没有指向我的代码中的任何内容: 下面提供了错误。

Processing stuck in step FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:469)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:76)
  at org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub.getStateData(MetricTrackingWindmillServerStub.java:202)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:409)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:311)
  at org.apache.beam.runners.dataflow.worker.WindmillStateReader$BagPagingIterable$1.computeNext(WindmillStateReader.java:700)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:145)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:140)
  at org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.hasNext(MultitransformedIterator.java:47)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:701)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)

此错误在作业部署后大约 5 小时开始发生,并且随着时间的推移发生率越来越高。写作在 24 小时内明显变慢。我有 60 名 worker ,我怀疑每次出现错误时都会有一名 worker 失败,这最终会导致工作中断。

在我的作者中,我分析了某些关键字的行(可能不是最好的方法)以确定它属于哪个文件夹。然后我继续使用确定的文件名将文件插入 GCS。这是我为作者使用的代码:

分区函数提供如下:

@SuppressWarnings("serial")
public static class datePartition implements SerializableFunction<String, String> {     
    private String filename;

    public datePartition(String filename) {
        this.filename = filename;
    }

    @Override
    public String apply(String input) {

        String folder_name = "NaN";             
        String date_dtf    = "NaN";     
        String date_literal = "NaN";
        try {
            Matcher foldernames = Pattern.compile("\"foldername\":\"(.*?)\"").matcher(input);
            if(foldernames.find()) {
                folder_name = foldernames.group(1);
            }
            else {
                Matcher folderid = Pattern.compile("\"folderid\":\"(.*?)\"").matcher(input);
                if(folderid.find()) {
                    folder_name = folderid.group(1);
                }   
            }

            Matcher date_long = Pattern.compile("\"timestamp\":\"(.*?)\"").matcher(input);
            if(date_long.find()) {
                date_literal = date_long.group(1);
                if(Utilities.isNumeric(date_literal)) {
                    LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.valueOf(date_literal)), ZoneId.systemDefault());
                    date_dtf = date.format(dtf);                        
                }
                else {
                    date_dtf = date_literal.split(":")[0].replace("-", "/").replace("T", "/");
                }
            }
            return folder_name + "/" + date_dtf + "h/" + filename;
        }

        catch(Exception e) {
            LOG.error("ERROR with either foldername or date");
            LOG.error("Line : " + input);
            LOG.error("folder : " + folder_name);
            LOG.error("Date : " + date_dtf);

            return folder_name + "/" + date_dtf + "h/" + filename;
        }           
    }
}

而实际部署和运行流水线的地方可以在下面找到:

public void streamData() {

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
            .apply(options.getWindowDuration() + " Window",
                        Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
                                  .triggering(AfterWatermark.pastEndOfWindow()) 
                                  .discardingFiredPanes()
                                  .withAllowedLateness(parseDuration("24h")))
                .apply(new GenericFunctions.extractMsg())
                .apply(FileIO.<String, String>writeDynamic()
                                 .by(new datePartition(options.getOutputFilenamePrefix()))
                                 .via(TextIO.sink())
                                 .withNumShards(options.getNumShards())
                                 .to(options.getOutputDirectory())
                                 .withNaming(type -> FileIO.Write.defaultNaming(type, ".txt"))
                                 .withDestinationCoder(StringUtf8Coder.of()));

    pipeline.run();
}

最佳答案

错误“Processing stuck ...”表示某些特定操作花费的时间超过 5m,而不是作业永久停滞。但是,由于步骤 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles 被卡住并且作业被取消/终止,所以我会在作业写入临时文件时考虑一个问题。

我找到了 BEAM-7689与用于写入临时文件的第二粒度时间戳 (yyyy-MM-dd_HH-mm-ss) 有关的问题。发生这种情况是因为多个并发作业可以共享同一个临时目录,这可能导致其中一个作业在其他作业完成之前将其删除。

根据之前的链接,要缓解此问题,请升级到 SDK 2.14。如果错误消失了,请告诉我们。

关于java - 数据流中的动态目标问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55748746/

相关文章:

java - 在 Netbeans 中构建项目时出错

java - 使用谷歌云存储的文档管理系统

javascript - 如何使用API​​从Google云平台读取文本文件

python - 使用 PYTHON 运行 Google 数据流模板

java - J2me RecordStore 错误 : java. lang.NullPointerException: 0 - 下面的代码有什么问题

java - Tomcat 7.0 + Chrome 不显示 JSON

java - 设计和对象责任的问题

php - Google Cloud Storage 一个文件同时访问多个

java - 写入 Elasticsearch 时出错,无法从数据流插入某些元素

google-cloud-dataflow - 为什么 Dataflow-BigTable 连接器不支持增量?