python - 在 Apache Beam 中导入 Google Firestore Python 客户端

标签 python google-cloud-firestore google-cloud-dataflow apache-beam

我正在尝试编写一个小型 DoFn 以将数据从数据流管道写入 Cloud Firestore。在本地,一切都按预期工作,但是当尝试在数据流上运行时,一切都会崩溃!

这是我的功能:

class FirestoreWriteDoFn(beam.DoFn):
  def __init__(self):
    super(FirestoreWriteDoFn, self).__init__()

  def start_bundle(self):
    import google.cloud 
    self.db = google.cloud.firestore.Client(project='ag-audience')

  def process(self, element):
    fb_data = {
      'topics': element.get('page_keywords').split(','),
      'title': element.get('page_title')
    }
    logging.info('Inserting into Firebase: %s', fb_data)
    fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
    result = fb_doc.create(fb_data)
    yield result

这是部署它的命令:

$ python pipe/main.py \
  --runner=dataflow \
  --project=ag-audience \
  --region=europe-west1 \
  --machine_type=n1-standard-4 \
  --temp_location=gs://ag-dataflow/tmp \
  --requirements_file requirements.txt \
  --save_main_session \
  --streaming

这是我的要求.txt:

google-cloud-firestore>=1.3.0

我尝试过很多事情: - 在文件顶部全局导入 firestore 模块。 - 以不同方式导入:从 y 导入 x导入 y。 - 在代码的各个部分导入它。

错误总是因为某些内容未定义: NameError:未定义全局名称“google”[运行“generatedPtransform-480”时]

编辑:(添加管道代码)

with beam.Pipeline(argv=pipeline_args) as p:

    crawled_features = (p 
      | 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
      | 'DebugLogInput' >> beam.ParDo(LogResults())
      | 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
    )

    firebase_stream = (crawled_features
      | 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
      | 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
    )

    bigquery_stream = (crawled_features 
      | 'ShapeRow' >> beam.Map(ShapeBQRow)
      | 'LogShapedBQRow' >> beam.ParDo(LogResults())
      | 'WriteBigQuery' >> beam.io.WriteToBigQuery(
        table=BIGQUERY_TABLE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

最佳答案

问题出在 Beam 版本上。在 2.13.0 中可能存在一些错误,但在 2.12.0 中它可以正常工作,基于 Python package errors while running GCP Dataflow 。我也亲自验证过。

关于python - 在 Apache Beam 中导入 Google Firestore Python 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57286517/

相关文章:

java - 写入 Elasticsearch 时出错,无法从数据流插入某些元素

python - 在 Python 中实现/运行合并排序

python - 用烧杯缓存

python - 字符串由标点组成

firebase - 以编程方式创建和配置 Firebase 项目

python - 使用 Dataflow + Beam + Python 从 Google Cloud Storage 读取 Shapefile

python - 根据已知角值估计数字矩阵?

android - 如何将我的 Firebase 实时数据库传输到 Firebase Cloud Firestore

javascript - Firestore Cloud Functions 优先级

google-cloud-dataflow - 是否可以使用自定义包运行 Cloud Dataflow?