java - 来自 App Engine 的管道提交

标签 java google-app-engine google-cloud-dataflow

我需要将数据存储实体发送到 BigQuery 表,同时进行数据转换。 目前我的设计如下:
AppEngine Java 应用程序将数据发布到 PUB/SUB 服务中的主题 - 使其正常工作。 然后让 DataflowPipeline 订阅主题并阅读消息。然后完成转换并将结果写入 BigQuery。我正在运行一些示例代码来对此进行测试。

我有一个可以在我的本地开发机器上运行的原始流水线,我可以运行它——所有这些都作为演示代码工作。这是通过本地运行 mvn appengine:devserver

现在的问题是:如何从 Google App Engine 部署数据流管道?开发机器无法访问生产环境,因此我无法在 Google Pipeline Service 上运行我的 Pipeline。 我试图从 Google App Engine 提交这个,但收到了内存不足的错误。这似乎与某些身份验证问题有关。从 StackOverflow 上的其他帖子看来,App Engine 的这种“部署”似乎不受“官方”支持。

那么在生产环境中如何做到这一点呢?

到目前为止的环境依赖:
行家 3.3.0
谷歌应用引擎 1.9.28
谷歌 API 客户端 1.20.0
Java 1.7.0_79
工作站 - Windows 7
Google 开发环境:黄金套餐
这是我让管道进程运行的示例代码....

DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setNumWorkers(2);
        options.setRunner(DataflowPipelineRunner.class);
        options.setStagingLocation("gs://pipeline_bucket2");
        options.setProject("projectname");
        options.setJobName("starterpipeline");
        options.setUpdate(true);

        Pipeline p = Pipeline.create(options);

        p.apply(Create.of("Hello", "World")).apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
            @Override
            public void processElement(ProcessContext c) {
                c.output(c.element().toUpperCase());
            }
        })).apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
            @Override
            public void processElement(ProcessContext c) {
                LOG.info(c.element());
            }
        }));

        p.run();

这是我在尝试运行上面的代码时出现错误的堆栈跟踪:

Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
    at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
    at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
    at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    at java.util.concurrent.FutureTask.run(FutureTask.java:260)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

最佳答案

Dataflow 在上传您的应用工件时与 Google Cloud Storage 通信时使用 64mb 缓冲区。如果您使用的实例没有足够的内存,则可能会导致 OOM,例如,如果您使用 AppEngine instance内存为 128mbs。

另请注意,每当您更新模块或 AppEngine 进行内部更新时,第一次执行您的数据流管道时,数据流 SDK 需要将所有更改的应用程序工件上传到 Google Cloud Storage。根据应用程序的大小,这可能需要超过 60 秒,这是前端实例请求的限制,并可能导致 deadline exceeded errors .

关于java - 来自 App Engine 的管道提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33647161/

相关文章:

java - 我是否误解了 Java Bean 方法命名约定,或者这是一个异常情况?

google-app-engine - 在本地环境中重定向后,我是否应该期待过时的结果?

java - 如何在 Android Activity 中从位图创建 Blob?

java - 适用于 JFLAP 的 IP 验证正则表达式

java - 为 Eclipse Java 调试器禁用热代码替换

python - Cloud Function触发数据流时如何在数据流中传递requirements.txt参数?

google-cloud-storage - 谷歌云存储与 HDFS

java - 创建数据存储实体并在数据流管道中使用它的正确方法是什么

java - 如何为 IO 文件创建 stub 以在 Java 中进行单元测试?

python - GAE : how to add new data in a memcache var