python - Google BigQuery Storage: DeadlineExceeded error when reading with to_dataframe

Tags: python google-cloud-platform google-bigquery google-cloud-storage grpc

I get the following error when trying to run the to_dataframe() method against Google BigQuery Storage.

The full code is here:

import os
import time

import fastavro
import google.auth
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "key.json"

credentials, your_project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

bqstorageclient = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=credentials
)

table = bigquery_storage_v1beta1.types.TableReference()
table.project_id = "bigquery-public-data"
table.dataset_id = "new_york_taxi_trips"
table.table_id = "tlc_yellow_trips_2018"

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.row_restriction = 'pickup_location_id = "48"'
for field in (
    "vendor_id",
    "passenger_count",
    "trip_distance",
    "rate_code",
    "store_and_fwd_flag",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "imp_surcharge",
    "total_amount",
    "pickup_location_id",
    "dropoff_location_id",
):
    read_options.selected_fields.append(field)

parent = "projects/{}".format(your_project_id)
session = bqstorageclient.create_read_session(
    table, parent, read_options=read_options
)

now = time.time()

stream = session.streams[0]
position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
reader = bqstorageclient.read_rows(position)
dataframe = reader.to_dataframe(session)

I am using Conda on Windows 10. The versions of the Google libraries are as follows:

google-api-core           1.14.2           py37h21ff451_0    conda-forge
google-api-core-grpc      1.14.2               h21ff451_0    conda-forge
google-api-python-client  1.7.11                     py_0    conda-forge
google-auth               1.6.3                      py_0    conda-forge
google-auth-httplib2      0.0.3                      py_2    conda-forge
google-cloud-bigquery     1.19.0                   py37_0    conda-forge
google-cloud-bigquery-storage 0.7.0                         0    conda-forge
google-cloud-bigquery-storage-core 0.7.0            py37h21ff451_0    conda-forge
google-cloud-core         1.0.3                      py_0    conda-forge
google-resumable-media    0.3.3                      py_0    conda-forge
googleapis-common-protos  1.6.0            py37h21ff451_0
grpcio                    1.16.1           py37h351948d_1

This is the error I get:

---------------------------------------------------------------------------
_Rendezvous                               Traceback (most recent call last)
~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
     78         try:
---> 79             return six.next(self._wrapped)
     80         except grpc.RpcError as exc:

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\grpc\_channel.py in __next__(self)
    363     def __next__(self):
--> 364         return self._next()
    365 

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\grpc\_channel.py in _next(self)
    346             else:
--> 347                 raise self
    348             while True:

_Rendezvous: <_Rendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exceeded"
    debug_error_string = "{"created":"@1568284475.885000000","description":"Error received from peer","file":"src/core/lib/surface/call.cc","file_line":1017,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>

The above exception was the direct cause of the following exception:

DeadlineExceeded                          Traceback (most recent call last)
<ipython-input-9-8e16e005ecd6> in <module>
     48 position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
     49 reader = bqstorageclient.read_rows(position)
---> 50 dataframe = reader.to_dataframe(session)

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in to_dataframe(self, read_session, dtypes)
    220             raise ImportError(_PANDAS_REQUIRED)
    221 
--> 222         return self.rows(read_session).to_dataframe(dtypes=dtypes)
    223 
    224 

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in to_dataframe(self, dtypes)
    313 
    314         frames = []
--> 315         for page in self.pages:
    316             frames.append(page.to_dataframe(dtypes=dtypes))
    317         return pandas.concat(frames)

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in pages(self)
    261         # Each page is an iterator of rows. But also has num_items, remaining,
    262         # and to_dataframe.
--> 263         for message in self._reader:
    264             self._status = message.status
    265             yield ReadRowsPage(self._stream_parser, message)

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\cloud\bigquery_storage_v1beta1\reader.py in __iter__(self)
    126         while True:
    127             try:
--> 128                 for message in self._wrapped:
    129                     rowcount = message.row_count
    130                     self._position.offset += rowcount

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\google\api_core\grpc_helpers.py in next(self)
     79             return six.next(self._wrapped)
     80         except grpc.RpcError as exc:
---> 81             six.raise_from(exceptions.from_grpc_error(exc), exc)
     82 
     83     # Alias needed for Python 2/3 support.

~\AppData\Local\Continuum\anaconda3\envs\test101\lib\site-packages\six.py in raise_from(value, from_value)

DeadlineExceeded: 504 Deadline Exceeded

The code fails with the Deadline Exceeded error immediately after I run the line dataframe = reader.to_dataframe(session). The rest of the code executes without errors.

I also opened an issue on GitHub, but did not get any useful response: https://github.com/googleapis/google-cloud-python/issues/9135

I tried older versions of the Google libraries. I hit exactly the same problem on another Windows PC (a laptop running Windows 10). On a Linux machine everything works fine.

Any help would be appreciated.

Best Answer

I believe (though I'm not certain) that this error means the request exceeded the default timeout limit. If you look at test_reader.py, the comments there seem to indicate that no retry occurs when a deadline exception is raised:

"# Don't reconnect on DeadlineException. This allows user-specified timeouts."
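To illustrate that behavior in isolation, here is a minimal plain-Python sketch (the exception classes and the stream opener below are stand-ins for the real google-api-core / gRPC types, not the library's actual API): a resumable reader reconnects on transient errors, but re-raises a deadline error immediately so that a user-specified timeout is honored.

```python
class DeadlineExceeded(Exception):
    """Stand-in for google.api_core.exceptions.DeadlineExceeded."""

class ServiceUnavailable(Exception):
    """Stand-in for a transient, retryable gRPC error."""

def read_with_resume(open_stream, max_reconnects=3):
    """Yield messages from open_stream(offset), a callable returning
    an iterator of rows starting at the given offset.

    Transient errors trigger a reconnect from the last offset seen;
    DeadlineExceeded is re-raised at once, so a caller-supplied
    timeout is honored instead of being silently retried.
    """
    offset = 0
    reconnects = 0
    while True:
        try:
            for message in open_stream(offset):
                offset += 1
                yield message
            return  # stream finished normally
        except DeadlineExceeded:
            raise  # don't reconnect: honor the user's timeout
        except ServiceUnavailable:
            reconnects += 1
            if reconnects > max_reconnects:
                raise
```

The key point is the first except clause: a dropped connection resumes from the last offset, but a deadline error propagates straight to the caller, which matches the traceback above.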

You can override the default timeout by manually passing a timeout argument, in seconds, to read_rows. The original call is:

reader = bqstorageclient.read_rows(position)

So I think it would look like this, although I don't know how large the timeout limit can be:

reader = bqstorageclient.read_rows(position, timeout=100)

Also see line 99 of client.py for a discussion of the read_rows timeout parameter.
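For intuition about what a per-call timeout means for a streaming read, here is a small plain-Python stand-in (iterate_with_deadline is illustrative, not part of the library): the whole iteration is bounded by a single deadline, and exceeding it aborts the read, which is essentially what the DeadlineExceeded error above reports.

```python
import time

def iterate_with_deadline(iterable, timeout):
    """Yield items from iterable, raising TimeoutError once the
    overall read has taken longer than `timeout` seconds (a plain
    stand-in for the gRPC per-call deadline)."""
    deadline = time.monotonic() + timeout
    for item in iterable:
        if time.monotonic() > deadline:
            raise TimeoutError("deadline exceeded")
        yield item
```

So a timeout that is too small for the amount of data being streamed fails mid-read, which is why raising it (e.g. timeout=100 above) is worth trying.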

Regarding "python - Google BigQuery Storage: DeadlineExceeded Error when reading to_dataframe", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/57904944/
