elasticsearch - Kafka JDBC连接器加载所有数据,然后增量

标签 elasticsearch apache-kafka apache-kafka-connect confluent-platform

我试图弄清楚如何最初从查询中获取所有数据,然后使用 kafka 连接器增量地仅进行更改。这样做的原因是我想将所有数据加载到 Elasticsearch 中,然后使 es 与我的 kafka 流保持同步。 目前,我首先使用模式=批量的连接器来执行此操作,然后将其更改为时间戳。这很好用。

但是,如果我们想要将所有数据重新加载到 Streams 和 ES,这意味着我们必须编写一些脚本来以某种方式清理或删除 kafka 流和 es 索引数据,修改连接 ini 将模式设置为批量,然后重新启动一切,给它时间加载所有数据,然后再次将脚本修改为时间戳模式,然后再次重新启动所有内容(需要这样的脚本的原因是,偶尔,批量更新会通过我们尚未执行的 etl 过程来纠正历史数据具有控制权,并且此过程不会更新时间戳)

有人在做类似的事情并找到了更优雅的解决方案吗?

最佳答案

时隔很长一段时间才回到这个话题。该方法能够解决这个问题,并且永远不必使用批量模式

  1. 停止连接器
  2. 删除每个连接器 jvm 的偏移文件
  3. (可选)如果您想要执行完整的删除和加载,您可能还想使用 kafka/connect utils/rest api 删除您的主题(并且不要忘记状态主题)
  4. 重新启动连接。

关于elasticsearch - Kafka JDBC连接器加载所有数据,然后增量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43772042/

相关文章:

elasticsearch - Elastic Topbeats是否显示流程参数

apache-kafka - 我可以在 KSQL 中编写子查询吗?

java - Kafka,Java - 这个 "_"在整数文字中代表什么

elasticsearch - 寻找一个kafka连接器以将数据上传到elasticsearch

apache-kafka - Confluence Kafka Connect HDFS Sink 连接器延迟

java - 找不到 ElasticSearch Lucene UnicodeUtil

elasticsearch - Elasticsearch -QueryBuilder.rangeQuery大于它在查询中返回 “From”

elasticsearch - elasticsearch短语_前缀预期结果

java - ReplicaFetcher 崩溃 - 第一个偏移量 XXXXXXX 小于下一个偏移量 YYYYYYYY

docker - cp-kafka-connect 是开源组件还是专有组件?