google-cloud-platform - Apache Beam returns "Input values must not be mutated in any way." when running locally with the direct runner

Tags: google-cloud-platform google-cloud-dataflow apache-beam

I wrote an Apache Beam DoFn:

static class FillLocation extends DoFn<TrackingRequest, TrackingRequest> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        TrackingRequest rq = c.element();
        rq.location = getLocationFromIP(rq.IP);
        c.output(rq);
    }
}

When I test it locally, it gives me this error: PTransform .. illegal mutation of value .. class ......

 Input values must not be mutated in any way.
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.verifyUnmodified(ImmutabilityEnforcementFactory.java:96)
    at org.apache.beam.runners.direct.ImmutabilityEnforcementFactory$ImmutabilityCheckingEnforcement.afterElement(ImmutabilityEnforcementFactory.java:71)
    at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:149)
    at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Best answer

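Your function modifies the location field of the input TrackingRequest element. Dataflow does not allow this, and the direct runner checks for it when you run the pipeline locally.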

The documentation says:

The current element of the input PCollection is returned by c.element(). It should be considered immutable. The Dataflow runtime will not mutate the element, so it is safe to cache, etc. The element should not be mutated by any of the DoFn methods, because it may be cached elsewhere, retained by the Dataflow runtime, or used in other unspecified ways.
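The stack trace points at `ImmutabilityEnforcementFactory` in the direct runner. Roughly, that check encodes each input element with its coder before the DoFn runs and compares the encoding again afterwards, failing if the bytes differ. The sketch below is a simplified plain-Java simulation of that idea, not Beam's actual code: `TrackingRequest` is a hypothetical stand-in for the question's class, and `encode` is a hypothetical stand-in for a coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for the question's element type.
class TrackingRequest {
    String IP;
    String location;

    TrackingRequest(String ip) { this.IP = ip; }
}

class MutationCheckDemo {
    // Functional stand-in for the per-element work a DoFn does.
    interface ElementFn { TrackingRequest apply(TrackingRequest in); }

    // Hypothetical stand-in for a coder: turns an element into bytes.
    static byte[] encode(TrackingRequest r) {
        return (r.IP + "|" + r.location).getBytes(StandardCharsets.UTF_8);
    }

    // Snapshot the input, run the function, then verify the input is unchanged.
    static TrackingRequest runChecked(TrackingRequest element, ElementFn fn) {
        byte[] before = encode(element);
        TrackingRequest out = fn.apply(element);
        byte[] after = encode(element);
        if (!Arrays.equals(before, after)) {
            throw new IllegalStateException("Input values must not be mutated in any way.");
        }
        return out;
    }
}
```

Under this simulation, a function that sets `location` on its input fails the check, while one that builds and returns a new object passes.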

You can make a copy of the input element, modify the field on the copy, and then emit the copy as the output.
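A minimal sketch of that copy-then-modify approach in plain Java, assuming `TrackingRequest` can be given a copy constructor (the original class may not have one) and using a hypothetical stub for `getLocationFromIP`. Inside the DoFn, the same idea would read `TrackingRequest out = new TrackingRequest(c.element()); out.location = getLocationFromIP(out.IP); c.output(out);`.

```java
// Hypothetical stand-in for the question's element type, with an added copy constructor.
class TrackingRequest {
    String IP;
    String location;

    TrackingRequest(String ip) { this.IP = ip; }

    // Copy constructor (an assumption): copies every field of the input.
    // If a field held a mutable object, it would need to be copied as well.
    TrackingRequest(TrackingRequest other) {
        this.IP = other.IP;
        this.location = other.location;
    }
}

class FillLocationExample {
    // Hypothetical stub; a real implementation would do a geo-IP lookup.
    static String getLocationFromIP(String ip) {
        return "location-for-" + ip;
    }

    // Mirrors the body of processElement: copy, modify the copy, return (output) it.
    static TrackingRequest fillLocation(TrackingRequest input) {
        TrackingRequest out = new TrackingRequest(input); // the input itself is never written to
        out.location = getLocationFromIP(out.IP);
        return out;
    }
}
```

Because only the copy is modified, the element returned by `c.element()` keeps its original encoding and the direct runner's immutability check has nothing to report.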

Source: the Stack Overflow question "google-cloud-platform - Apache Beam returns "Input values must not be mutated in any way." when running locally with the direct runner": https://stackoverflow.com/questions/43142900/
