java - 来自 Beam PTransform 的 IllegalMutationException

标签 java google-cloud-dataflow apache-beam

这是我编写的 Apache Beam PTransform:

public class NormalizeTransform
  extends PTransform<PCollection<String>, PCollection<SimpleTable>> {

@Override
public PCollection<SimpleTable> expand(PCollection<String> lines) {
  ExtractFields extract_i = new ExtractFields();
  PCollection<SimpleTable> table = lines
    .apply("Extracting data model fields from lines",
           ParDo.of(extract_i));
}                                                   

public class ExtractFields extends DoFn<String, SimpleTable> {

@ProcessElement
public void processElement(ProcessContext c){
  try {
    String line = c.element();              
    // fill table
    for (Table_Struct st: this.struct){
      String o = line.substring(st.pos_1, st.pos_2));
      this.table.getClass().getField(st.Field_Name).set(
        this.table, o);                                                                     
    }
    c.output(this.table);
  }
}

并且偶尔会出现以下错误IllegalMutationException,这意味着我重复运行代码,有时有效,有时无效。

org.apache.beam.sdk.util.IllegalMutationException: PTransform Transform/Extracting data model fields from lines/ParMultiDo(ExtractFields) mutated value  after it was output (new value was ). Values must not be mutated in any way after being output.

at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:135)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:214)
at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$TimerIterableCompletionCallback.handleResult(ExecutorServiceParallelExecutor.java:268)
at org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:168)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

我认为我没有专门更改代码中任何位置的值的任何输出。 MutationDetectors 将比较两个值:previousValue 和 newValue。就我而言, previousValue 通常是一个输入值,而 newValue 是另一个输入值。为什么 Transform 会尝试使用一个输入值来修改另一个输入值?

最佳答案

我不确定 this.table 来自哪里。

但为了帮助您理解错误消息,请记住,可能会在多个输入上调用 processElement。第一次调用将输出 this.table。下一次调用将在输出之前改变 this.table

如果此突变发生在第一次调用输出 this.table 之后且下游代码有机会读取 this.table 之前,您将得到不正确的结果。因此,此错误表明您在输出引用后更改了 this.table 的内容 - 这是您不应该做的事情。

考虑 (1) 输出 this.table 的副本或 (2) 将表创建为本地字段。例如:

@ProcessElement
public void processElement(ProcessContext c){
  try {
    String line = c.element(); 
    Table table = /* create the table */;             
    // fill table
    for (Table_Struct st: this.struct){
      String o = line.substring(st.pos_1, st.pos_2));
      this.table.getClass().getField(st.Field_Name)
        .set(table, o);                                                                     
    }
    c.output(table);
  }
}

另请注意,在每个 processElement 中执行反射可能会比预期慢。如果可以直接修改字段,可能会更好。

关于java - 来自 Beam PTransform 的 IllegalMutationException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44378033/

相关文章:

java - 以编程方式更改 Eclipse 项目设置

java - 使用 SSL 加密和 NTLM 身份验证的 HttpClient 失败

java - 如何创建一个通用的 HashMap 来插入集合和对象?

java - Bndtools 与 lombok "contains an unknown container"

google-cloud-platform - 如何在使用 Google Cloud Dataflow 清除 Cloud Memorystore 中的缓存后插入数据?

java - 使用向后兼容的编码器更改编码器以进行转换

python - 在 Google Cloud Dataflow 上安装 pandas 0.20.3 需要很长时间

python - Beam Python SDK : pd. 合并左连接错误(valueError:尝试对不可为空的字段编码 null)

python - Apache Beam 与 Python : How to compute the minimum in a session window, 并将其应用于所有相关的 PCollection

go - 当前用于 Google Dataflow 的 GoLang SDK 是否支持自动缩放和并行处理?