google-cloud-platform - Google Dataflow 中的单例

标签 google-cloud-platform google-cloud-dataflow apache-beam

我有一个从 PubSub 读取消息的数据流。我需要使用几个 API 来丰富此消息。我想要这个 API 的单个实例用于处理所有记录。这是为了避免为每个请求初始化 API。

我尝试创建静态变量,但仍然看到 API 被初始化了很多次。

如何避免在 Google Dataflow 中多次初始化变量?

最佳答案

Dataflow 使用多台机器并行进行数据分析,因此您的 API 必须在每台机器上至少初始化一次。

事实上,Dataflow 对这些机器的生命周期没有强有力的保证,因此它们的来去可能相对频繁。

让您的作业访问外部服务并避免过多初始化 API 的一个简单方法是在您的 DoFn 中对其进行初始化:

class APICallingDoFn extends DoFn {
    private ExternalServiceHandle handle = null;

    @Setup
    public void initializeExternalAPI() {
      // ...
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // ... process each element -- setup will have been called
    }
}

您需要这样做,因为 Beam 和 Dataflow 都保证 DoFn 实例或工作线程的持续时间。

希望这有帮助。

关于google-cloud-platform - Google Dataflow 中的单例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44646447/

相关文章:

java - Java SDK 2.11.0 上的数据流作业无法扩展

node.js - 如何使用服务 worker 缓存handlebars.js

python - 使用最少的依赖关系/权限将Docker容器镜像镜像到Google Container Registry

google-cloud-dataflow - 如何在IDE上本地开发Beam Pipeline并在Dataflow上运行?

python - 如何从 PCollection Apache Beam Python 创建 N 个元素组

google-cloud-platform - 错误: 10: Developer console is not set up correctly (Not Using Firebase) (One Tap sign-up)

google-cloud-dataflow - '_UnwindowedValues' 类型的对象没有 len() 是什么意思?

python - 如何使用 'add_value_provider_argument'初始化运行时参数?

google-cloud-functions - 如何从 GCP 中的 Cloud Functions 调用用 Python(3.x) 编写的 Dataflow 作业

java - 如何在 Apache Beam 中使用流输入 PCollection 请求 Redis 服务器?