mongodb - 从 MongoDB (tMongoDbInput) 加载最近尚未加载的行的数据

标签 mongodb talend talend-mdm

背景:

我创建了一个作业,从 MongoDB 读取数据并将其加载到 MS-SQL。

当前行为:

每当我运行该作业时,它都会从 MongoDB 获取所有数据。

预期行为:

当作业运行时,它应该只获取尚未加载的数据。我在 mongoDB 文档中有一个时间戳字段。

示例

时间戳:2022-07-29T08:14:14.657+00:00

解决方案 1:

我尝试将查询添加到 mongo 以仅加载最后 15 分钟。

但问题是,例如我的工作组件仍然关闭 1 小时。

当它再次出现时,在下一次作业运行时,它将仅加载最后 15 分钟的数据,而我们丢失了 45 分钟的数据。

所需解决方案:

如果作业第一次运行,那么它将提取所有时间的数据并加载到 SQL。

当作业下次运行时(假设 15 分钟后),它将自动假设这些是新创建的,并且仅加载新行。

更新

现在我已经写了一篇关于这个解决方案的完整文章。 https://medium.com/@raowaqasakram/fetch-latest-data-from-mongodb-talend-1f21ba7b98b5

最佳答案

最合理的解决方案是直接从 SQL 表中获取最后导出的时间戳(假设该字段仅增加):

// get the last timestamp from SQL
const lastTimestamp = SQLClient("select timestamp from sqltable order by timestamp desc limit 1");

const documentsToExport = db.collection.find({ timestamp: { $gt: new Date(lastTimestamp) }});

... export logic ...

这样,即使当前作业在中间失败,您也始终可以获得最后上传的文档,您还应该确保插入文档以支持这一点。


您还可以维护一些包含作业详细信息的元数据集合,例如(采用 Nodejs 语法):

// get the last job saved.
const lastJob = await db.jobCollection.findOne({}, { sort: { _id: -1 });

// if this is the first job timestamp will be 1970 otherwise use previous job timestamp.
const nextTimestamp = lastJob?.timestamp ?? new Date('1970');

const documentsToExport = await db.collection.find({ timestamp: {$gt: nextTimestamp }}).sort({timestamp: 1});

if (documentsToExport.length) {
    ... upload to sql ...

    // insert the latest timestamp from mongo, you can add additional metadata fields here.
   // like run date, time took, documents inserted etc.
    await.jobCollection.insertOne({ 
     timestamp: documentsToExport[documentsToExport.length - 1].timestamp,
     documentsInserted: documentsToExport.length,
     createdAt: new Date()
 })
}

显然这个过程的容错能力较差,这就是为什么我建议您如果可能的话使用第一个。


最后一个解决方案是在不同的答案中提供的,即在每个文档上添加一个字段,表明它是否已上传,但对于更大的集合,它将需要额外的索引和架构更改,这是我的事情如果可能的话,我宁愿避免,我觉得这里就是这种情况。

关于mongodb - 从 MongoDB (tMongoDbInput) 加载最近尚未加载的行的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73197933/

相关文章:

node.js - MongoDB 错误 "can' 无法从对象中提取地理键,几何形状错误?”

mongodb - 如何在 Rust 中实现 MongoDB 模式和业务逻辑?

node.js - Mongo DB文档过期后不会删除

java - TALEND:如何存储 tRest 组件中所需的刷新 token ?

hadoop - 连接 talend 和 Hadoop

node.js - 类型错误 : userDetails. findAll 不是函数

sql-server - 使用 Talend 在 SQL Server 和 Postgres 之间转换日期

Talend tMSSQLInput

java - 构建选项在 Talend Job 中不起作用