python - Can't the "google.cloud.datastore" package be used on Cloud Dataflow?

Tags: python google-cloud-datastore google-cloud-dataflow

I want to write to Datastore with a transaction from Cloud Dataflow, so I wrote the following.

def exe_dataflow():
....
  from google.cloud import datastore
  # call from pipeline
  def ds_test(content):
    datastore_client = datastore.Client()

    kind = 'test_out'
    name = 'change'
    task_key = datastore_client.key(kind, name)

    for _ in range(3):
        with datastore_client.transaction():
            current_value = datastore_client.get(task_key)
            current_value['v'] += content['v']
            datastore_client.put(current_value)

    # pipeline
....
      | 'datastore test' >> beam.Map(ds_test)

However, an error occurred, and the log shows the following message.

(7b75e0ef2db229da): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
  ...(SNIP)...
  File "/usr/local/lib/python2.7/dist-packages/dill/dill.py", line 767, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
AttributeError: 'module' object has no attribute 'datastore'

Can't the "google.cloud.datastore" package be used on Cloud Dataflow?

Added 2018/2/28:

I added --requirements_file to MyOptions:

  options = MyOptions(flags = ["--requirements_file", "./requirements.txt"])

and then created the requirements.txt:

google-cloud-datastore==1.5.0

However, another error occurred:

(366397598dcf7f02): Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 582, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 167, in execute
    op.start()
...(SNIP)...
  File "my_dataflow.py", line 66, in to_entity
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/__init__.py", line 60, in <module>
    from google.cloud.datastore.batch import Batch
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/batch.py", line 24, in <module>
    from google.cloud.datastore import helpers
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore/helpers.py", line 29, in <module>
    from google.cloud.datastore_v1.proto import datastore_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/__init__.py", line 17, in <module>
    from google.cloud.datastore_v1 import types
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/types.py", line 21, in <module>
    from google.cloud.datastore_v1.proto import datastore_pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/datastore_pb2.py", line 17, in <module>
    from google.cloud.datastore_v1.proto import entity_pb2 as google_dot_cloud_dot_datastore__v1_dot_proto_dot_entity__pb2
  File "/usr/local/lib/python2.7/dist-packages/google/cloud/datastore_v1/proto/entity_pb2.py", line 28, in <module>
    dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
  File "/usr/local/lib/python2.7/dist-packages/google/protobuf/descriptor.py", line 824, in __new__
    return _message.default_pool.AddSerializedFile(serialized_pb)
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
...(SNIP)...
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.

Best Answer

The recommended way to interact with Cloud Datastore from a Cloud Dataflow pipeline is to use the Datastore I/O API, which is available through the Dataflow SDK and provides methods to read data from and write data to a Cloud Datastore database.

You can find detailed documentation of the Datastore I/O package for the Dataflow SDK 2.x for Python at this other link. The datastore.v1.datastoreio module is the specific module you want to work with. There is plenty of information in the link I shared, but in short, it is a connector to Datastore that uses PTransforms to read, write, or delete a PCollection from/to Datastore using the ReadFromDatastore() / WriteToDatastore() / DeleteFromDatastore() classes, respectively.
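
As a rough illustration (not code from the original answer), a pipeline using this connector might look like the sketch below. It assumes the Dataflow SDK 2.x for Python; the project ID, kind name, and options are placeholders.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import (
    ReadFromDatastore, WriteToDatastore)
from google.cloud.proto.datastore.v1 import query_pb2

PROJECT = 'my-project-id'  # placeholder project ID

# Protobuf query selecting every entity of kind 'test_out'.
query = query_pb2.Query()
query.kind.add().name = 'test_out'

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    (p
     | 'read entities' >> ReadFromDatastore(PROJECT, query)
     # ... transform the entity protobufs here ...
     | 'write entities' >> WriteToDatastore(PROJECT))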

You should try using it instead of implementing the calls yourself. I suspect this may be the cause of the error you are seeing, since a Datastore implementation already exists in the Dataflow SDK: the worker ships its own copy of the Datastore proto definitions, so installing google-cloud-datastore registers the same messages a second time:

"google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".

Update:

It looks like these three classes collect several mutations and execute them in a single transaction. You can check this in the code describing the classes.

If the objective is to retrieve (get()) and then update (put()) a Datastore entity, you can probably work with the write_mutations() function, which is described in the documentation, so you can work with a full batch of mutations performing the operations you are interested in.
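
For illustration only, a batched upsert through the SDK's helper might look like the rough sketch below. This is an assumption-laden sketch, not code from the answer: write_mutations() is an internal helper whose signature has changed across SDK versions, and the project ID and entity values are placeholders.

from apache_beam.io.gcp.datastore.v1 import helper
from google.cloud.proto.datastore.v1 import datastore_pb2, entity_pb2

PROJECT = 'my-project-id'  # placeholder project ID

# Build an entity protobuf; in a real pipeline this would come from the
# preceding transform.
entity = entity_pb2.Entity()
entity.key.partition_id.project_id = PROJECT
path = entity.key.path.add()
path.kind = 'test_out'
path.name = 'change'
entity.properties['v'].integer_value = 1

# Wrap the entity in an upsert mutation and write the batch in one call.
mutation = datastore_pb2.Mutation()
mutation.upsert.CopyFrom(entity)

# get_datastore()/write_mutations() are SDK-internal helpers; their exact
# signatures may differ in your SDK version.
datastore = helper.get_datastore(PROJECT)
helper.write_mutations(datastore, PROJECT, [mutation])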

Regarding python - Can't the "google.cloud.datastore" package be used on Cloud Dataflow?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48990784/
