python - Dataflow Streaming 使用 Python SDK : Transform for PubSub Messages to BigQuery Output

标签 python google-bigquery google-cloud-dataflow apache-beam dataflow

我正在尝试使用数据流读取 pubsub 消息并将其写入大查询。 Google 团队向我授予了 alpha 访问权限,并使提供的示例正常工作,但现在我需要将其应用到我的场景中。

发布订阅负载:

Message {
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
    attributes: {}
}

大查询架构:

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP',

我的目标是简单地读取消息负载并插入到 bigquery 中。我正在努力了解转换以及如何将键/值映射到大查询模式。

我对此很陌生,因此不胜感激。

当前代码:https://codeshare.io/ayqX8w

谢谢!

最佳答案

我能够通过定义将其加载到 json 对象中的函数成功解析 pubsub 字符串(请参阅 parse_pubsub())。我遇到的一个奇怪的问题是我无法在全局范围内导入 json。我收到“NameError:全局名称‘json’未定义”错误。我不得不在函数中导入 json。

请参阅下面我的工作代码:

from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window

'''Normalize pubsub string to json object'''
# Lines look like this:
  # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['mac']), (record['status']), (record['datetime'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  parser.add_argument(
      '--output_table', required=True,
      help=
      ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:
    # Read the pubsub topic into a PCollection.
    lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
                | beam.Map(parse_pubsub)
                | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
                | beam.io.WriteToBigQuery(
                    known_args.output_table,
                    schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
            )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

关于python - Dataflow Streaming 使用 Python SDK : Transform for PubSub Messages to BigQuery Output,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46854167/

相关文章:

python - Stackplot 使用字典值列表 (Python 3.x)

google-bigquery - Google BigQuery/Amazon Redshift 使用基于列的关系数据库还是 NoSQL 数据库?

java - 如何控制有界源 split ?

google-cloud-platform - 如何使用 API 在 GCP 数据流中检索当前工作人员计数

java - BigTable 的 InstanceID 和 TableID 管道未接受 ValueProvider<String>

python - 识别串行/USB 设备 python

python - JSON 响应格式奇怪

python - OS X 蓝牙编程

json - BigQuery 在导入 JSON 时处理缺失字段和未知/额外字段

google-bigquery - 什么是大查询最大行大小?