c# - 使用数据库作为从kafka消费的消息状态的存储是否正确?

标签 c# .net-core apache-kafka confluent-platform

目前我已经实现了一个 kafka 消费者,其工作原理如下:

在 while 循环内:

  1. 消费来自kafka的消息
  2. 将消费的消息放到单独的任务中处理,这样主线程和消费者循环就不会被阻塞 2.1 仅当处理成功或尝试处理次数超出时才提交消息。

步骤 #2.1 可能需要1 秒到 6 小时才能完成

问题是,如果应用程序崩溃并且有任务尚未完成,则在应用程序重新启动(甚至重新平衡)时,这些消息将被再次使用和处理。

我不想自动提交偏移量,因为它只能保证最多一次交付。我正在考虑使用数据库作为消息状态的存储并实现消费者,如下所示:

在 while 循环内:

  1. 消费来自kafka的消息
  2. 检查数据库是否存在此类消息
  • 如果消息存在于数据库中并且状态为“已完成”,则提交消息

  • 如果数据库中存在消息,但状态为“进行中”,则直接转到步骤#4

  • 如果消息不存在,则转到步骤 #3

  • 将消息保存到数据库中,状态为“正在进行”
  • 将消费的消息放到单独的任务中处理,这样主线程和消费者循环就不会被阻塞 4.1 仅当处理成功或尝试处理次数超出时,才提交消息并将数据库中的状态更改为“已完成”。
  • 我不确定使用数据库是否是正确的方法,因为如果我有很多消息,它会减慢消费者的速度。您能给我一些关于如何正确实现消费者以便每条消息仅处理一次的建议吗?

    最佳答案

    您的消费者应该从流(Kafka)中获取任务,以便流不再包含该任务。如果您的工作节点在运行任务时崩溃,您需要实现冗余/错误处理,即全局异常处理和持久临时存储。因此,我不建议将任务存储在流旁边的数据库中,但是如果您打算这样做,那么您不妨在 Kafka 中创建一个表,因为它们是持久的。

    当错误处理时,实现策略取决于你,因为有几种方法可以解决它,即如果节点崩溃,则将任务准备回流中,准备被另一个节点捕获,或者你可以只是记录任务并通知用户任务失败。

    关于c# - 使用数据库作为从kafka消费的消息状态的存储是否正确?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56784108/

    相关文章:

    c# - 使用 Kinect 为简单的静态手势创建手势定义

    docker - 为什么我可以绑定(bind)到80端口

    apache-kafka - 如何使用 spring-kafka 为监听器传递多个引导服务器

    scala - 我可以从外部方法遍历 KTable 中的项目吗

    c# - 组合空/空数组

    C# 在声明后立即使用 Lazy.Value

    c# - 使 Firebird 与 Entity Framework Code First 和迁移一起工作

    c# - Selenium 与 .net core : performance impact, IWebElement 中的多个线程?

    .net - 如何使用 IHostedService 访问控制台应用程序中的命令行参数?

    elasticsearch - 寻找一个kafka连接器以将数据上传到elasticsearch