google-cloud-platform - 动态创建异常聚合器

标签 google-cloud-platform google-cloud-dataflow

我们使用聚合器来记录处理过程中的异常情况

public class BigTableWriter extends DoFn<String, Void> {

    private Aggregator<Integer, Integer> errorAggregator;
    public BigTableWriter(CloudBigtableOptions options) {
       errorAggregator = createAggregator("errors",new Sum.SumIntegerFn());
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c){
        try {
          ....do work here
        }
        catch(Exception ex){
           errorAggregator.addValue(1);
        }

    }

}

我们希望使其更加精细,而不是保留单个聚合器来收集错误。文档说聚合器通常是在构造函数中创建的。是否可以为 catch block 内的每种异常类型创建一个聚合器?例如,我们想做类似的事情。

public class BigTableWriter extends DoFn<String, Void> {

    private Map<String, Aggregator<Integer, Integer> aggregatorMap;
    public BigTableWriter(CloudBigtableOptions options) {         
       aggregatorMap = new HashMap<>();
    }

    @Override
    public void processElement(DoFn<String, Void>.ProcessContext c){
        try {
          ....do work here
        }
        catch(Exception ex){
          aggregateException(ex.getCause().getMessage());
        }
    }

    public void aggregateException(String exceptionMessage) {
       Aggregator<Integer, Integer> aggregator = null;
       if(!aggregatorMap.containsKey(exceptionMessage){
          aggregator = createAggregator(exceptionMessage,new Sum.SumIntegerFn());
       }
       else {
          aggregator = aggregatorMap.get(exceptionMessage);
       }

         aggregator.put(exceptionMessage, aggregator);
    }

}

最佳答案

不幸的是,没有。当前初始化聚合器的逻辑要求它们在图构造时已知(也称为在 DoFn 构造期间创建)。不过,这是一个很好的功能请求。创建了一个问题来跟踪它:https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/55

关于google-cloud-platform - 动态创建异常聚合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32469446/

相关文章:

python - 在Spark Cluster模式下使用 Pandas 读取数据时出现异常行为

java - 数据流 : string to pubsub message

google-cloud-datastore - 如何使用事务性 DatastoreIO

google-cloud-dataflow - 在 Apache Beam 中使用 for 循环执行步骤顺序是否可以

go - 如何以程序方式上传图片?

google-app-engine - 使用 Go Runtime Google App Engine 的 Google Cloud Storage Client App 错误

google-cloud-platform - 谷歌双子座API错误: "DefaultCredentialsError: Your default credentials were not found."

python - 通过应用程序引擎的 FTP 使用 Python 允许连接但超时响应

google-cloud-dataflow - 在 Apache Beam 中添加 2 个 Dofn 之间的依赖关系

google-cloud-platform - 使用 Google DataFlow/Apache Beam 并行化图像处理或抓取任务是否有意义?