google-cloud-dataflow - 安排 Google Cloud Dataflow 作业的最简单方法

标签 google-cloud-dataflow

我只需要每天运行一个数据流管道,但在我看来,像 App Engine Cron Service 这样需要构建整个网络应用程序的建议解决方案似乎有点太多了。
我正在考虑从 Compute Engine Linux VM 中的 cron 作业运行管道,但也许这太简单了:)。这样做有什么问题,为什么没有人(我猜除了我之外)建议这样做?

最佳答案

这就是我使用 Cloud Functions、PubSub 和 Cloud Scheduler 做到的
(这假设您已经创建了一个 Dataflow 模板并且它存在于您的 GCS 存储桶中的某处)

  • 在 PubSub 中创建一个新主题。这将用于触发云函数
  • 创建从模板启动 Dataflow 作业的 Cloud Functions。我发现从 CF 控制台创建它是最简单的。确保您选择的服务帐号有权创建数据流作业。函数的 index.js 看起来像:
  • const google = require('googleapis');
    
    exports.triggerTemplate = (event, context) => {
      // in this case the PubSub message payload and attributes are not used
      // but can be used to pass parameters needed by the Dataflow template
      const pubsubMessage = event.data;
      console.log(Buffer.from(pubsubMessage, 'base64').toString());
      console.log(event.attributes);
    
      google.google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        console.error('Error occurred: ' + err.toString());
        throw new Error(err);
      }
    
      const dataflow = google.google.dataflow({ version: 'v1b3', auth: authClient });
    
      dataflow.projects.templates.create({
            projectId: projectId,
            resource: {
              parameters: {},
              jobName: 'SOME-DATAFLOW-JOB-NAME',
              gcsPath: 'gs://PATH-TO-YOUR-TEMPLATE'
            }
          }, function(err, response) {
            if (err) {
              console.error("Problem running dataflow template, error was: ", err);
            }
            console.log("Dataflow template response: ", response);
          });
      });
    };
    

    package.json 看起来像
    {
      "name": "pubsub-trigger-template",
      "version": "0.0.1",
      "dependencies": {
        "googleapis": "37.1.0",
        "@google-cloud/pubsub": "^0.18.0"
      }
    }
    
  • 转到 PubSub 和您创建的主题,手动发布消息。这应该会触发 Cloud Function 并启动 Dataflow 作业
  • 使用 Cloud Scheduler 按计划发布 PubSub 消息
    https://cloud.google.com/scheduler/docs/tut-pub-sub
  • 关于google-cloud-dataflow - 安排 Google Cloud Dataflow 作业的最简单方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43816707/

    相关文章:

    python - 修改 MinimalWordCount 示例以从 BigQuery 读取

    google-cloud-dataflow - 我可以修改 apache 光束变换中的元素吗?

    python - Apache Beam - 澄清 Python SDK 上输出类型提示的预期行为

    java - Uberjar仅具有依赖项

    python - 如何使用 Dataflow 在 apache beam 中跳过 io 级别的错误元素?

    google-cloud-platform - 允许用户对 gcp 数据流项目进行写访问

    python - 避免在 Beam Python SDK 中重新计算所有云存储文件的大小

    google-cloud-platform - 如何从 com.google.api.client.googleapis.json.GoogleJsonResponseException : 410 Gone 上的 Cloud Dataflow 作业中恢复失败

    java - BigQuery,表行 : access repeated records with java api

    python-3.x - Python 3.7 和 Dataflow - SSL 证书问题