python - 在流式管道中使用 WriteToBigQuery FILE_LOADS 只会创建大量临时表(python SDK)

标签 python google-cloud-dataflow apache-beam

我有一个流式管道,它从发布/订阅中获取消息,解析它们,然后写入 BigQuery。挑战在于,每条消息都会根据消息中的 event 属性转到不同的事件表,并且它们没有排序。

这意味着(我相信)WriteToBigQuery 方法无法有效地批处理写入,我看到它基本上一次写入每条消息,因此运行速度太慢。我还尝试添加一个 60 秒的窗口并添加一个 GroupByKey/FlatMap 来尝试对它们重新排序,但在加快速度方面收效甚微。

WriteToBigQuery 中使用 FILE_LOADS 方法,触发频率超过 60 秒,它似乎可以工作,发送加载作业,然后(至少有时)成功,我查看数据进入正确的表格。但是,创建的临时表永远不会被删除,所以我创建了数百个表(名称如 beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_NAME_STEP_756_37417blahblahblah)...这显然是不可持续的。

通过STREAMING_INSERTS 写入可以正常工作,只是速度较慢,这是一种提高效率的尝试。

如果有人能帮我弄清楚为什么这些表没有被删除,我认为那会给我一个有效、高效的管道。我尝试了更长的触发频率(最多 1 小时),但发生了相同的行为。

这是我的主要管道 - 同样,我对其余部分没有任何问题,只是提供上下文。


    events, non_events = (p 
        | 'ReadData' >> beam.io.ReadFromPubSub(subscription = known_args.input_subscription).with_output_types(bytes)
        | 'decode' >> beam.Map(lambda x: x.decode('utf-8'))
        | 'Parse JSON to Dict' >> beam.Map(lambda line: json.loads(line))
        | 'FilterOutNonEvents' >> beam.ParDo(FilterOutNonEvents()).with_outputs('MAIN_OUT', 'non_events')
    )
    
    parsed, missing_tables, _ = (events
        | 'ParseDict' >> beam.ParDo(ParseDict()).with_outputs('MAIN_OUT', 'missing_tables', 'ignore')
    )
    
    results, conversion_errors = (parsed
        | 'ConvertDataTypes' >> beam.ParDo(ConvertDataTypes()).with_outputs('MAIN_OUT', 'error_data')
    )
    
    final = (results
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table = lambda record: '{project}:{dataset}.{table}'.format(project = known_args.project, dataset = known_args.dataset, table = parse_event_to_dataset_name(patterns, record["event"])),
                schema = lambda tbl: {'fields':[{'name':c.split(':')[0], 'type':c.split(':')[1]} for c in schema_json[tbl.split('.')[-1]].split(',')]},
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND,
                method = 'FILE_LOADS',
                triggering_frequency = 60
        )
    )

table arg 由消息的 event 属性确定,schema arg 只是全局变量的重新格式化切片(最初从 GCS 读取,同样,使用 streaming_inserts 没有问题)。

感谢任何可以提供帮助的人!我为此苦思冥想(我对光束/数据流还很陌生)。

最佳答案

将 LOAD_FILES 与多个分区和/或动态目标一起使用时,行为应为 follows :

'''
2. Multiple partitions and/or Dynamic Destinations:

    When there are multiple partitions of files destined for a single
    destination or when Dynamic Destinations are used, multiple load jobs
    need to be triggered for each partition/destination. Load Jobs are
    triggered to temporary tables, and those are later copied to the actual
    appropriate destination table. This ensures atomicity when only some
    of the load jobs would fail but not other. If any of them fails, then
    copy jobs are not triggered.
'''

code也出现在加载作业之后,beam 应该等待它们完成,然后从临时表中复制数据并删除它们;然而,似乎当与流式管道一起使用时,它并没有完成这些步骤。在我使用 DirectRunner 进行复制时,它甚至没有到达 CopyJob。我建议将其报告给 apache beam 团队 here .

尽管如此,对于您的用例,我会重新考虑使用加载作业方法,因为您可能会达到 load 的配额和 copy工作很快;和 streaming inserts可能更适合这种情况,并且可能提供比每 60 秒以上加载作业更好的插入性能

关于python - 在流式管道中使用 WriteToBigQuery FILE_LOADS 只会创建大量临时表(python SDK),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64526500/

相关文章:

python - 当传递命名参数时,matplotlib 不会绘图

python - 通过 Selenium 中的 xpath 变量查找并执行操作 - Python

google-cloud-platform - Cloud SQL 到 BigQuery 增量式

python - 从数据流运行 tensorflow 模型训练

python - OpenCV 多边形检测方法

python - numpy.linalg.inv 如何计算正交矩阵的逆?

google-cloud-dataflow - 读取大量文件时,如何提高 TextIO 或 AvroIO 的性能?

python - 在美国位置未找到数据集

google-analytics - 如何将Google Analytics(分析)数据导入Google Cloud Platform?

python - 将 pcollection 的每一行拆分为多个 pcollection?