python-2.7 - 如何将参数传递给数据流模板以进行管道构建

标签 python-2.7 google-cloud-datastore google-cloud-dataflow

我正在尝试进行像这样的祖先查询 example并转为模板版本。

问题是参数ancestor_id是用于管道构建期间函数make_query的。 如果我在创建和暂存模板时没有传递它,我将收到 RuntimeValueProviderError: RuntimeValueProvider(option:ancestor_id, type: int).get() not call from a runtime context。但是如果我在创建模板时传递它,它看起来就像一个 StaticValueProvider 在我执行模板时永远不会改变。

构建管道时向模板传递参数的正确方法是什么?

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper as datastore_helper
from googledatastore import PropertyFilter

class Test(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--ancestor_id', type=int)

def make_query(ancestor_id):
    ancestor = entity_pb2.Key()
    datastore_helper.add_key_path(ancestor, KIND, ancestor_id)
    query = query_pb2.Query()
    datastore_helper.set_kind(query, KIND)
    datastore_helper.set_property_filter(query.filter, '__key__', PropertyFilter.HAS_ANCESTOR, ancestor)
    return query

pipeline_options = PipelineOptions()
test_options = pipeline_options.view_as(TestOptions)
with beam.Pipeline(options=pipline_options) as p:
  entities = p | ReadFromDatastore(PROJECT_ID, make_query(test_options.ancestor_id.get()))

最佳答案

两个问题。

  1. ValueProvider.value.get()方法只能在运行时方法中运行,如 ParDo.process() 。请参阅example .

  2. 此外,您面临的挑战是您正在使用 Google Cloud Datastore IO(来自数据存储区的查询)。截至今日(2018年5月), official documentation表示 Datastore IO 尚未接受运行时模板参数。

对于 python,尤其如此,

The following connectors accept runtime parameters. File-based IOs: textio, avroio, tfrecordio

解决方法:您可能可以首先运行不带任何模板化参数的查询来获取实体的 PCollection。此时,由于任何转换器都可以接受模板化参数,因此您可以将其用作过滤器。但这取决于您的用例,并且可能不适用于您。

关于python-2.7 - 如何将参数传递给数据流模板以进行管道构建,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50271254/

相关文章:

python - Django:用户对象的 'email' 属性

Python JSONPath 过滤器表达式错误,jsonpath-rw 1.4.0 的意外字符

python - Google App Engine 数据库模型对象对象不可迭代错误

python - 在 Google App Engine 上同步 Memcache 和 Datastore

java - 使用 Java API/Dataflow 将重复记录插入 Big Query - "Repeated field must be imported as a JSON array"

java - 循环遍历 PCollection 以创建 Graph 数据结构,然后将其作为 SideInput 传递到管道转换

google-cloud-dataflow - 没有接收器的流式数据流管道

python-2.7 - 从 Pandas 的另一列中填充一列的缺失值

python - 删除保持样式的 html(标签)部分 - python

python - 使用ndb和python从GAE中通过URL传递的自动分配的ID中检索 key