python - BigQuery table truncation before streaming doesn't work

Tags: python google-bigquery

We are running some analyses using the BigQuery Python API. For that, we created the following adapter:

def stream_data(self, table, data, schema, how=None):
    r = self.connector.tables().list(projectId=self._project_id,
                                     datasetId='lbanor').execute()
    table_exists = [row['tableReference']['tableId'] for row in
                    r['tables'] if
                    row['tableReference']['tableId'] == table]
    if table_exists:
        if how == 'WRITE_TRUNCATE':
            self.connector.tables().delete(projectId=self._project_id,
                                           datasetId='lbanor',
                                           tableId=table).execute()
            body = {
                'tableReference': {
                    'tableId': table,
                    'projectId': self._project_id,
                    'datasetId': 'lbanor'
                },
                'schema': schema
            }
            self.connector.tables().insert(projectId=(
                                           self._project_id),
                                           datasetId='lbanor',
                                           body=body).execute()
    else:
        body = {
            'tableReference': {
                'tableId': table,
                'projectId': self._project_id,
                'datasetId': 'lbanor'
            },
            'schema': schema
        }
        self.connector.tables().insert(projectId=(
                                       self._project_id),
                                       datasetId='lbanor',
                                       body=body).execute()

    body = {
        'rows': [
            {
                'json': data,
                'insertId': str(uuid.uuid4())
            }
        ]
    }
    self.connector.tabledata().insertAll(projectId=(
                                         self._project_id),
                                         datasetId='lbanor',
                                         tableId=table,
                                         body=body).execute(num_retries=5)

connector simply builds the API service object.
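For context, a service object like this connector is typically built with the google-api-python-client discovery interface. A minimal sketch, assuming application-default credentials (the function name and credential flow here are illustrative, not from the original post):

```python
def build_connector(project_id):
    """Sketch: build the BigQuery v2 service object used as `connector`.

    Assumes google-api-python-client (the era's oauth2client credential
    flow); all names here are illustrative assumptions.
    """
    # Imports deferred so the sketch can be read without the libraries installed
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials

    credentials = GoogleCredentials.get_application_default()
    return discovery.build('bigquery', 'v2', credentials=credentials)
```

With that, `build_connector(project_id).tables()` and `.tabledata()` expose the same methods the adapter above calls.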

Its main purpose is to stream data into a given table. If the table already exists and the "how" input is passed as "WRITE_TRUNCATE", the table is first deleted and recreated. Afterwards, it proceeds to stream the data.

Everything works fine as long as the table is not deleted over and over again.

For example, this is the result when we run the script without simulating the write-truncate option (a for loop keeps calling stream_data with how=None):

[
  {
    "date": "2016-04-25",
    "unix_date": "1461606664981207",
    "init_cv_date": "2016-03-12",
    "end_cv_date": "2016-03-25",
    "days_trained": "56",
    "days_validated": "14",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.31729249914663893"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606599745107",
    "init_cv_date": "2016-03-06",
    "end_cv_date": "2016-03-25",
    "days_trained": "80",
    "days_validated": "20",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32677143128667446"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606688950415",
    "init_cv_date": "2016-03-14",
    "end_cv_date": "2016-03-25",
    "days_trained": "48",
    "days_validated": "12",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.3129267723358932"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606707195122",
    "init_cv_date": "2016-03-16",
    "end_cv_date": "2016-03-25",
    "days_trained": "40",
    "days_validated": "10",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.310620987663015"
  },
  {
    "date": "2016-04-25",
    "unix_date": "1461606622432947",
    "init_cv_date": "2016-03-08",
    "end_cv_date": "2016-03-25",
    "days_trained": "72",
    "days_validated": "18",
    "navigated_score": "1",
    "carted_score": "3",
    "purchased_score": "10",
    "description": "First trial of top seller alg. No filter nor any condition is applied. Skus not present in train count as rank=0.5",
    "metric": "rank",
    "result": "0.32395802949369296"
  }
]

But when we use the same adapter with the input how="WRITE_TRUNCATE", its behavior changes and becomes unpredictable.

Sometimes it works and the data is saved to the table. But sometimes, even though no error is raised, no data gets saved to the table at all.

When we try to query the table, no data comes back; it just returns "Query returned zero results".

Is there something wrong with how we delete the table, recreate it, and then stream the data? Are we making any mistake?

Let me know if you need more information. Thanks in advance!

Best Answer

See Jordan Tigani's answer and Sean Chen's comments on https://stackoverflow.com/a/36417177/132438 (both BigQuery engineers).

The summary is:

  • When a table is re-created or truncated, "you need to wait more than 2 minutes before streaming, in order to avoid data being dropped."

That would explain the non-deterministic behavior you are seeing.
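Given that, a straightforward workaround is to record when the table was truncated and hold off streaming until the cooldown window has passed. A minimal sketch (the helper name and the 120-second constant are assumptions derived from the answer above, not an official API):

```python
import time

# Per the answer above: streaming right after a truncate/re-create
# can silently drop rows for roughly 2 minutes.
COOLDOWN_SECS = 120

def seconds_until_safe(truncated_at, now=None):
    """Return how many seconds to wait before streaming is safe again.

    `truncated_at` is the epoch timestamp at which the table was
    deleted/re-created; returns 0.0 once the cooldown has elapsed.
    """
    now = time.time() if now is None else now
    return max(0.0, COOLDOWN_SECS - (now - truncated_at))
```

In the adapter, the WRITE_TRUNCATE branch could then `time.sleep(seconds_until_safe(truncated_at))` before calling tabledata().insertAll, at the cost of blocking the caller for up to two minutes.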

Regarding "python - BigQuery table truncation before streaming doesn't work", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36846571/
