java - Executing a Google Cloud Dataflow pipeline from a Spring Controller

Tags: java spring google-app-engine google-cloud-dataflow apache-beam

How can I use Spring to run an Apache Beam pipeline on Google Cloud Dataflow? This question is similar to "Running Apache Beam pipeline in Spring Boot project on Google Data Flow", but it focuses on launching the pipeline from a Spring Controller rather than from a CommandLineRunner.

@RestController
@RequestMapping("/task/import-csv-file")
public class ImportCsvController {
    @PostMapping("/process-csv-file")
    private ResponseEntity<Void> processCsvFile(
            @RequestParam String gcsFileName,
            @RequestParam String bucketName
    ) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

        options.setProject("same-project-as-this-app-engine-instance");
        options.setStagingLocation("gs://" + bucketName + "/binaries");
        options.setRunner(DataflowRunner.class);
        options.setJobName("process-csv");

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("ReadFile", TextIO.read().from("gs://" + bucketName + "/" + gcsFileName));
        // ... apply some more transforms here, which will eventually 
        // write csv rows as Google Datastore entities ...
        pipeline.run().waitUntilFinish();
        return ResponseEntity.ok().build();
    }
}

I am invoking this Controller through Google Cloud Tasks, using the following code:

@Service
public class TaskQueueService {
    private Queue csvImportsQueue;

    public TaskQueueService() {
        this.csvImportsQueue = QueueFactory.getQueue("csv-import-queue");
    }

    public void queueImportCsvFile(String gcsFileName, String bucketName) {
        String url = "/task/import-csv-file/process-csv-file";
        TaskOptions taskOptions = TaskOptions.Builder
                .withUrl(url)
                .method(POST)
                .param("gcsFileName", gcsFileName)
                .param("bucketName", bucketName);
        csvImportsQueue.add(ofy().getTransaction(), taskOptions);
    }
}

In Google Cloud Logging, I see this error:
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)

Within the stack trace, this error message looks useful:
Caused by: java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.String org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.getProject(), "Project id. Required when running a Dataflow in the cloud. See https://cloud.google.com/storage/docs/projects for further details."]. 

But as you can see above, I did set the project, with the line options.setProject("same-project-as-this-app-engine-instance");

Edit: I found a different stack trace with a different error message. Here is the full stack trace.
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    at com.example.application.controllers.tasks.ImportCsvController.processCsvFile(ImportCsvController.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:877)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
    at com.example.application.filters.SwitchUserProfileFilter.doFilter(SwitchUserProfileFilter.java:127)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutFilter.processLogout(SAMLLogoutFilter.java:168)
    at org.springframework.security.saml.SAMLLogoutFilter.doFilter(SAMLLogoutFilter.java:110)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLDiscovery.doFilter(SAMLDiscovery.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.processLogout(SAMLLogoutProcessingFilter.java:209)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.doFilter(SAMLLogoutProcessingFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLEntryPoint.doFilter(SAMLEntryPoint.java:102)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataDisplayFilter.doFilter(MetadataDisplayFilter.java:84)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataGeneratorFilter.doFilter(MetadataGeneratorFilter.java:87)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at com.example.application.filters.CustomDomainFilter.doFilterInternal(CustomDomainFilter.java:43)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.access$000(ErrorPageFilter.java:61)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:119)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
    at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:539)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:213)
    at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
    at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:757)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:720)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:690)
    at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:882)
    at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:270)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 123 common frames omitted
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy164.getGcsUtil(Unknown Source)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.tryCreateDefaultBucket(GcpOptions.java:354)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:300)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:288)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy149.getGcpTempLocation(Unknown Source)
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:249)
    ... 128 common frames omitted

Edit 2: I added these two imports to one of my source files:

import com.google.common.cache.CacheBuilder;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;

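Then I Cmd+Clicked through to their implementations in IntelliJ and found that the overload being called does not exist, which matches the NoSuchMethodError buried in the long stack trace above. GoogleCloudStorageImpl in gcsio 2.0.0 calls expireAfterWrite(Duration), while the CacheBuilder source I landed on in guava 29.0-android declares expireAfterWrite(long, TimeUnit):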
~/.gradle/caches/modules-2/files-2.1/com.google.cloud.bigdataoss/gcsio/2.0.0/ba86cba5b74f7ded14feb682cc81ece7724573a7/gcsio-2.0.0-sources.jar!/com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl.java
private final LoadingCache<String, Boolean> autoBuckets =
    CacheBuilder.newBuilder()
        .expireAfterWrite(Duration.ofHours(1))
        .build(
            new CacheLoader<String, Boolean>() {
              final List<String> iamPermissions = ImmutableList.of("storage.buckets.get");

              @Override
              public Boolean load(String bucketName) throws Exception {
                try {
                  gcs.buckets()
                      .testIamPermissions(bucketName, iamPermissions)
                      .executeUnparsed()
                      .disconnect();
                } catch (IOException e) {
                  return errorExtractor.userProjectMissing(e);
                }
                return false;
              }
            });
~/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/29.0-android/63f9bc5fbf2ebfe6b17683f8eac8419588295a28/guava-29.0-android-sources.jar!/com/google/common/cache/CacheBuilder.java
public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
  checkState(
      expireAfterWriteNanos == UNSET_INT,
      "expireAfterWrite was already set to %s ns",
      expireAfterWriteNanos);
  checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
  this.expireAfterWriteNanos = unit.toNanos(duration);
  return this;
}
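
The same mismatch can be checked at runtime rather than by reading source jars. The following is a minimal sketch, not part of the original post: the class name GuavaOverloadCheck and its two helper methods are hypothetical, and it only uses standard reflection calls. It looks classes up by name, so it compiles without Guava on the classpath, and it reports where CacheBuilder was loaded from and whether the Duration overload is present.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.time.Duration;

// Hypothetical diagnostic helper; the names here are illustrative only.
public class GuavaOverloadCheck {

    // Loads a class by name without running its static initializers.
    private static Class<?> load(String className) throws ClassNotFoundException {
        return Class.forName(className, false, GuavaOverloadCheck.class.getClassLoader());
    }

    /** Returns true if the named class has a public method with this name and parameter types. */
    public static boolean hasPublicMethod(String className, String methodName, Class<?>... paramTypes) {
        try {
            Method method = load(className).getMethod(methodName, paramTypes);
            return method != null;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    /** Describes the jar or directory a class was loaded from, or a placeholder if unknown. */
    public static String loadedFrom(String className) {
        try {
            CodeSource source = load(className).getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return "(class not on classpath)";
        }
    }

    public static void main(String[] args) {
        String cacheBuilder = "com.google.common.cache.CacheBuilder";
        System.out.println("CacheBuilder loaded from: " + loadedFrom(cacheBuilder));
        System.out.println("expireAfterWrite(Duration) present: "
                + hasPublicMethod(cacheBuilder, "expireAfterWrite", Duration.class));
    }
}
```

Run inside the deployed application (for example from a temporary endpoint or a startup log statement), this would show which Guava jar actually wins on the runtime classpath, which may differ from what the IDE resolves.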

Edit 3: These are the Apache Beam versions I am using:
dependencies {
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.20.0'
    compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.20.0'
}

Best answer

To resolve this, use the non-Android build of Guava, 29.0-jre, because that is the Guava version the GCSIO library depends on.
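
One way to apply this in a Gradle build like the one in the question is to pin the Guava version for every configuration. This is a sketch under assumptions: it supposes the Android flavor is being pulled in transitively by some other dependency, and that nothing else in the build needs it. The resolutionStrategy block below is not from the original post.

```groovy
// Assumption: added to the same build.gradle that declares the Beam dependencies.
configurations.all {
    resolutionStrategy {
        // Force the JRE flavor of Guava, which provides CacheBuilder.expireAfterWrite(Duration).
        force 'com.google.guava:guava:29.0-jre'
    }
}
```

Running `gradle dependencies` afterwards should show which Guava version each configuration resolves to, and `gradle dependencyInsight --dependency guava` can help identify which dependency requested the Android flavor in the first place.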

This question, "java - Executing a Google Cloud Dataflow pipeline from a Spring Controller", originally appeared on Stack Overflow: https://stackoverflow.com/questions/61810399/
