背景:
我创建了一个作业,从 MongoDB 读取数据并将其加载到 MS-SQL。
当前行为:
每当我运行该作业时,它都会从 MongoDB 获取所有数据。
预期行为:
当作业运行时,它应该只获取尚未加载的数据。我在 mongoDB 文档中有一个时间戳字段。
示例
时间戳:2022-07-29T08:14:14.657+00:00
解决方案 1:
我尝试将查询添加到 mongo 以仅加载最后 15 分钟。
但问题是,例如我的工作组件仍然关闭 1 小时。
当它再次出现时,在下一次作业运行时,它将仅加载最后 15 分钟的数据,而我们丢失了 45 分钟的数据。
所需解决方案:
如果作业第一次运行,那么它将提取所有时间的数据并加载到 SQL。
当作业下次运行时(假设 15 分钟后),它将自动假设这些是新创建的,并且仅加载新行。
更新
现在我已经写了一篇关于这个解决方案的完整文章。 https://medium.com/@raowaqasakram/fetch-latest-data-from-mongodb-talend-1f21ba7b98b5
最佳答案
最合理的解决方案是直接从 SQL 表中获取最后导出的时间戳(假设该字段仅增加):
// get the last timestamp from SQL
const lastTimestamp = SQLClient("select timestamp from sqltable order by timestamp desc limit 1");
const documentsToExport = db.collection.find({ timestamp: { $gt: new Date(lastTimestamp) }});
... export logic ...
这样,即使当前作业在中间失败,您也始终可以获得最后上传的文档,您还应该确保插入文档以支持这一点。
您还可以维护一些包含作业详细信息的元数据集合,例如(采用 Nodejs 语法):
// get the last job saved.
const lastJob = await db.jobCollection.findOne({}, { sort: { _id: -1 });
// if this is the first job timestamp will be 1970 otherwise use previous job timestamp.
const nextTimestamp = lastJob?.timestamp ?? new Date('1970');
const documentsToExport = await db.collection.find({ timestamp: {$gt: nextTimestamp }}).sort({timestamp: 1});
if (documentsToExport.length) {
... upload to sql ...
// insert the latest timestamp from mongo, you can add additional metadata fields here.
// like run date, time took, documents inserted etc.
await.jobCollection.insertOne({
timestamp: documentsToExport[documentsToExport.length - 1].timestamp,
documentsInserted: documentsToExport.length,
createdAt: new Date()
})
}
显然这个过程的容错能力较差,这就是为什么我建议您如果可能的话使用第一个。
最后一个解决方案是在不同的答案中提供的,即在每个文档上添加一个字段,表明它是否已上传,但对于更大的集合,它将需要额外的索引和架构更改,这是我的事情如果可能的话,我宁愿避免,我觉得这里就是这种情况。
关于mongodb - 从 MongoDB (tMongoDbInput) 加载最近尚未加载的行的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73197933/