python - Apache Beam 全局计数器

标签 python google-cloud-dataflow apache-beam

我有一个包含数百万条记录的文件,其中一些记录是坏记录(在 ParDo 中处理记录时就会知道)。我想将不良记录及其在文件中出现的行号写入单独的 PCollection,将良好记录写入单独的 PCollection。

有没有一种方法可以维护迄今为止在工作进程中读取的行的全局计数器,以便我可以使用它来写出行号?

最佳答案

您可以使用 Apache Beam 指标来保存全局监控计数器,您可以从您的计算机或运行程序的 UI 中查询该计数器。

如果您想保留所有不良记录的精确集合以及有关它们的信息(例如行号),那么您需要添加一个转换来实现这一点。像这样的事情:

original_records = p | LoadRecords()

class SplitRecords(beam.DoFn):
  BAD_RECORD_TAG = 'BadRecord'

  def process(self, record):
    if self.is_bad(record):
      # Output the record onto the 'special' BadRecord input.
      yield beam.pvalue.TaggedOutput(self.BAD_RECORD_TAG, record)
    else:
      yield record   # Output the record onto the main input

record_collections = (original_records | 
                      beam.ParDo(SplitRecords()).with_outputs(
                          SplitRecords.BAD_RECORD_TAG,
                          main='GoodRecords'))

bad_records = record_collections[SplitRecords.BAD_RECORD_TAG]

good_records = record_collections['GoodRecords']

有关更详细的示例,我建议您查看 Apache Beam 说明书目录,其中包含 example with a multiple-output ParDo

关于python - Apache Beam 全局计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51675154/

相关文章:

google-cloud-dataflow - Google Dataflow 可以生成 Parquet 文件吗

python - 如何在 Apache Beam Python 中获取窗口时间戳的结束

java - 如何修复 "incompatible types: org.apache.beam.sdk.options.ValueProvider<java.lang.String> cannot be converted to java.lang.String"

python - 数据流错误 : 'Clients have non-trivial state that is local and unpickleable'

google-cloud-dataflow - 如何在数据流/光束中将 PCollection<List<String>> 转换为 PCollection<String>

Python GroupBy 时间间隔

python - 如何在python中计算正态累积分布函数的倒数?

python - Mayavi:绕y轴旋转

Cloud DataFlow 中的 Python 依赖项,requirements.txt 在本地工作,但在工作线程上不起作用

python - 使用 python 抓取 crunchbase 数据