python - Defining a beam.ParDo stage of a pipeline

Tags: python, class, apache-beam

I am developing an Apache Beam pipeline in an AI Platform Jupyter Notebook environment on Google Cloud Platform (Apache Beam 2.28.0 for Python 3). Going through the code I have to work on, I found a transform applied like this: new_pColl = pColl | "Process images" >> ProcessImages(*args), where ProcessImages is a class defined as follows:

import datetime
import json
import time
from ssl import SSLError

import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError

from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
                                 text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(PTransform):
    def __init__(self, creative_source_type):
        self.operations_map = {
            'image_properties_annotation': image,
            'text_annotations': text,
            'safe_search_annotation': safe_search,
            'label_annotations': label,
            'logo_annotations': logo,
            'face_annotations': face,
            'object_annotations': objects,
        }
        self.creative_source_type = creative_source_type

    def expand(self, pColl):
        image_endpoints = ['image_properties_annotation', 'text_annotations',
                           'safe_search_annotation', 'label_annotations',
                           'logo_annotations', 'face_annotations', 'object_annotations']

        annotated_creatives = (
            pColl | 'Annotate image creatives' >> beam.ParDo(
                ExtractImageMetadata())
            | 'Extract Top Level Features' >> beam.ParDo(
                base.Extract()))

        features = {}

        features['base'] = annotated_creatives | 'collect base features' >> beam.ParDo(
            base.Collect())

        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f'Filter {endpoint}' >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint)
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract())
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect()))
            features[endpoint] = endpoint_features

        performance_DoFn = PerformanceCollectFactory().CreatePerformanceCollect
        features['performance'] = pColl | 'extract performance' >> beam.ParDo(
            performance_DoFn, self.creative_source_type)

        return (features | 'Group features' >> beam.CoGroupByKey()
                | 'Flatten features' >> beam.ParDo(Flatten()))

My point is that I would like to refactor that code to end up with a more classic approach, where the transform is invoked as new_pColl = pColl | "Process images" >> beam.ParDo(ProcessImages(), *args), just like the other steps of my pipeline.
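For reference, the element-wise pattern I mean looks roughly like this (a minimal sketch; the DoFn name and the dict-shaped elements are made up for illustration):

import apache_beam as beam

class TagCreative(beam.DoFn):
    # Hypothetical DoFn: extra positional/keyword arguments passed to
    # beam.ParDo(...) after the DoFn instance are forwarded to process().
    def process(self, element, creative_source_type):
        yield {**element, 'creative_source_type': creative_source_type}

# Applied like any other element-wise step of the pipeline:
# tagged = pColl | 'Tag creatives' >> beam.ParDo(TagCreative(), 'display_image')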

I have tried several times, but I always get errors and I don't know how to refactor it properly. Here is the definition I wrote for the class, the way I would like to use it:

import datetime
import json
import time
from ssl import SSLError

import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError

from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
                                 text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory

class ProcessImages(beam.DoFn):
    def process_element(self, element, creative_source_type):
        self.operations_map = {
            "image_properties_annotation": image,
            "text_annotations": text,
            "safe_search_annotation": safe_search,
            "label_annotations": label,
            "logo_annotations": logo,
            "face_annotations": face,
            "object_annotations": objects,
        }
        self.creative_source_type = creative_source_type
        
        image_endpoints = [k for k in self.operations_map.keys()]
        
        annotated_creatives = (
            element | "Annotate image creatives" >> beam.ParDo(
                ExtractImageMetadata())
            | "Extract Top Level Features" >> beam.ParDo(
                base.Extract())
        )
        
        features = {}
        
        features["base"] = annotated_creatives | "Collect base features" >> beam.ParDo(
            base.Collect()
        )
        
        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                                 | f"Filter {endpoint}" >> beam.ParDo(
                                     util.FilterAPIOutput(), endpoint=endpoint
                                 )
                                 | f"Extract {endpoint} features" >> beam.ParDo(
                                     self.operations_map[endpoint].Extract()
                                 )
                                 | f"Collect {endpoint} by key" >> beam.ParDo(
                                     self.operations_map[endpoint].Collect())
                                )
            features[endpoint] = endpoint_features
            
        features["performance"] = element | "Extract performance" >> beam.ParDo(PerformanceCollectFactory().CreatePerformanceCollect,
self.creative_source_type)
        return (
            features | "Group features" >> beam.CoGroupByKey()
            | "Flatten features" >> beam.ParDo(Flatten())
        )

The error I get when running the pipeline:

    ---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
    637     """
--> 638     raise NotImplementedError
    639 

NotImplementedError: 

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-13-0f9bbda0a56f> in <module>
----> 1 ib.show(process_images)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
    226   def run_within_progress_indicator(*args, **kwargs):
    227     with ProgressIndicator('Processing...', 'Done.'):
--> 228       return func(*args, **kwargs)
    229 
    230   return run_within_progress_indicator

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
    484   recording_manager = ie.current_env().get_recording_manager(
    485       user_pipeline, create_if_absent=True)
--> 486   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
    487 
    488   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
    441           category=DeprecationWarning)
    442       pf.PipelineFragment(list(uncomputed_pcolls),
--> 443                           self.user_pipeline.options).run()
    444       result = ie.current_env().pipeline_result(self.user_pipeline)
    445     else:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in run(self, display_pipeline_graph, use_cache, blocking)
    114       self._runner_pipeline.runner._force_compute = not use_cache
    115       self._runner_pipeline.runner._blocking = blocking
--> 116       return self.deduce_fragment().run()
    117     finally:
    118       self._runner_pipeline.runner._skip_display = preserved_skip_display

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    557         finally:
    558           shutil.rmtree(tmpdir)
--> 559       return self.runner.run_pipeline(self, self._options)
    560     finally:
    561       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
    196 
    197     main_job_result = PipelineResult(
--> 198         pipeline_to_execute.run(), pipeline_instrument)
    199     # In addition to this pipeline result setting, redundant result setting from
    200     # outer scopes are also recommended since the user_pipeline might not be

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    557         finally:
    558           shutil.rmtree(tmpdir)
--> 559       return self.runner.run_pipeline(self, self._options)
    560     finally:
    561       shutil.rmtree(self.local_tempdir, ignore_errors=True)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    131       runner = BundleBasedDirectRunner()
    132 
--> 133     return runner.run_pipeline(pipeline, options)
    134 
    135 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    181 
    182     self._latest_run_result = self.run_via_runner_api(
--> 183         pipeline.to_runner_api(default_environment=self._default_environment))
    184     return self._latest_run_result
    185 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    191     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    192     #   the teststream (if any), and all the stages).
--> 193     return self.run_stages(stage_context, stages)
    194 
    195   @contextlib.contextmanager

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    357           stage_results = self._run_stage(
    358               runner_execution_context,
--> 359               bundle_context_manager,
    360           )
    361           monitoring_infos_by_stage[stage.name] = (

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    553               input_timers,
    554               expected_timer_output,
--> 555               bundle_manager)
    556 
    557       final_result = merge_results(last_result)

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    593 
    594     result, splits = bundle_manager.process_bundle(
--> 595         data_input, data_output, input_timers, expected_timer_output)
    596     # Now we collect all the deferred inputs remaining from bundle execution.
    597     # Deferred inputs can be:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
    894             process_bundle_descriptor.id,
    895             cache_tokens=[next(self._cache_token_generator)]))
--> 896     result_future = self._worker_handler.control_conn.push(process_bundle_req)
    897 
    898     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    378       self._uid_counter += 1
    379       request.instruction_id = 'control_%s' % self._uid_counter
--> 380     response = self.worker.do_instruction(request)
    381     return ControlFuture(request.instruction_id, response)
    382 

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    605       # E.g. if register is set, this will call self.register(request.register))
    606       return getattr(self, request_type)(
--> 607           getattr(request, request_type), request.instruction_id)
    608     else:
    609       raise NotImplementedError

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    642         with self.maybe_profile(instruction_id):
    643           delayed_applications, requests_finalization = (
--> 644               bundle_processor.process_bundle(instruction_id))
    645           monitoring_infos = bundle_processor.monitoring_infos()
    646           monitoring_infos.extend(self.state_cache_metrics_fn())

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    998           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
    999             input_op_by_transform_id[element.transform_id].process_encoded(
-> 1000                 element.data)
   1001 
   1002       # Finish all operations.

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    226       decoded_value = self.windowed_coder_impl.decode_from_stream(
    227           input_stream, True)
--> 228       self.output(decoded_value)
    229 
    230   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.ConsumerSet.receive()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

~/apache-beam-2.28.0/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
    444         if traceback == Ellipsis:
    445             _, _, traceback = sys.exc_info()
--> 446         raise exc.with_traceback(traceback)
    447 
    448 else:

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()

~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
    636       An Iterable of output elements or None.
    637     """
--> 638     raise NotImplementedError
    639 
    640   def setup(self):

RuntimeError: NotImplementedError [while running '[10]: Process images']

I don't know how to implement this transform correctly. Where did I configure my transform wrong? Am I passing an argument to my ProcessImages transform that cannot be treated as a PCollection?

Best answer

I think there are two issues here. First, if you subclass DoFn, you need to implement the process method, not the process_element method you currently have. Per the documentation:

Method to use for processing elements.
This is invoked by DoFnRunner for each element of a input PCollection.

So the runner is trying to invoke a method you haven't implemented, which explains the error you're getting.
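In other words, an element-wise DoFn is expected to look roughly like this (a minimal sketch; the class name is illustrative only):

import apache_beam as beam

class MyDoFn(beam.DoFn):
    # The runner calls process() for every element of the input PCollection;
    # extra arguments passed to beam.ParDo() are forwarded here.
    def process(self, element, creative_source_type):
        # ... work on a single element, not on a PCollection ...
        yield element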

Perhaps the bigger issue is that, as mentioned above, ParDo invokes the DoFn with each element of the input PCollection as an argument, not with the PCollection itself. The logic you are performing applies to the PCollection as a whole, so you cannot use a DoFn to do it. For example:

annotated_creatives = (
            element | "Annotate image creatives" >> beam.ParDo(
                ExtractImageMetadata())
            | "Extract Top Level Features" >> beam.ParDo(
                base.Extract())
        )

element here is not a PCollection, so it has no | (or __ror__) method. What you want to build is a reusable piece of a pipeline, which is exactly what PTransform is for, as used in your original implementation. I recommend reading the "Composite transforms" section of the Beam programming guide, which covers these concepts.
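As a rough sketch of the composite-transform shape your original code already uses (the class and step names here are illustrative, not the real ones):

import apache_beam as beam

class ProcessImagesSketch(beam.PTransform):
    # A composite transform receives a whole PCollection in expand(),
    # so chaining sub-transforms with | is valid there.
    def __init__(self, creative_source_type):
        super().__init__()
        self.creative_source_type = creative_source_type

    def expand(self, pcoll):
        return (pcoll
                | 'First step' >> beam.Map(lambda x: x)
                | 'Second step' >> beam.Map(lambda x: x))

# Applied the same way the original code applies ProcessImages:
# new_pColl = pColl | 'Process images' >> ProcessImagesSketch(creative_source_type)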

A similar question on "python - Defining a beam.ParDo stage of a pipeline" can be found on Stack Overflow: https://stackoverflow.com/questions/66471439/
