amazon-kinesis-firehose - Amazon Kinesis Firehose - 如何暂停流?

标签 amazon-kinesis-firehose

我需要能够在 AWS Kinesis Firehose 中暂停流。

当我需要执行需要重新创建表的架构更改时(例如,更改排序键),我需要它。

这些更改通常需要创建一个新表,将行插入到新表中,删除原始表并将新表重命名为原始名称。这样做会导致在此过程中流式传输的行丢失。

我可以考虑两种解决方法:

  1. 在流程开始时重命名原始表,然后强制 firehose 失败,然后重试,直到您进行更改并将其重命名回来。如果重试机制足够可靠,我不会起诉。
  2. 在加载之间定义几个小时的时间间隔(根据需要),然后观察“COPY”查询,并在 COPY 之后执行与 #1 相同的操作。这比 #1 更安全。

根据声明,这两种解决方法都不是最佳实践。

有更好的解决方案吗? 我的解决方案有多出色?

最佳答案

我遇到了同样的问题并做了以下操作。注意:要使此方法起作用,您必须在从 Kinesis 摄取到 Redshift 的事件上有时间戳(created_at 在下面的答案中)。

  1. 假设 table1 是您已有的表,并且 Kinesis 正在将事件从 firehose1 转储到其中。
  2. 创建一个新的流水线 firehose2,它将事件转储到新表 table2,该表与 table1 具有相同的架构。
  3. 一旦您可以确认事件正在登陆 table2,并且 table1 中的 max(created_at) 小于 min( created_at)table2 中,删除 firehose1。我们现在可以确定我们不会丢失任何数据,因为 table1table2 之间已经存在重叠。
  4. 创建一个与 table1 具有相同架构的表 table3。将所有事件从 table1 复制到 table3
  5. 删除 table1 并重新创建它,这次使用排序键。
  6. 重新创建 firehose1 以继续将事件转储到 table1
  7. 一旦事件再次开始登陆 table1,确认 table1 中的 min(created_at) 小于 max(created_at) table2 中。如果为真,请删除 firehose2
  8. 复制 table2created_at 严格大于 table3 中的 max(created_at) 且严格小于 max(created_at) 的所有事件比 table1 中的 min(created_at) 变成 table1。如果您的系统允许具有相同时间戳的事件,则此步骤中可能会引入重复项。
  9. table3 中的所有事件复制回新的 table1

编辑:如果您使用 alter tabletable1 重命名为 table1_old ,则可以避免使用 table3 然后在新的 table1 之上创建 table2

关于amazon-kinesis-firehose - Amazon Kinesis Firehose - 如何暂停流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45828292/

相关文章:

php - 有没有办法在通过 AWS Kinesis Firehose 插入时手动设置 ElasticSearch 文档 ID?

amazon-s3 - 从 Oracle 表流式传输到 Redshift

amazon-web-services - 在 AWS Firehose 中添加故障处理

amazon-web-services - Kinesis Firehose 上的慢 PutRecordBatch

google-cloud-platform - 谷歌云平台 : accumulate data from Pub/Sub to files in Cloud Storage without Dataflow

python-2.7 - Kinesis Agent 报告 python 流数据错误

amazon-web-services - 从WAF/Kinesis获取时间戳到Elasticsearch

amazon-web-services - 到 Kinesis Firehose 的 AWS API Gateway 服务代理

amazon-web-services - 目标流验证失败?

python - 将数据输入Elasticsearch和RabbitMQ的S3的最佳方法是什么?