java - 数据流 + 数据存储 = DatastoreException : I/O error

标签 java google-cloud-platform google-cloud-datastore google-cloud-dataflow

我正在尝试使用 com.google.cloud.datastore 从 DataFlow 写入 DataStore。

我的代码如下所示(受到 [1] 中示例的启发):

public void processElement(ProcessContext c) {
    LocalDatastoreHelper HELPER = LocalDatastoreHelper.create(1.0);
    Datastore datastore = HELPER.options().toBuilder().namespace("ghijklmnop").build().service();
    Key taskKey = datastore.newKeyFactory()
        .ancestors(PathElement.of("TaskList", "default"))
        .kind("Task")
        .newKey("sampleTask");
    Entity task = Entity.builder(taskKey)
        .set("category", "Personal")
        .set("done", false)
        .set("priority", 4)
        .set("description", "Learn Cloud Datastore")
        .build();
    datastore.put(task);
}

我收到此错误:

exception: "java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: com.google.cloud.datastore.DatastoreException: I/O error
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.sideOutputWindowedValue(DoFnRunnerBase.java:314)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.sideOutput(DoFnRunnerBase.java:470)
at com.google.cloud.dataflow.sdk.transforms.Partition$PartitionDoFn.processElement(Partition.java:172)

我尝试使用 DatastoreIO 接收器,但流式运行程序当前似乎不支持它。

如何避免该错误?或者从 DataFlow 写入 DataStore 的最佳方式是什么?

[1] https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/datastore/src/main/java/com/google/datastore/snippets/Concepts.java

最佳答案

按照@Sam McVeety的建议,我尝试将我的数据存储区代码隔离在数据流之外。我确实遇到了同样的错误!

但这也让我看到了异常的原因,而我在数据流日志中没有看到:

Caused by: java.net.ConnectException: Connection refused

线索就在我使用的导入行中:com.google.cloud.datastore.testing.LocalDatastoreHelper

它是一个测试助手,负责在本地模拟数据存储区 API。哎呀。

这是我经过一些本地调试后得到的代码:

public void processElement(ProcessContext c) {
    final Datastore datastore = DatastoreOptions.defaultInstance().service();
    final KeyFactory keyFactory = datastore.newKeyFactory().kind("Task");

    Key key = datastore.allocateId(keyFactory.newKey());
    Entity task = Entity.builder(key)
        .set("description", StringValue.builder(":D").excludeFromIndexes(true).build())
        .set("created", DateTime.now())
        .set("done", false)
        .build();
    datastore.put(task);
}

主要区别是:

LocalDatastoreHelper.create(1.0).options().toBuilder().namespace("ghijklmnop").build().service()

成为

DatastoreOptions.defaultInstance().service();

关于java - 数据流 + 数据存储 = DatastoreException : I/O error,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39334896/

相关文章:

python - 使用 Google Cloud Functions 在两个 BigQuery 项目之间传输数据

google-cloud-platform - 寻找一种将 GCP Datastore 实体数据导出到 csv 文件的方法

java - Objectify 应用程序引擎 - 使用值列表查询嵌入实体

java - 服务时间与线程数成正比

java - 需要帮助修改输入验证和错误消息的代码

apache-spark - GCP Dataproc 与 Elasticsearch

google-app-engine - 什么是了解 GAE 数据存储架构的好资源?

java - 仅针对内容类型 x-www-form-urlencoded 接受 Spring + Spring Security 请求

java - 如何在 Java 中获取 URL 对象内路径的父级?

kubernetes - GKE 是否需要 prometheus-to-sd?我可以删除它吗?