我有一个如下所示的处理流:
mysql.database -> debezium-connector -> database topic -> faust.agent(stream processing to add a field) -> sink topic -> elasticsearch-sink-connector -> elasticsearch cluster
此处理流在大部分情况下都可以正常工作,但我无法弄清楚如何处理来自数据库主题的已删除行事件。就像一行被删除一样,我希望它也从elasticsearch 中删除。我可以在 faust 部分使用可以操纵事件的条件。有没有办法标记一个事件,以便当它被 elasticsearch-sink-connector 拾取时,它会删除给定的文档而不是添加它?我查看了文档,但没有看到这方面的具体细节。接收器连接器是否仅用于将文档添加到索引?
最佳答案
查看config for the connector看起来您可以将 behavior.on.null.values
设置为 delete
。然后,您只需要确保针对应删除文档的键设置逻辑删除(空)。
Debezium 将 by default生成用于删除的逻辑删除消息。
关于elasticsearch - 卡夫卡连接 Elasticsearch : how to send deletes of documents?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59295718/