python - 左加入 apache beam

标签 python apache-beam

在 apache beam 中跟随 Pcollection 左连接的更好方法是什么?

pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]

预期输出是

[('a', 1), ('x', 10)]
[('b', 2), ('x', 10)] 
[('c', 3), ('x', 10)] 
[('d', 4), ('x', 10)]
[('e', 5), ('x', 10)] 
[('f', 6), ('x', 10)]
[('a', 12), ('x', 20)]
[('b', 21), ('x', 20)] 
[('c', 13), ('x', 20)]
[('a', 21)]
[('b', 23)]
[('c', 31)]

我已经使用 CoGroupByKey() 和 Pardo() 实现了左连接器。 beam Python SDK 中是否有其他实现左连接器的方法?

left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)


class LeftJoinerFn(beam.DoFn):

    def __init__(self):
        super(LeftJoinerFn, self).__init__()

    def process(self, row, **kwargs):

        left = row['left']
        right = row['right']

        if left and right:
            for each in left:
                yield each + right[0]

        elif left:
            for each in left:
                yield each

最佳答案

您可以使用以下代码为连接的右侧使用侧输入,假设右侧总是将一个元素映射到每个键,这意味着它的大小总是比左侧 pcollection 小得多.

此外,如果您的 pcollection 是通过从外部源而不是内存数组读取创建的,您将需要传递 right_list=beam.pvalue.asList(pcoll2) 而不是 right_list=pcoll2 到 ParDo 。检查here了解更多信息

class LeftJoinerFn(beam.DoFn):

    def __init__(self):
        super(LeftJoinerFn, self).__init__()

    def process(self, row, **kwargs):

        right_dict = dict(kwargs['right_list'])
        left_key = row[0]

        if left_key in right_dict:
            for each in row[1]:
                yield each + right_dict[left_key]

        else:
            for each in row[1]:
                yield each

class Display(beam.DoFn):
    def process(self, element):
        LOG.info(str(element))
        yield element

p = beam.Pipeline(options=pipeline_options)

pcoll1 = [('key1', [[('a', 1)],[('b', 2)], [('c', 3)], [('d', 4)],[('e', 5)], [('f', 6)]]), \
        ('key2',[[('a', 12)],[('b', 21)], [('c', 13)]]), \
        ('key3',[[('a', 21)],[('b', 23)], [('c', 31)]])\
        ]
pcoll2 = [('key1', [[('x', 10)]]), ('key2', [[('x', 20)]])]


left_joined = (
    pcoll1 
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn(), right_list=pcoll2)
    | 'Display' >> beam.ParDo(Display())
)
p.run()

关于python - 左加入 apache beam,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45514653/

相关文章:

python - 总结数组内所有不同形状的 numpy 子数组的所有值

python - Django 登录所需的中间件不起作用

python从特定索引继续循环

python - 使用 Dataflow 和 Apache Beam (Python) 将数据从 Pub/Sub 流式传输到 BigQuery 时出现问题

google-cloud-platform - 优化内存密集型数据流管道的 GCP 成本

python - 如何在python pyqt5中制作透明的十字符号

python - Tensorflow:使用更多不同矩阵进行张量乘法

java - 数据流管道和发布订阅模拟器

mysql - 带有 JdbcIO 编写器的 ApacheBeam/DataFlow runner 创建了太多连接

java - Java大数据处理