我创建了一个 Google Cloud Platform 函数,用于监听 Pub/Sub 主题并将数据插入 BigQuery。 我设法让一些代码几乎可以工作。 几乎:插入指令没有报告错误,但 BigQuery 中插入的行的所有列均为空。
这是云函数的代码,运行在 NodeJs 6 上,内存 128Mb,由 Pub/Sub 触发
我已经尝试了以下两个变量的所有组合,有 2 条不同的错误消息,且忽略设置设置为 false(请参阅帖子底部)
'ignoreUnknownValues':true, 'raw':false
package.json
{
"name": "sample-pubsub",
"version": "0.0.1",
"dependencies": {
"@google-cloud/bigquery": "^1.3.0"
}
}
函数体
/**
* Triggered from a message on a Cloud Pub/Sub topic.
*
* @param {!Object} event Event payload and metadata.
* @param {!Function} callback Callback function to signal completion.
*/
exports.helloPubSub = (event, callback) => {
const pubsubMessage = event.data;
console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
const BigQuery = require('@google-cloud/bigquery');
const bigquery = new BigQuery();
bigquery
.dataset("init_data")
.table ("tronc_queteur")
.insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log(`Inserted 1 rows`);
console.log(data);
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:');
err.errors.forEach(err => console.error(err));
}
} else {
console.error('ERROR:', err);
}
});
callback();
};
传递给函数的数据如下(从函数的第一个console.log()可以看出)
** 数据 **
{
"id":9999,
"queteur_id":552,
"point_quete_id":49,
"tronc_id":281,
"depart_theorique":"2018-06-17 08:09:33",
"depart":"2018-06-17 08:09:33",
"retour":"2018-06-17 10:26:20",
"comptage":"2018-11-08 21:23:02",
"last_update":"2018-11-08 21:23:02",
"last_update_user_id":1,
"euro500":0,
"euro200":0,
"euro100":0,
"euro50":0,
"euro20":1,
"euro10":3,
"euro5":1,
"euro2":0,
"euro1":37,
"cents50":12,
"cents20":0,
"cents10":0,
"cents5":0,
"cents2":0,
"cent1":93,
"don_cheque":0,
"don_creditcard":0,
"foreign_coins":null,
"foreign_banknote":null,
"notes_depart_theorique":null,
"notes_retour":null,
"notes_retour_comptage_pieces":null,
"notes_update":null,
"deleted":false,
"coins_money_bag_id":"2018-PIECE-059",
"bills_money_bag_id":"2018-BILLET-013",
"don_cb_sans_contact_amount":0,
"don_cb_sans_contact_number":0,
"don_cb_total_number":0,
"don_cheque_number":0
}
这是表架构,我用于在 BigQuery 中创建表并加载数据:
** BigQuery 表定义 **
[
{"name": "id","type":"INTEGER"},
{"name": "queteur_id","type":"INTEGER"},
{"name": "point_quete_id","type":"INTEGER"},
{"name": "tronc_id","type":"INTEGER"},
{"name": "depart_theorique","type":"STRING"},
{"name": "depart","type":"STRING"},
{"name": "retour","type":"STRING"},
{"name": "comptage","type":"STRING"},
{"name": "last_update","type":"STRING"},
{"name": "last_update_user_id","type":"INTEGER"},
{"name": "euro500","type":"INTEGER"},
{"name": "euro200","type":"INTEGER"},
{"name": "euro100","type":"INTEGER"},
{"name": "euro50","type":"INTEGER"},
{"name": "euro20","type":"INTEGER"},
{"name": "euro10","type":"INTEGER"},
{"name": "euro5","type":"INTEGER"},
{"name": "euro2","type":"INTEGER"},
{"name": "euro1","type":"INTEGER"},
{"name": "cents50","type":"INTEGER"},
{"name": "cents20","type":"INTEGER"},
{"name": "cents10","type":"INTEGER"},
{"name": "cents5","type":"INTEGER"},
{"name": "cents2","type":"INTEGER"},
{"name": "cent1","type":"INTEGER"},
{"name": "foreign_coins","type":"INTEGER"},
{"name": "foreign_banknote","type":"INTEGER"},
{"name": "notes_depart_theorique","type":"STRING"},
{"name": "notes_retour","type":"STRING"},
{"name": "notes_retour_comptage_pieces","type":"STRING"},
{"name": "notes_update","type":"STRING"},
{"name": "deleted","type":"INTEGER"},
{"name": "don_creditcard","type":"FLOAT"},
{"name": "don_cheque","type":"FLOAT"},
{"name": "coins_money_bag_id","type":"STRING"},
{"name": "bills_money_bag_id","type":"STRING"},
{"name": "don_cb_sans_contact_amount","type":"FLOAT"},
{"name": "don_cb_sans_contact_number","type":"INTEGER"},
{"name": "don_cb_total_number","type":"INTEGER"},
{"name": "don_cheque_number","type":"INTEGER"}
]
with 'ignoreUnknownValues':false, 'raw':false
severity: "ERROR"
textPayload: "{ errors: [ { message: 'no such field.', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6OT...'
数据是以下内容的base64编码:
{"id":9999,"queteur_id":552,"point_quete_id":49,"tronc_id":281,"depart_theorique":"2018-06-17 08:09:33","depart":"2018-06-17 08:09:33","retour":"2018-06-17 10:26:20","comptage":"2018-11-08 22:18:59","last_update":"2018-11-08 22:18:59","last_update_user_id":1,"euro500":0,"euro200":0,"euro100":0,"euro50":0,"euro20":1,"euro10":3,"euro5":1,"euro2":0,"euro1":37,"cents50":12,"cents20":0,"cents10":0,"cents5":0,"cents2":0,"cent1":93,"don_cheque":0,"don_creditcard":0,"foreign_coins":null,"foreign_banknote":null,"notes_depart_theorique":null,"notes_retour":null,"notes_retour_comptage_pieces":null,"notes_update":null,"deleted":false,"coins_money_bag_id":"2018-PIECE-059","bills_money_bag_id":"2018-BILLET-013","don_cb_sans_contact_amount":0,"don_cb_sans_contact_number":0,"don_cb_total_number":0,"don_cheque_number":0}
with 'ignoreUnknownValues':false, 'raw':true
消息:''
textPayload: "{ errors: [ { message: '', reason: 'invalid' } ],
row:
{ '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
attributes: { location: 'Detroit' },
data: 'eyJpZCI6O...'
数据是完全相同的有效负载(与上面相同的base64屏幕)
** 在 bigquery 端 **
以下查询返回的行数不断增加,因为我在每列中仅使用空值进行测试
选择的是:
select *
from `init_data.tronc_queteur` as tq
where tq.id is null
结果如下:
Row id queteur_id point_quete_id tronc_id depart_theorique depart retour comptage last_update last_update_user_id euro500 euro200 euro100 euro50 euro20 euro10 euro5 euro2 euro1 cents50 cents20 cents10 cents5 cents2 cent1 foreign_coins foreign_banknote notes_depart_theorique notes_retour notes_retour_comptage_pieces notes_update deleted don_creditcard don_cheque coins_money_bag_id bills_money_bag_id don_cb_sans_contact_amount don_cb_sans_contact_number don_cb_total_number don_cheque_number
1 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
2 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
3 null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null null
最佳答案
问题出在您如何向 Cloud Big Query 插入功能提供数据。
在您发送编码消息的 .insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
行中,Big Query 库找不到它需要列值,因为 it's expecting a JSON object (对于您的情况)因此它会插入所有 null
值。您必须将消息解码为 String
并将其解析为 JSON。
我得到的工作插入看起来像这样:
bigquery
.dataset("init_data")
.table ("tronc_queteur")
.insert (JSON.parse(Buffer.from(pubsubMessage.data, 'base64').toString()),
{'ignoreUnknownValues':true, 'raw':false})
.then ((data) => {
console.log(`Inserted 1 rows`);
console.log(data);
})
.catch(err => {
if (err && err.name === 'PartialFailureError') {
if (err.errors && err.errors.length > 0) {
console.log('Insert errors:');
err.errors.forEach(err => console.error(err));
}
} else {
console.error('ERROR:', err);
}
});
这可行,但我对代码并不完全满意。我将更多地了解 Cloud Functions 如何接收 Pub/Sub 消息。如果我发现任何相关内容,我会编辑此答案。
关于node.js - GCP 云函数和 BigQuery : Row is inserted but all columns are null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53217505/