我们使用聚合器来记录处理过程中的异常情况
public class BigTableWriter extends DoFn<String, Void> {
private Aggregator<Integer, Integer> errorAggregator;
public BigTableWriter(CloudBigtableOptions options) {
errorAggregator = createAggregator("errors",new Sum.SumIntegerFn());
}
@Override
public void processElement(DoFn<String, Void>.ProcessContext c){
try {
....do work here
}
catch(Exception ex){
errorAggregator.addValue(1);
}
}
}
我们希望使其更加精细,而不是保留单个聚合器来收集错误。文档说聚合器通常是在构造函数中创建的。是否可以为 catch block 内的每种异常类型创建一个聚合器?例如,我们想做类似的事情。
public class BigTableWriter extends DoFn<String, Void> {
private Map<String, Aggregator<Integer, Integer> aggregatorMap;
public BigTableWriter(CloudBigtableOptions options) {
aggregatorMap = new HashMap<>();
}
@Override
public void processElement(DoFn<String, Void>.ProcessContext c){
try {
....do work here
}
catch(Exception ex){
aggregateException(ex.getCause().getMessage());
}
}
public void aggregateException(String exceptionMessage) {
Aggregator<Integer, Integer> aggregator = null;
if(!aggregatorMap.containsKey(exceptionMessage){
aggregator = createAggregator(exceptionMessage,new Sum.SumIntegerFn());
}
else {
aggregator = aggregatorMap.get(exceptionMessage);
}
aggregator.put(exceptionMessage, aggregator);
}
}
最佳答案
不幸的是,没有。当前初始化聚合器的逻辑要求它们在图构造时已知(也称为在 DoFn 构造期间创建)。不过,这是一个很好的功能请求。创建了一个问题来跟踪它:https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/55
关于google-cloud-platform - 动态创建异常聚合器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32469446/