我有一个包含数百万条记录的文件,其中一些记录是坏记录(在 ParDo 中处理记录时就会知道)。我想将不良记录及其在文件中出现的行号写入单独的 PCollection,将良好记录写入单独的 PCollection。
有没有一种方法可以维护迄今为止在工作进程中读取的行的全局计数器,以便我可以使用它来写出行号?
最佳答案
您可以使用 Apache Beam 指标来保存全局监控计数器,您可以从您的计算机或运行程序的 UI 中查询该计数器。
如果您想保留所有不良记录的精确集合以及有关它们的信息(例如行号),那么您需要添加一个转换来实现这一点。像这样的事情:
original_records = p | LoadRecords()
class SplitRecords(beam.DoFn):
BAD_RECORD_TAG = 'BadRecord'
def process(self, record):
if self.is_bad(record):
# Output the record onto the 'special' BadRecord input.
yield beam.pvalue.TaggedOutput(self.BAD_RECORD_TAG, record)
else:
yield record # Output the record onto the main input
record_collections = (original_records |
beam.ParDo(SplitRecords()).with_outputs(
SplitRecords.BAD_RECORD_TAG,
main='GoodRecords'))
bad_records = record_collections[SplitRecords.BAD_RECORD_TAG]
good_records = record_collections['GoodRecords']
有关更详细的示例,我建议您查看 Apache Beam 说明书目录,其中包含 example with a multiple-output ParDo
关于python - Apache Beam 全局计数器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51675154/