在我的项目中,我尝试向管道中处理的数据添加一些元数据。元数据位于 src 文件夹旁边名为 resources 的子文件夹中的 DBF 文件中。
src 文件夹包含主类,我有几个包(IO、处理、聚合、utils)。
我在定义管道的主类中读取并处理带有元数据的文件。我用来访问该文件的代码如下:
File temp1 = new File("resources/xxx.dbf");
我检查是否找到该文件:
LOG.info(temp1.exists())
运行良好。
我使用 PubSubIO 读取了一些以字符串形式传入的消息。我使用此文件的内容来填充包含键和值的映射。
Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations();
然后,我在我制作的名为“SensorValues”的自定义类中设置一个静态变量:
SensorValue.setKeyToCoordinates(sensorToCoordinates);
当我使用 ParDo 函数(从 PCollection 到 PCollection)将传入消息从字符串解析为 SensorValue 类时,该映射将在 SensorValue 类的构造函数中使用。
使用 DirectPipelineRunner 运行此代码效果非常好。但是,当我使用 DataflowPipelineRunner 并尝试访问 SensorValue 构造函数中的映射时,我遇到了 NullPointerException。
现在我想知道为什么 setter 在使用 DataflowPipelineRunner 时不起作用(我猜这与在多个工作人员之间分配执行有关)以及最佳实践是什么,使用任何静态资源文件来丰富您的管道?
最佳答案
你说得对,问题是因为 ParDo
的执行被分发给多个工作线程。他们没有本地文件,也可能没有 map 的内容。
这里有几个选项:
将文件放入 GCS,让管道读取文件的内容(使用 TextIO 或类似的东西)并将其用作 side-input供您稍后处理。
将该文件包含在管道的资源中,并将其加载到需要它的
DoFn
的startBundle
中(将来会有方法使这种情况发生的频率低于每个 bundle )。您可以将映射的内容序列化为
DoFn
的参数,方法是将其作为非静态字段传递给该类的构造函数。
随着该文件大小的增加,选项 1 更好(因为它可以支持将其分割成多个部分并进行查找),而选项 2 可能会减少检索文件的网络流量。仅当文件非常小时,选项 3 才有效,因为它将显着增加序列化 DoFn
的大小,这可能导致作业太大而无法提交到 Dataflow 服务。
关于java - 在 Google Dataflow 中作为 DataflowPipelineRunner 运行时访问资源文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39649173/