apache-kafka - Unique message check in a Kafka topic

Tags: apache-kafka, logstash

We use Logstash and want to read a table from an Oracle database and send messages like the ones below to Kafka:

Topic1: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"2019-07-30"}
        message4: {"name":"name-4", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}

Note that message1 and message4 are duplicates: they carry the same id.

Now we want to make sure all messages are unique, so how do we filter Topic1, deduplicate the messages, and send them on to Topic2?

The end result we want:

Topic2: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"

Best answer

This is known as exactly-once processing.

You may be interested in the first part of the Kafka FAQ, which describes some approaches to avoiding duplicates during data production (i.e., on the producer side):

Exactly once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.

There are two approaches to getting exactly once semantics during data production:

  1. Use a single-writer per partition and every time you get a network error check the last message in that partition to see if your last write succeeded
  2. Include a primary key (UUID or something) in the message and deduplicate on the consumer.

If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position then if it fails and restarts it will restart from the checkpointed position. Thus if the data output and the checkpoint are not written atomically it will be possible to get duplicates here as well. This problem is particular to your storage system. For example, if you are using a database you could commit these together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative that doesn't require a transaction is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.

I think there are two improvements that would make this a lot easier:

  1. Producer idempotence could be done automatically and much more cheaply by optionally integrating support for this on the server.
  2. The existing high-level consumer doesn't expose a lot of the more fine grained control of offsets (e.g. to reset your position). We will be working on that soon
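As a rough illustration of approach 2 in the quote above (put a primary key in the message and deduplicate on the consumer), the sketch below consumes Topic1, remembers the ids it has already seen, and forwards a message to Topic2 only the first time its id appears. It is a minimal example under stated assumptions: the broker address, group id, and regex-based id extraction are placeholders, and the in-memory set of seen ids is lost on restart, so this alone does not give exactly-once guarantees.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DedupBridge {
    // Naive extraction of the "id" field; a real job would use a proper JSON parser.
    private static final Pattern ID = Pattern.compile("\"id\"\\s*:\\s*\"([^\"]+)\"");

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        consumerProps.put("group.id", "topic1-dedup");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", StringSerializer.class.getName());
        producerProps.put("value.serializer", StringSerializer.class.getName());

        Set<String> seenIds = new HashSet<>(); // in-memory only, lost on restart

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("Topic1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    Matcher m = ID.matcher(record.value());
                    if (!m.find()) continue;      // skip messages without an id
                    String id = m.group(1);
                    if (seenIds.add(id)) {        // true only the first time this id is seen
                        // key Topic2 by id so log compaction could also be applied downstream
                        producer.send(new ProducerRecord<>("Topic2", id, record.value()));
                    }
                }
            }
        }
    }
}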

Another option (not exactly what you are looking for) is log compaction. Provided your duplicate messages share the same key, log compaction will eventually remove the duplicates once the compaction policy kicks in.
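If you go the log-compaction route, the topic holding the deduplicated data has to be created with cleanup.policy=compact, and every record must be keyed by the id. A minimal sketch using the Kafka AdminClient (broker address, partition count, and replication factor are placeholders):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Topic2 with cleanup.policy=compact: after compaction, only the latest
            // record for each key is retained, so records must be keyed by the id.
            NewTopic topic = new NewTopic("Topic2", 3, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}

Bear in mind that compaction runs in the background, so the head of the log can still hold duplicate ids for a while; it guarantees that the latest value per key is retained eventually rather than immediate uniqueness.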

Regarding apache-kafka - unique message check in a Kafka topic, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57272421/
