python - 如何使用 Apache Beam Python 将输出写入动态路径

标签 python google-cloud-platform google-cloud-storage apache-beam dataflow

我对 apache beam 很陌生。我的场景如下,

我有多个 json 格式的事件。在每个事件中,event_time 列指示事件的创建时间,我正在使用 event_time 计算其创建日期。 我想将这些事件单独写在它们的日期分区下。我的代码就像

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time


class EventFormatter(beam.DoFn):

  def process(self, element, *args, **kwargs):
    tmp_dict = {}
    for i in range(len(element['properties'])):
        tmp_dict['messageid'] = element['messageid']
        tmp_dict['userid'] = element['userid']
        tmp_dict['event_time'] = element['event_time']
        tmp_dict['productid'] = element['properties'][i]['productid']

        yield tmp_dict


class DateParser(beam.DoFn):

    def process(self, element, *args, **kwargs):
        key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
        print(key, element)
        yield TaggedOutput(time.strftime('%Y-%m-%d', time.localtime(element.get('event_time'))), element)


with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
          {"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )


  output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))

我无法找到如何完成格式 block 。当我运行代码来打印结果时,它给出

('2020-08-27', {'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25', {'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25', {'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})

作为例子。我想在 date=2020-08-25 文件夹下写入 2 个事件,另外一个 date=2020-08-27 文件夹下。

在一天结束时,我想将每个事件写入其创建日期文件夹下。

我该怎么做?

感谢您的帮助,

奥古兹。

最佳答案

具体来说,要为每个键写入多个元素,您可以执行类似的操作

class WriteByKey(apache_beam.DoFn):
    def process(self, kvs):
         # All values with the same key will come in at once.
         key, values = kvs
         with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'w') as fp:
             for value in values:
                 fp.write(value)
                 fp.write('\n')

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | ...
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )
  output = events | beam.GroupByKey() | beam.ParDo(WriteByKey())

请注意,运行程序可能需要在失败时重试元素,因此,与其直接写入输出,更安全的方法是写入临时文件,然后在成功时自动重命名,例如

class WriteByKey(apache_beam.DoFn):
    def process(self, kvs):
         # All values with the same key will come in at once.
         key, values = kvs
         nonce = random.randint(1, 1e9)
         path = f'gs://bucket/path/{key}.extension'
         temp_path = f'{path}-{nonce}'
         with beam.io.gcp.gcsio.GcsIO().open(temp_path, 'w') as fp:
             for value in values:
                 fp.write(value)
                 fp.write('\n')
         beam.io.gcp.gcsio.GcsIO().rename(temp_path, path)

关于python - 如何使用 Apache Beam Python 将输出写入动态路径,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63615492/

相关文章:

python - dateutil 2.5.0 是最低要求的版本

python - 从谷歌云应用引擎中的私有(private) github repo pip 安装包

google-apps-script - 如何在谷歌电子表格中的多个(不同)电子表格之间共享相同的应用程序脚本

google-cloud-storage - 谷歌云存储 "Minimum storage duration"的含义

python - 带有 gspythonlibrary 的 Google 云存储

google-cloud-storage - 在 Google 存储桶中创建目录

python - 颜色栏中的偏移刻度标签

python - 输出金字塔的高度

python - 读/写 NetworkX 图形对象

firebase - 正确的 Firestore 带宽/网络/互联网导出计费因素