node.js - GCP 云函数和 BigQuery : Row is inserted but all columns are null

标签 node.js google-bigquery google-cloud-functions

我创建了一个 Google Cloud Platform 函数,用于监听 Pub/Sub 主题并将数据插入 BigQuery。我已经让代码基本可以工作了——但只是“基本”:插入操作没有报告任何错误,但 BigQuery 中插入的行的所有列均为 null。

这是云函数的代码,运行在 NodeJs 6 上,内存 128Mb,由 Pub/Sub 触发

我已经尝试了以下两个选项的所有组合;当 ignoreUnknownValues 设置为 false 时,会得到 2 条不同的错误消息(请参阅帖子底部)

'ignoreUnknownValues':true, 'raw':false

package.json

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "@google-cloud/bigquery": "^1.3.0"
  }
}

函数体

/**
 * Cloud Function triggered from a message on a Cloud Pub/Sub topic.
 *
 * Logs the base64-decoded message payload, then inserts one row into the
 * `init_data.tronc_queteur` BigQuery table and reports the outcome on the
 * console.
 *
 * @param {!Object} event Event payload and metadata. `event.data` is the
 *     Pub/Sub message; its own `data` property is a base64-encoded string.
 * @param {!Function} callback Callback function to signal completion.
 */
exports.helloPubSub = (event, callback) => {
  // `event.data` is the Pub/Sub message object; the payload itself sits
  // base64-encoded in its `data` property and is only decoded for logging.
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());

  const BigQuery = require('@google-cloud/bigquery');
  const bigquery = new BigQuery();


  // NOTE(review): the row passed to insert() is the Pub/Sub message object
  // itself, not the decoded JSON payload logged above — so its keys are the
  // message's own properties rather than the table's column names. Confirm
  // this is what should be inserted.
  bigquery
    .dataset("init_data")
    .table  ("tronc_queteur")
    .insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
    .then   ((data) => {
      console.log(`Inserted 1 rows`);
      console.log(data);
    })
    .catch(err => {
      // Partial failures carry a list of errors in `err.errors`; each entry
      // is printed on its own. Any other error is logged as-is.
      if (err && err.name === 'PartialFailureError') {
        if (err.errors && err.errors.length > 0) {
          console.log('Insert errors:');
          err.errors.forEach(err => console.error(err));
        }
      } else {
        console.error('ERROR:', err);
      }
    });



  // NOTE(review): completion is signalled synchronously here, right after the
  // insert chain is started and before its then/catch handlers have run.
  callback();
};

传递给函数的数据如下(从函数的第一个console.log()可以看出)

** 数据 **

{  
   "id":9999,
   "queteur_id":552,
   "point_quete_id":49,
   "tronc_id":281,
   "depart_theorique":"2018-06-17 08:09:33",
   "depart":"2018-06-17 08:09:33",
   "retour":"2018-06-17 10:26:20",
   "comptage":"2018-11-08 21:23:02",
   "last_update":"2018-11-08 21:23:02",
   "last_update_user_id":1,
   "euro500":0,
   "euro200":0,
   "euro100":0,
   "euro50":0,
   "euro20":1,
   "euro10":3,
   "euro5":1,
   "euro2":0,
   "euro1":37,
   "cents50":12,
   "cents20":0,
   "cents10":0,
   "cents5":0,
   "cents2":0,
   "cent1":93,
   "don_cheque":0,
   "don_creditcard":0,
   "foreign_coins":null,
   "foreign_banknote":null,
   "notes_depart_theorique":null,
   "notes_retour":null,
   "notes_retour_comptage_pieces":null,
   "notes_update":null,
   "deleted":false,
   "coins_money_bag_id":"2018-PIECE-059",
   "bills_money_bag_id":"2018-BILLET-013",
   "don_cb_sans_contact_amount":0,
   "don_cb_sans_contact_number":0,
   "don_cb_total_number":0,
   "don_cheque_number":0
}

这是表架构,我用于在 BigQuery 中创建表并加载数据:

** BigQuery 表定义 **

[
{"name": "id","type":"INTEGER"},
{"name": "queteur_id","type":"INTEGER"},
{"name": "point_quete_id","type":"INTEGER"},
{"name": "tronc_id","type":"INTEGER"},
{"name": "depart_theorique","type":"STRING"},
{"name": "depart","type":"STRING"},
{"name": "retour","type":"STRING"},
{"name": "comptage","type":"STRING"},
{"name": "last_update","type":"STRING"},
{"name": "last_update_user_id","type":"INTEGER"},
{"name": "euro500","type":"INTEGER"},
{"name": "euro200","type":"INTEGER"},
{"name": "euro100","type":"INTEGER"},
{"name": "euro50","type":"INTEGER"},
{"name": "euro20","type":"INTEGER"},
{"name": "euro10","type":"INTEGER"},
{"name": "euro5","type":"INTEGER"},
{"name": "euro2","type":"INTEGER"},
{"name": "euro1","type":"INTEGER"},
{"name": "cents50","type":"INTEGER"},
{"name": "cents20","type":"INTEGER"},
{"name": "cents10","type":"INTEGER"},
{"name": "cents5","type":"INTEGER"},
{"name": "cents2","type":"INTEGER"},
{"name": "cent1","type":"INTEGER"},
{"name": "foreign_coins","type":"INTEGER"},
{"name": "foreign_banknote","type":"INTEGER"},
{"name": "notes_depart_theorique","type":"STRING"},
{"name": "notes_retour","type":"STRING"},
{"name": "notes_retour_comptage_pieces","type":"STRING"},
{"name": "notes_update","type":"STRING"},
{"name": "deleted","type":"INTEGER"},
{"name": "don_creditcard","type":"FLOAT"},
{"name": "don_cheque","type":"FLOAT"},
{"name": "coins_money_bag_id","type":"STRING"},
{"name": "bills_money_bag_id","type":"STRING"},
{"name": "don_cb_sans_contact_amount","type":"FLOAT"},
{"name": "don_cb_sans_contact_number","type":"INTEGER"},
{"name": "don_cb_total_number","type":"INTEGER"},
{"name": "don_cheque_number","type":"INTEGER"}
]

with 'ignoreUnknownValues':false, 'raw':false

severity:  "ERROR"  
 textPayload:  "{ errors: [ { message: 'no such field.', reason: 'invalid' } ],
  row: 
   { '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
     attributes: { location: 'Detroit' },
     data: 'eyJpZCI6OT...'

数据是以下内容的base64编码:

{"id":9999,"queteur_id":552,"point_quete_id":49,"tronc_id":281,"depart_theorique":"2018-06-17 08:09:33","depart":"2018-06-17 08:09:33","retour":"2018-06-17 10:26:20","comptage":"2018-11-08 22:18:59","last_update":"2018-11-08 22:18:59","last_update_user_id":1,"euro500":0,"euro200":0,"euro100":0,"euro50":0,"euro20":1,"euro10":3,"euro5":1,"euro2":0,"euro1":37,"cents50":12,"cents20":0,"cents10":0,"cents5":0,"cents2":0,"cent1":93,"don_cheque":0,"don_creditcard":0,"foreign_coins":null,"foreign_banknote":null,"notes_depart_theorique":null,"notes_retour":null,"notes_retour_comptage_pieces":null,"notes_update":null,"deleted":false,"coins_money_bag_id":"2018-PIECE-059","bills_money_bag_id":"2018-BILLET-013","don_cb_sans_contact_amount":0,"don_cb_sans_contact_number":0,"don_cb_total_number":0,"don_cheque_number":0}

with 'ignoreUnknownValues':false, 'raw':true

此时错误消息为空字符串(message: ''):

textPayload:  "{ errors: [ { message: '', reason: 'invalid' } ],
  row: 
   { '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
     attributes: { location: 'Detroit' },
     data: 'eyJpZCI6O...'

数据是完全相同的有效负载(与上面相同的 base64 字符串)

** 在 bigquery 端 **

随着我不断测试,以下查询返回的行数也在不断增加,而且每一列都只有 null 值

查询语句如下:

-- Return every row of the table whose `id` column is NULL.
SELECT *
FROM `init_data.tronc_queteur` AS tq
WHERE tq.id IS NULL

结果如下:

Row id  queteur_id  point_quete_id  tronc_id    depart_theorique    depart  retour  comptage    last_update last_update_user_id euro500 euro200 euro100 euro50  euro20  euro10  euro5   euro2   euro1   cents50 cents20 cents10 cents5  cents2  cent1   foreign_coins   foreign_banknote    notes_depart_theorique  notes_retour    notes_retour_comptage_pieces    notes_update    deleted don_creditcard  don_cheque  coins_money_bag_id  bills_money_bag_id  don_cb_sans_contact_amount  don_cb_sans_contact_number  don_cb_total_number don_cheque_number   
1   null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    
2   null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    
3   null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    null    

最佳答案

问题出在您如何向 Cloud Big Query 插入功能提供数据。

在 .insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false}) 这一行中,您发送的是经过编码的消息;BigQuery 库找不到它需要的列值(在您的情况下,它期望的是一个 JSON 对象),因此插入的所有值都是 null。您必须先将消息解码为字符串,再将其解析为 JSON。

我最终能正常工作的插入代码如下:

  // Resolve the destination table first, then decode the base64 Pub/Sub
  // payload and parse it so the row handed to insert() is the JSON object
  // itself rather than the encoded message.
  const targetTable = bigquery.dataset("init_data").table("tronc_queteur");
  const decodedPayload = Buffer.from(pubsubMessage.data, 'base64').toString();
  const row = JSON.parse(decodedPayload);
  const insertOptions = {'ignoreUnknownValues': true, 'raw': false};

  // Prints each entry of a PartialFailureError on its own; any other error
  // is logged as-is.
  const reportInsertFailure = (failure) => {
    const isPartialFailure = failure && failure.name === 'PartialFailureError';
    if (!isPartialFailure) {
      console.error('ERROR:', failure);
      return;
    }
    if (failure.errors && failure.errors.length > 0) {
      console.log('Insert errors:');
      failure.errors.forEach((rowError) => console.error(rowError));
    }
  };

  targetTable
    .insert(row, insertOptions)
    .then((data) => {
      console.log(`Inserted 1 rows`);
      console.log(data);
    })
    .catch(reportInsertFailure);

这可行,但我对代码并不完全满意。我将更多地了解 Cloud Functions 如何接收 Pub/Sub 消息。如果我发现任何相关内容,我会编辑此答案。

关于node.js - GCP 云函数和 BigQuery : Row is inserted but all columns are null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53217505/

相关文章:

python-3.x - 云函数属性错误: 'bytes' object has no attribute 'get' when reading json file from cloud storage

node.js - 为什么 gridfs get 不能仅按文件名处理文件 ID (ObjectId)

javascript - Express.js - 监听关机

javascript - 如何检测脚本是在浏览器中还是在 Node.js 中运行?

sql - 更新SQL-还购买了产品

sql - 查找组中第一行和剩余行之间的距离

python-3.x - 如何获取 HTTP 触发的 Google Cloud Functions 的执行 ID?

php - NodeJS 中的正则表达式模式问题

google-bigquery - Bigquery同时并行读写

javascript - 使用命名存储桶创建存储触发器时权限被拒绝