java - BigQuery - How to set a read timeout in the Java client library

Tags: java apache-spark google-bigquery google-http-client

I am using Spark to load some data into BigQuery. The idea is to read the data from S3 and load it using Spark and the BigQuery client API. The following code performs the insert into BigQuery.

val bq = createAuthorizedClientWithDefaultCredentialsFromStream(appName, credentialStream)
val bqjob = bq.jobs().insert(pid, job, data).execute() // data is an InputStream

With this approach, I am seeing a lot of SocketTimeoutExceptions.

Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:911)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:703)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1534)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1439)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
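As an aside, transient read timeouts like the one above are sometimes retried at the application level with backoff. A pure-JDK sketch of that idea (the method name, attempt limit, and delays here are illustrative assumptions, not part of the BigQuery client API, which has its own retry machinery):

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

// Illustrative sketch: retry an operation that may throw
// SocketTimeoutException, with simple exponential backoff.
public class RetrySketch {
    static <T> T withRetries(Callable<T> op, int maxAttempts, long baseDelayMs)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (SocketTimeoutException e) {
                if (attempt >= maxAttempts) throw e; // give up after the last attempt
                Thread.sleep(baseDelayMs << (attempt - 1)); // 1x, 2x, 4x, ...
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails twice with a timeout, then succeeds on the third call.
        String result = withRetries(() -> {
            if (++calls[0] < 3) throw new SocketTimeoutException("Read timed out");
            return "done";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```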

It looks like the latency of reading from S3 causes the Google HTTP client to time out. I wanted to increase the timeout and tried the options below.

val req = bq.jobs().insert(pid, job, data).buildHttpRequest()
req.setReadTimeout(3 * 60 * 1000)
val res = req.execute()

However, this causes a precondition failure in the BigQuery client. It expects the mediaUploader to be null, though I don't know why.

Exception in thread "main" java.lang.IllegalArgumentException
    at com.google.api.client.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:76)
    at com.google.api.client.util.Preconditions.checkArgument(Preconditions.java:37)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.buildHttpRequest(AbstractGoogleClientRequest.java:297)

This led me to try the second insert API on BigQuery:

val req = bq.jobs().insert(pid, job).buildHttpRequest().setReadTimeout(3 * 60 * 1000).setContent(data)
val res = req.execute()

This time it failed with a different error.

Exception in thread "main" com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
{
  "code" : 400,
  "errors" : [ {
    "domain" : "global",
    "message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: ",
    "reason" : "invalid"
  } ],
  "message" : "Job configuration must contain exactly one job-specific configuration object (e.g., query, load, extract, spreadsheetExtract), but there were 0: "
}

Please suggest how I can set the timeout. And if I am doing something wrong, please point that out too.

Best Answer

I'll answer the main question from the title: how to set timeouts using the Java client library.

To set timeouts, you need to configure a custom HttpRequestInitializer in your client. For example:

Bigquery.Builder builder =
    new Bigquery.Builder(new UrlFetchTransport(), new JacksonFactory(), credential);
final HttpRequestInitializer existing = builder.getHttpRequestInitializer();
builder.setHttpRequestInitializer(new HttpRequestInitializer() {
    @Override
    public void initialize(HttpRequest request) throws IOException {
        existing.initialize(request);
        request
            .setReadTimeout(READ_TIMEOUT)
            .setConnectTimeout(CONNECTION_TIMEOUT);
    }
});
Bigquery client = builder.build();
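Stripped of the BigQuery-specific types, the code above is just a decorator around the builder's existing initializer: run the original first, then override the timeouts. A pure-JDK sketch of that pattern in isolation (the Request/Initializer types below are illustrative stand-ins, not the google-http-client classes):

```java
import java.io.IOException;

// Illustrative stand-ins for HttpRequest / HttpRequestInitializer,
// showing the wrap-the-existing-initializer pattern in isolation.
public class InitializerSketch {
    static class Request {
        int readTimeout;
        int connectTimeout;
        boolean credentialApplied;
    }

    interface Initializer {
        void initialize(Request request) throws IOException;
    }

    // Wrap an existing initializer: run it first, then set the timeouts.
    static Initializer withTimeouts(Initializer existing, int readMs, int connectMs) {
        return request -> {
            existing.initialize(request);
            request.readTimeout = readMs;
            request.connectTimeout = connectMs;
        };
    }

    public static void main(String[] args) throws IOException {
        // The initializer the builder already had (e.g. one applying credentials).
        Initializer existing = request -> request.credentialApplied = true;
        Request req = new Request();
        withTimeouts(existing, 3 * 60 * 1000, 60 * 1000).initialize(req);
        System.out.println(req.credentialApplied + " " + req.readTimeout); // true 180000
    }
}
```

The point of wrapping rather than replacing is that the default initializer (which applies the credential) still runs.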

I don't think this will solve all the problems you are facing. A few ideas that may be useful, though I don't fully understand the scenario, so these may be off track:

  • If you are moving large files: consider staging them on GCS first, then loading them into BigQuery from there.
  • If you are using media upload to send the data with your request: the data can't be too large, or you risk timeouts or failed network connections.
  • If you are running a highly parallel data migration and the individual data chunks are relatively small, bigquery.tabledata.insertAll may be a better fit for this kind of large fan-in scenario. See https://cloud.google.com/bigquery/streaming-data-into-bigquery for more details.
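For the insertAll route, each request is typically kept small. A pure-JDK sketch of splitting a row list into fixed-size batches, where each batch would become one insertAll request (the batch size of 500 here is an illustrative assumption, not a documented limit):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: partition rows into fixed-size batches so each
// streaming-insert request stays small.
public class BatchSketch {
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += batchSize) {
            batches.add(rows.subList(i, Math.min(i + batchSize, rows.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = new ArrayList<>();
        for (int i = 0; i < 1050; i++) rows.add(i);
        // 1050 rows at batch size 500 -> batches of 500, 500, and 50.
        List<List<Integer>> batches = partition(rows, 500);
        System.out.println(batches.size() + " batches, last has " + batches.get(2).size());
    }
}
```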

Thanks for the question!

About "java - BigQuery - How to set a read timeout in the Java client library": we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32876343/
