What is a better way to left join the following PCollections in Apache Beam?
pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]
The expected output is:
[('a', 1), ('x', 10)]
[('b', 2), ('x', 10)]
[('c', 3), ('x', 10)]
[('d', 4), ('x', 10)]
[('e', 5), ('x', 10)]
[('f', 6), ('x', 10)]
[('a', 12), ('x', 20)]
[('b', 21), ('x', 20)]
[('c', 13), ('x', 20)]
[('a', 21)]
[('b', 23)]
[('c', 31)]
I have implemented a left joiner using CoGroupByKey() and ParDo(). Is there another way to implement a left joiner in the Beam Python SDK?
left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)

class LeftJoinerFn(beam.DoFn):
    def __init__(self):
        super(LeftJoinerFn, self).__init__()

    def process(self, row, **kwargs):
        left = row['left']
        right = row['right']
        if left and right:
            for each in left:
                yield each + right[0]
        elif left:
            for each in left:
                yield each
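To make the data flow of the CoGroupByKey approach easier to follow, here is a plain-Python emulation of the grouping and join steps. It is only a sketch, not Beam's implementation: the helper names cogroup and left_join are hypothetical, and it assumes each input holds one (key, value) element per record, such as ('key1', [('a', 1)]), which is a guess about how the original PCollections were shaped.

```python
from collections import defaultdict


def cogroup(left, right):
    """Emulate the shape of CoGroupByKey output for two keyed lists."""
    grouped = defaultdict(lambda: {'left': [], 'right': []})
    for key, value in left:
        grouped[key]['left'].append(value)
    for key, value in right:
        grouped[key]['right'].append(value)
    return dict(grouped)


def left_join(left, right):
    """Yield every left value, extended by the first right value if one exists."""
    for row in cogroup(left, right).values():
        for each in row['left']:
            if row['right']:
                yield each + row['right'][0]
            else:
                yield each


# Hypothetical sample data, one element per record.
left = [('key1', [('a', 1)]), ('key1', [('b', 2)]), ('key3', [('a', 21)])]
right = [('key1', [('x', 10)]), ('key2', [('x', 20)])]
result = list(left_join(left, right))
```

Keys that appear only on the right ('key2' here) produce no output, which is what makes this a left join rather than a full outer join.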
Best answer
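You can use the code below, which passes the right side of the join as a side input. It assumes the right side always maps a single element to each key, which means it is always much smaller than the left PCollection.
Also, if your PCollection is created by reading from an external source rather than from an in-memory list, you will need to pass right_list=beam.pvalue.AsList(pcoll2) instead of right_list=pcoll2 to the ParDo. See the Apache Beam documentation on side inputs for more information.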
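import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

LOG = logging.getLogger(__name__)

class LeftJoinerFn(beam.DoFn):
    def __init__(self):
        super(LeftJoinerFn, self).__init__()

    def process(self, row, **kwargs):
        # Side input: (key, value) pairs from the right side, used as a lookup table.
        right_dict = dict(kwargs['right_list'])
        left_key = row[0]
        if left_key in right_dict:
            for each in row[1]:
                # The right value is wrapped in an outer list, so unwrap it with [0]
                # to get [('a', 1), ('x', 10)] rather than [('a', 1), [('x', 10)]].
                yield each + right_dict[left_key][0]
        else:
            # No match on the right: emit the left values unchanged.
            for each in row[1]:
                yield each

class Display(beam.DoFn):
    def process(self, element):
        LOG.info(str(element))
        yield element

# Default options (an assumption); replace with your own pipeline options.
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
pcoll1 = [
    ('key1', [[('a', 1)], [('b', 2)], [('c', 3)], [('d', 4)], [('e', 5)], [('f', 6)]]),
    ('key2', [[('a', 12)], [('b', 21)], [('c', 13)]]),
    ('key3', [[('a', 21)], [('b', 23)], [('c', 31)]]),
]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]
left_joined = (
    pcoll1
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn(), right_list=pcoll2)
    | 'Display' >> beam.ParDo(Display())
)
p.run()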
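The per-element join logic can also be written as a plain generator function, which makes it easy to check outside a pipeline. The sketch below is one possible shape, not the answer author's code: left_join_row is a hypothetical name, and it assumes the right side has already been turned into a dict whose values are wrapped in an outer list, as in the sample data.

```python
def left_join_row(row, right_dict):
    """Yield each left value of `row`, extended by the matching right value if any."""
    key, left_values = row
    matches = right_dict.get(key)  # e.g. [[('x', 10)]], or None when unmatched
    for value in left_values:
        if matches:
            # [('a', 1)] + [('x', 10)] -> [('a', 1), ('x', 10)]
            yield value + matches[0]
        else:
            # Left join: unmatched left values pass through unchanged.
            yield value


right_dict = {'key1': [[('x', 10)]], 'key2': [[('x', 20)]]}
matched = list(left_join_row(('key1', [[('a', 1)], [('b', 2)]]), right_dict))
unmatched = list(left_join_row(('key3', [[('a', 21)]]), right_dict))
```

In a pipeline, a function like this could be applied with beam.FlatMap(left_join_row, right_dict=beam.pvalue.AsDict(right)), where right is a PCollection of (key, value) pairs. AsDict materializes the whole right side on each worker, so this only suits a small right side.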
Regarding "python - left join in Apache Beam", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45514653/