我正在接收最终在 Kafka 中的事件。从这些事件中,我使用 Kafka Streams 应用程序获取 id,并将其作为另一个主题中的一对 (id, 1) 发回 Kafka。然后我想看看 id 是否已经存在于 ElasticSearch 中,如果存在则更新其计数器,否则在 ElasticSearch 中创建一条新记录,其中 id 来自 Kafka 并将计数器设置为 1,即记录 (id, 1) 的 upsert到 ES。
我希望为此使用 Kafka Connect to ElasticSearch,但如果可能的话,它似乎并不那么简单。我可以看到向 ES 添加记录是有效的,但与现有记录合并似乎是我尚未发现的事情。这是否已经成为可能?如果可能,如何实现?如果不可能,是否计划在附近的版本中实现?
最佳答案
我 fork 了 datamountaineer ES sink connector允许更新插入。有了它,您可以指定一个 PK 并使用 docAsUpsert 运行更新到 ES。您可以从 my github fork 获取项目并编译 Jar。 .
关于elasticsearch - Kafka 连接到 ElasticSearch 是否可以进行更新,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41158974/