我试图弄清楚如何最初从查询中获取所有数据,然后使用 kafka 连接器增量地仅进行更改。这样做的原因是我想将所有数据加载到 Elasticsearch 中,然后使 es 与我的 kafka 流保持同步。 目前,我首先使用模式=批量的连接器来执行此操作,然后将其更改为时间戳。这很好用。
但是,如果我们想要将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本来以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 将模式设置为批量,然后重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样的脚本的原因是,偶尔,批量更新会通过我们尚未执行的 etl 过程来纠正历史数据具有控制权,并且此过程不会更新时间戳)
有人在做类似的事情并找到了更优雅的解决方案吗?
最佳答案
时隔很长一段时间才回到这个话题。该方法能够解决这个问题,并且永远不必使用批量模式
- 停止连接器
- 删除每个连接器 jvm 的偏移文件
- (可选)如果您想要执行完整的删除和加载,您可能还想使用 kafka/connect utils/rest api 删除您的主题(并且不要忘记状态主题)
- 重新启动连接。
关于elasticsearch - Kafka JDBC连接器加载所有数据,然后增量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43772042/