我需要将数据存储实体发送到 BigQuery 表,同时进行数据转换。
目前我的设计如下:
AppEngine Java 应用程序将数据发布到 PUB/SUB 服务中的主题 - 使其正常工作。
然后让 DataflowPipeline 订阅主题并阅读消息。然后完成转换并将结果写入 BigQuery。我正在运行一些示例代码来对此进行测试。
我有一个可以在我的本地开发机器上运行的原始流水线,我可以运行它——所有这些都作为演示代码工作。这是通过本地运行
mvn appengine:devserver
现在的问题是:如何从 Google App Engine 部署数据流管道?开发机器无法访问生产环境,因此我无法在 Google Pipeline Service 上运行我的 Pipeline。 我试图从 Google App Engine 提交这个,但收到了内存不足的错误。这似乎与某些身份验证问题有关。从 StackOverflow 上的其他帖子看来,App Engine 的这种“部署”似乎不受“官方”支持。
那么在生产环境中如何做到这一点呢?
到目前为止的环境依赖:
行家 3.3.0
谷歌应用引擎 1.9.28
谷歌 API 客户端 1.20.0
Java 1.7.0_79
工作站 - Windows 7
Google 开发环境:黄金套餐
这是我让管道进程运行的示例代码....
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setRunner(DataflowPipelineRunner.class);
options.setStagingLocation("gs://pipeline_bucket2");
options.setProject("projectname");
options.setJobName("starterpipeline");
options.setUpdate(true);
Pipeline p = Pipeline.create(options);
p.apply(Create.of("Hello", "World")).apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().toUpperCase());
}
})).apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
LOG.info(c.element());
}
}));
p.run();
这是我在尝试运行上面的代码时出现错误的堆栈跟踪:
Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)
最佳答案
Dataflow 在上传您的应用工件时与 Google Cloud Storage 通信时使用 64mb 缓冲区。如果您使用的实例没有足够的内存,则可能会导致 OOM,例如,如果您使用 AppEngine instance内存为 128mbs。
另请注意,每当您更新模块或 AppEngine 进行内部更新时,第一次执行您的数据流管道时,数据流 SDK 需要将所有更改的应用程序工件上传到 Google Cloud Storage。根据应用程序的大小,这可能需要超过 60 秒,这是前端实例请求的限制,并可能导致 deadline exceeded errors .
关于java - 来自 App Engine 的管道提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33647161/