我们有一个非常大的带有分片集合的 MongoDB 实例。它已经到了依赖 MongoDB 查询功能(包括聚合框架)来洞察数据变得过于昂贵的地步。
我四处寻找使数据可用且更易于使用的选项,并确定了两个有希望的选项:
- AWS Redshift
- Hadoop + Hive
我们希望能够使用类似 SQL 的语法来分析我们的数据,并且我们希望能够接近实时地访问数据(几分钟的延迟就可以了,我们只是不想等待整个 MongoDB夜间同步)。
据我所知,对于选项 2,可以使用 https://github.com/mongodb/mongo-hadoop将数据从 MongoDB 移动到 Hadoop 集群。
我一直在寻找高低,但我正在努力寻找一个类似的解决方案来让 MongoDB 进入 AWS Redshift。从查看亚马逊文章来看,正确的方法似乎是使用 AWS Kinesis 将数据导入 Redshift。也就是说,我找不到任何做类似事情的人的例子,也找不到任何库或连接器来将数据从 MongoDB 移动到 Kinesis 流中。至少没有什么看起来很有希望的。
有人做过这样的事吗?
最佳答案
我最终使用 NodeJS 编写了我们自己的迁移器。 我对解释什么是 redshift 和 MongoDB 的答案感到有些恼火,所以我决定花时间分享我最后必须做的事情。
时间戳数据
基本上,我们确保我们想要迁移到 redshift 中的表的所有 MongoDB 集合都带有时间戳,并根据该时间戳进行索引。
插件返回光标
然后,我们为要从 mongo 集合到 redshift 表的每次迁移编写一个插件。每个插件都返回一个游标,该游标将上次迁移的日期考虑在内(从迁移器引擎传递给它),并且仅返回自上次成功迁移该插件以来已更改的数据。
如何使用光标
然后迁移器引擎使用这个游标,并循环遍历每条记录。 它为每条记录回调插件,将文档转换为一个数组,然后迁移器使用该数组创建一个分隔行,并将其流式传输到磁盘上的文件。我们使用制表符来分隔这个文件,因为我们的数据包含很多逗号和管道。
从 S3 分隔导出到 redshift 上的表中
然后迁移器将分隔文件上传到 S3,并运行 redshift copy 命令将文件从 S3 加载到临时表中,使用插件配置获取名称和将其表示为临时表的约定。
例如,如果我有一个配置了表名employees
的插件,它将创建一个名为temp_employees
的临时表。
现在我们已经在这个临时表中获得了数据。这个临时表中的记录从原始 MongoDB 集合中获取它们的 id。这允许我们然后对目标表运行删除,在我们的示例中,员工表,其中 id 存在于临时表中。 如果任何表不存在,它会根据插件提供的架构动态创建。这样我们就可以将临时表中的所有记录插入到目标表中。这既适用于新记录,也适用于更新的记录。我们只对我们的数据进行软删除,因此它将在 redshift 中使用 is_deleted
标志进行更新。
完成整个过程后,迁移引擎会将插件的时间戳存储在 Redshift 表中,以便跟踪上次成功运行迁移的时间。然后在下次引擎决定它应该迁移数据时将此值传递给插件,从而允许插件使用它需要提供给引擎的游标中的时间戳。
总而言之,每个插件/迁移都为引擎提供以下内容:
- 一个游标,它可以选择使用传递给它的最后迁移日期 来自引擎,以确保仅移动增量 穿过。
- 一个转换函数,引擎使用该函数将光标中的每个文档转换为分隔字符串,然后附加到导出文件中
- 架构文件,这是一个 SQL 文件,其中包含 redshift 表的架构
关于MongoDB 到 AWS Redshift,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26546364/