我正在 Google Cloud Platform 上的 AI Platform Jupyter Notebook 环境(Apache Beam 2.28.0,Python 3)中开发一个 Apache Beam 管道。在浏览我需要接手的代码时,我发现有一个转换是这样应用的:`new_pColl = pColl | "Process images" >> ProcessImages(*args)`
其中 `ProcessImages` 是按如下方式定义的类:
import datetime
import json
import time
from ssl import SSLError
import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory
class ProcessImages(PTransform):
    """Composite transform that extracts and joins per-creative image features.

    Intended usage::

        new_pColl = pColl | "Process images" >> ProcessImages(creative_source_type)

    ``expand`` wires these steps together:

    1. It annotates each input element with ``ExtractImageMetadata``.
    2. It extracts the top-level (``base``) features.
    3. For every annotation endpoint, it filters the API output and runs that
       endpoint module's ``Extract`` / ``Collect`` pair.
    4. It collects performance features directly from the input.
    5. It joins all feature groups with ``CoGroupByKey`` and applies ``Flatten``.

    Args:
        creative_source_type: Extra argument forwarded to the performance
            collection ``ParDo``.
    """

    def __init__(self, creative_source_type):
        # Run the base-class constructor, as Beam's own composite-transform
        # examples do, instead of silently skipping it.
        PTransform.__init__(self)
        # Annotation endpoint name -> feature module whose Extract() and
        # Collect() objects are applied (via beam.ParDo) to that endpoint.
        self.operations_map = {
            'image_properties_annotation': image,
            'text_annotations': text,
            'safe_search_annotation': safe_search,
            'label_annotations': label,
            'logo_annotations': logo,
            'face_annotations': face,
            'object_annotations': objects,
        }
        self.creative_source_type = creative_source_type

    def expand(self, pColl):
        """Build the feature-extraction sub-graph on ``pColl``.

        Args:
            pColl: Input PCollection of creatives.

        Returns:
            The PCollection produced by ``Flatten`` over the ``CoGroupByKey``
            join of all feature groups.
        """
        # Derive the endpoint list from operations_map so the two cannot
        # drift apart. Dicts keep insertion order, so step order is unchanged.
        image_endpoints = list(self.operations_map)

        annotated_creatives = (
            pColl
            | 'Annotate image creatives' >> beam.ParDo(ExtractImageMetadata())
            | 'Extract Top Level Features' >> beam.ParDo(base.Extract())
        )

        # Feature-group name -> PCollection of that group's features.
        features = {}
        features['base'] = (
            annotated_creatives
            | 'collect base features' >> beam.ParDo(base.Collect())
        )
        for endpoint in image_endpoints:
            operations = self.operations_map[endpoint]
            features[endpoint] = (
                annotated_creatives
                | f'Filter {endpoint}' >> beam.ParDo(
                    util.FilterAPIOutput(), endpoint=endpoint)
                | f"Extract {endpoint} features" >> beam.ParDo(
                    operations.Extract())
                | f"Collect {endpoint} by key" >> beam.ParDo(
                    operations.Collect())
            )

        # Performance features are computed from the raw input, not from the
        # annotated creatives.
        performance_fn = PerformanceCollectFactory().CreatePerformanceCollect
        features['performance'] = pColl | 'extract performance' >> beam.ParDo(
            performance_fn, self.creative_source_type)

        # CoGroupByKey joins the dict of feature PCollections on their shared
        # key. Each presumably holds (key, value) pairs, per the "by key"
        # Collect steps; TODO confirm.
        return (
            features
            | 'Group features' >> beam.CoGroupByKey()
            | 'Flatten features' >> beam.ParDo(Flatten())
        )
我的目的是重构这段代码,改用一种更常规的写法:像管道中的其他步骤一样,通过 `new_pColl = pColl | "Process images" >> beam.ParDo(ProcessImages(), *args)` 来调用这个转换。
我尝试了几次,但总是报错,不知道该如何正确地重构。下面是我按照期望的用法为该类编写的定义:
import datetime
import json
import time
from ssl import SSLError
import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient.errors import HttpError
from dataflow_helpers import util
from operations.features import (base, face, image, label, logo, objects, safe_search,
text)
from operations.features.flatten import Flatten
from operations.vision.image import ExtractImageMetadata
from operations.performance.performance_collect_factory import PerformanceCollectFactory
class ProcessImages(beam.DoFn):
    """Attempted DoFn version of the ``ProcessImages`` composite transform.

    NOTE(review): this class cannot work as written, for two reasons.

    1. The Beam runner invokes ``process`` on a DoFn, never
       ``process_element``. Because ``process`` is not overridden, the base
       ``DoFn.process`` runs and raises ``NotImplementedError``. That is the
       ``RuntimeError: NotImplementedError [while running ...]`` reported
       for this step.
    2. Even if the method were renamed to ``process``, ``beam.ParDo`` calls it
       once per input element. ``element`` is therefore a single element,
       not a PCollection, so the ``element | ... >> beam.ParDo(...)`` chains
       below cannot build pipeline steps. Chaining several ParDo /
       CoGroupByKey steps is the job of a composite ``PTransform`` with an
       ``expand`` method, applied as
       ``pColl | "Process images" >> ProcessImages(creative_source_type)``.
    """

    def process_element(self, element, creative_source_type):
        # NOTE(review): never called by Beam -- the runner only calls
        # ``process``.
        # Annotation endpoint name -> feature module whose Extract() and
        # Collect() objects are applied below via beam.ParDo.
        self.operations_map = {
            "image_properties_annotation": image,
            "text_annotations": text,
            "safe_search_annotation": safe_search,
            "label_annotations": label,
            "logo_annotations": logo,
            "face_annotations": face,
            "object_annotations": objects,
        }
        self.creative_source_type = creative_source_type
        # Endpoint names, in operations_map insertion order.
        image_endpoints = [k for k in self.operations_map.keys()]
        # NOTE(review): ``element`` is one input element, not a PCollection,
        # so this ``|`` chain cannot apply transforms here.
        annotated_creatives = (
            element | "Annotate image creatives" >> beam.ParDo(
                ExtractImageMetadata())
            | "Extract Top Level Features" >> beam.ParDo(
                base.Extract())
        )
        # Feature-group name -> features for that group, later joined with
        # CoGroupByKey.
        features = {}
        features["base"] = annotated_creatives | "Collect base features" >> beam.ParDo(
            base.Collect()
        )
        for endpoint in image_endpoints:
            endpoint_features = (annotated_creatives
                | f"Filter {endpoint}" >> beam.ParDo(
                    util.FilterAPIOutput(), endpoint=endpoint
                )
                | f"Extract {endpoint} features" >> beam.ParDo(
                    self.operations_map[endpoint].Extract()
                )
                | f"Collect {endpoint} by key" >> beam.ParDo(
                    self.operations_map[endpoint].Collect())
            )
            features[endpoint] = endpoint_features
        # Performance features from the raw input, parameterised by
        # creative_source_type.
        features["performance"] = element | "Extract performance" >> beam.ParDo(PerformanceCollectFactory().CreatePerformanceCollect,
            self.creative_source_type)
        # NOTE(review): a DoFn's process should return/yield an iterable of
        # output elements, not a pipeline fragment like this one.
        return (
            features | "Group features" >> beam.CoGroupByKey()
            | "Flatten features" >> beam.ParDo(Flatten())
        )
运行管道时遇到的错误:
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
637 """
--> 638 raise NotImplementedError
639
NotImplementedError:
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
<ipython-input-13-0f9bbda0a56f> in <module>
----> 1 ib.show(process_images)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
226 def run_within_progress_indicator(*args, **kwargs):
227 with ProgressIndicator('Processing...', 'Done.'):
--> 228 return func(*args, **kwargs)
229
230 return run_within_progress_indicator
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
484 recording_manager = ie.current_env().get_recording_manager(
485 user_pipeline, create_if_absent=True)
--> 486 recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
487
488 # Catch a KeyboardInterrupt to gracefully cancel the recording and
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
441 category=DeprecationWarning)
442 pf.PipelineFragment(list(uncomputed_pcolls),
--> 443 self.user_pipeline.options).run()
444 result = ie.current_env().pipeline_result(self.user_pipeline)
445 else:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py in run(self, display_pipeline_graph, use_cache, blocking)
114 self._runner_pipeline.runner._force_compute = not use_cache
115 self._runner_pipeline.runner._blocking = blocking
--> 116 return self.deduce_fragment().run()
117 finally:
118 self._runner_pipeline.runner._skip_display = preserved_skip_display
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
557 finally:
558 shutil.rmtree(tmpdir)
--> 559 return self.runner.run_pipeline(self, self._options)
560 finally:
561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py in run_pipeline(self, pipeline, options)
196
197 main_job_result = PipelineResult(
--> 198 pipeline_to_execute.run(), pipeline_instrument)
199 # In addition to this pipeline result setting, redundant result setting from
200 # outer scopes are also recommended since the user_pipeline might not be
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
557 finally:
558 shutil.rmtree(tmpdir)
--> 559 return self.runner.run_pipeline(self, self._options)
560 finally:
561 shutil.rmtree(self.local_tempdir, ignore_errors=True)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
131 runner = BundleBasedDirectRunner()
132
--> 133 return runner.run_pipeline(pipeline, options)
134
135
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
181
182 self._latest_run_result = self.run_via_runner_api(
--> 183 pipeline.to_runner_api(default_environment=self._default_environment))
184 return self._latest_run_result
185
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
191 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
192 # the teststream (if any), and all the stages).
--> 193 return self.run_stages(stage_context, stages)
194
195 @contextlib.contextmanager
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
357 stage_results = self._run_stage(
358 runner_execution_context,
--> 359 bundle_context_manager,
360 )
361 monitoring_infos_by_stage[stage.name] = (
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
553 input_timers,
554 expected_timer_output,
--> 555 bundle_manager)
556
557 final_result = merge_results(last_result)
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
593
594 result, splits = bundle_manager.process_bundle(
--> 595 data_input, data_output, input_timers, expected_timer_output)
596 # Now we collect all the deferred inputs remaining from bundle execution.
597 # Deferred inputs can be:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
894 process_bundle_descriptor.id,
895 cache_tokens=[next(self._cache_token_generator)]))
--> 896 result_future = self._worker_handler.control_conn.push(process_bundle_req)
897
898 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
378 self._uid_counter += 1
379 request.instruction_id = 'control_%s' % self._uid_counter
--> 380 response = self.worker.do_instruction(request)
381 return ControlFuture(request.instruction_id, response)
382
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
605 # E.g. if register is set, this will call self.register(request.register))
606 return getattr(self, request_type)(
--> 607 getattr(request, request_type), request.instruction_id)
608 else:
609 raise NotImplementedError
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
642 with self.maybe_profile(instruction_id):
643 delayed_applications, requests_finalization = (
--> 644 bundle_processor.process_bundle(instruction_id))
645 monitoring_infos = bundle_processor.monitoring_infos()
646 monitoring_infos.extend(self.state_cache_metrics_fn())
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
998 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
999 input_op_by_transform_id[element.transform_id].process_encoded(
-> 1000 element.data)
1001
1002 # Finish all operations.
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
226 decoded_value = self.windowed_coder_impl.decode_from_stream(
227 input_stream, True)
--> 228 self.output(decoded_value)
229
230 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SdfProcessSizedElements.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/worker/operations.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
~/apache-beam-2.28.0/lib/python3.7/site-packages/future/utils/__init__.py in raise_with_traceback(exc, traceback)
444 if traceback == Ellipsis:
445 _, _, traceback = sys.exc_info()
--> 446 raise exc.with_traceback(traceback)
447
448 else:
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/runners/common.cpython-37m-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
~/apache-beam-2.28.0/lib/python3.7/site-packages/apache_beam/transforms/core.py in process(self, element, *args, **kwargs)
636 An Iterable of output elements or None.
637 """
--> 638 raise NotImplementedError
639
640 def setup(self):
RuntimeError: NotImplementedError [while running '[10]: Process images']
我不知道如何正确实现这个转换。我的转换是哪里配置错了?是不是我给 `ProcessImages` 转换传入了一个不能被当作 PCollection 的参数?
最佳答案
我认为这里有两个问题。首先,如果您继承 `DoFn`,需要实现的是 `process` 方法,而不是您现在写的 `process_element` 方法。根据文档:
Method to use for processing elements.
This is invoked by `DoFnRunner` for each element of a input `PCollection`.
因此,runner 正在调用一个您尚未实现的方法(`DoFn.process` 的默认实现直接抛出 `NotImplementedError`),这就是您收到该错误的原因。
也许更大的问题在于:如上所述,`ParDo` 是以输入 `PCollection` 中的每个元素为参数来调用 `DoFn` 的,而不是以 `PCollection` 本身为参数。您所执行的逻辑作用于整个 `PCollection`,因此不能用 `DoFn` 来实现。例如:
annotated_creatives = (
element | "Annotate image creatives" >> beam.ParDo(
ExtractImageMetadata())
| "Extract Top Level Features" >> beam.ParDo(
base.Extract())
)
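这里的 `element` 不是 `PCollection`,因此它没有 `|`(或 `__ror__`)方法。您想实现的是管道中一段可复用的部分,而这正是 `PTransform` 的用途,也就是您最初的实现所采用的方式:保留 `ProcessImages(PTransform)`,并通过 `new_pColl = pColl | "Process images" >> ProcessImages(creative_source_type)` 来应用它。建议阅读 Beam 用户指南中的 "Composite Transform" 一节,其中讨论了这些概念。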
关于python - 定义管道的 beam.ParDo 阶段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66471439/