java - 在 Google Dataflow 中作为 DataflowPipelineRunner 运行时访问资源文件

标签 java google-cloud-dataflow

在我的项目中,我尝试向管道中处理的数据添加一些元数据。元数据位于 src 文件夹旁边名为 resources 的子文件夹中的 DBF 文件中。

src 文件夹包含主类,我有几个包(IO、处理、聚合、utils)。

我在定义管道的主类中读取并处理带有元数据的文件。我用来访问该文件的代码如下:

File temp1 = new File("resources/xxx.dbf");

我检查是否找到该文件:

LOG.info(temp1.exists())

运行良好。

我使用 PubSubIO 读取了一些以字符串形式传入的消息。我使用此文件的内容来填充包含键和值的映射。

Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations();

然后,我在我制作的名为“SensorValues”的自定义类中设置一个静态变量:

SensorValue.setKeyToCoordinates(sensorToCoordinates);

当我使用 ParDo 函数(从 PCollection 到 PCollection)将传入消息从字符串解析为 SensorValue 类时,该映射将在 SensorValue 类的构造函数中使用。

使用 DirectPipelineRunner 运行此代码效果非常好。但是,当我使用 DataflowPipelineRunner 并尝试访问 SensorValue 构造函数中的映射时,我遇到了 NullPointerException。

现在我想知道为什么 setter 在使用 DataflowPipelineRunner 时不起作用(我猜这与在多个工作人员之间分配执行有关)以及最佳实践是什么,使用任何静态资源文件来丰富您的管道?

最佳答案

你说得对,问题是因为 ParDo 的执行被分发给多个工作线程。他们没有本地文件,也可能没有 map 的内容。

这里有几个选项:

  1. 将文件放入 GCS,让管道读取文件的内容(使用 TextIO 或类似的东西)并将其用作 side-input供您稍后处理。

  2. 将该文件包含在管道的资源中,并将其加载到需要它的 DoFnstartBundle 中(将来会有方法使这种情况发生的频率低于每个 bundle )。

  3. 您可以将映射的内容序列化为 DoFn 的参数,方法是将其作为非静态字段传递给该类的构造函数。

随着该文件大小的增加,选项 1 更好(因为它可以支持将其分割成多个部分并进行查找),而选项 2 可能会减少检索文件的网络流量。仅当文件非常小时,选项 3 才有效,因为它将显着增加序列化 DoFn 的大小,这可能导致作业太大而无法提交到 Dataflow 服务。

关于java - 在 Google Dataflow 中作为 DataflowPipelineRunner 运行时访问资源文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39649173/

相关文章:

java - Android - 从 ArrayList 中删除项目

java - WebLogic 8.1 支持 Sun JDK 1.5 吗?

java - 确定哪个 CSS 框架与 Selenium 一起使用

google-cloud-dataflow - 无法从工厂方法 DataflowRunner#fromOptions in beamSql 中构造实例,apache beam

java - apache.beam.sdk.schemas.Schema.FieldType 中数值的等效数据类型是什么

google-cloud-platform - 谷歌数据流 "Workflow failed"无缘无故

airflow - 谷歌数据流 : Import custom Python module

java - 了解 JPA 延迟加载

Java SE 6u28 无法在文件中下载

python - 在流式管道中使用 WriteToBigQuery FILE_LOADS 只会创建大量临时表(python SDK)