elasticsearch - 卡夫卡连接 Elasticsearch : how to send deletes of documents?

标签 elasticsearch apache-kafka apache-kafka-connect confluent-platform

我有一个如下所示的处理流:

mysql.database -> debezium-connector -> database topic -> faust.agent(stream processing to add a field) -> sink topic -> elasticsearch-sink-connector -> elasticsearch cluster

此处理流在大部分情况下都可以正常工作,但我无法弄清楚如何处理来自数据库主题的已删除行事件。就像一行被删除一样,我希望它也从elasticsearch 中删除。我可以在 faust 部分使用可以操纵事件的条件。有没有办法标记一个事件,以便当它被 elasticsearch-sink-connector 拾取时,它会删除给定的文档而不是添加它?我查看了文档,但没有看到这方面的具体细节。接收器连接器是否仅用于将文档添加到索引?

最佳答案

查看config for the connector看起来您可以将 behavior.on.null.values 设置为 delete。然后,您只需要确保针对应删除文档的键设置逻辑删除(空)。

Debezium 将 by default生成用于删除的逻辑删除消息。

关于elasticsearch - 卡夫卡连接 Elasticsearch : how to send deletes of documents?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59295718/

相关文章:

java - Kafka Streams 分组和串联

elasticsearch - ElasticSearch到Kafka事件-使用Logstash进行的每次更改

java - 卡夫卡 : ERROR Plugin class loader for connector: 'org.apache.kafka.connect.file.FileStreamSourceConnector' was not found

java - 获取NoSuchFileException : while starting Kafka instance

amazon-s3 - 强制 Confluence s3 水槽冲洗

java - elasticsearch 在同时进行更新和搜索时需要时间

ruby-on-rails - 使用ElasticSearch自动生成标签(或Thinking Sphinx/pg-search)

rest - 使用现有 "id"字段索引 Elasticsearch 文档

Elasticsearch 对相同的文档给出不同的分数

elasticsearch - 如何使用 Docker Swarm 或 Kubernetes 容器在两个节点上使用 DC/OS、Kafka 和 ElasticSearch 设置 POC 环境?