我正在尝试编写一个小型 DoFn 以将数据从数据流管道写入 Cloud Firestore。在本地,一切都按预期工作,但是当尝试在数据流上运行时,一切都会崩溃!
这是我的功能:
class FirestoreWriteDoFn(beam.DoFn):
def __init__(self):
super(FirestoreWriteDoFn, self).__init__()
def start_bundle(self):
import google.cloud
self.db = google.cloud.firestore.Client(project='ag-audience')
def process(self, element):
fb_data = {
'topics': element.get('page_keywords').split(','),
'title': element.get('page_title')
}
logging.info('Inserting into Firebase: %s', fb_data)
fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
result = fb_doc.create(fb_data)
yield result
这是部署它的命令:
$ python pipe/main.py \
--runner=dataflow \
--project=ag-audience \
--region=europe-west1 \
--machine_type=n1-standard-4 \
--temp_location=gs://ag-dataflow/tmp \
--requirements_file requirements.txt \
--save_main_session \
--streaming
这是我的要求.txt:
google-cloud-firestore>=1.3.0
我尝试过很多事情:
- 在文件顶部全局导入 firestore 模块。
- 以不同方式导入:从 y 导入 x
、导入 y
。
- 在代码的各个部分导入它。
错误总是因为某些内容未定义:
NameError:未定义全局名称“google”[运行“generatedPtransform-480”时]
编辑:(添加管道代码)
with beam.Pipeline(argv=pipeline_args) as p:
crawled_features = (p
| 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
| 'DebugLogInput' >> beam.ParDo(LogResults())
| 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
)
firebase_stream = (crawled_features
| 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
| 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
)
bigquery_stream = (crawled_features
| 'ShapeRow' >> beam.Map(ShapeBQRow)
| 'LogShapedBQRow' >> beam.ParDo(LogResults())
| 'WriteBigQuery' >> beam.io.WriteToBigQuery(
table=BIGQUERY_TABLE,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
最佳答案
问题出在 Beam 版本上。在 2.13.0 中可能存在一些错误,但在 2.12.0 中它可以正常工作,基于 Python package errors while running GCP Dataflow 。我也亲自验证过。
关于python - 在 Apache Beam 中导入 Google Firestore Python 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57286517/