python - 如何在Kafka中处理一次消息,以便服务重新启动时不会处理所有消息

标签 python events apache-kafka event-driven

第一次使用 Kafka,我使用微服务架构学习 Kafka,并且正在寻找下一个问题。

每次我重新启动服务时都会处理主题中的所有消息。有没有办法我只能处理这些消息一次,将它们标记为已读或其他什么?

这是我在 Pytho 3 中的代码片段:

class EmailStreamConsumer:
def __init__(self, bootstrap_servers='localhost:9092'):
    self.__bootstrap_servers = bootstrap_servers
    self.__new_emails_consumer = KafkaConsumer('NewEmails', bootstrap_servers=bootstrap_servers,
                                               auto_offset_reset='earliest')
    self.__sent_emails_consumer = KafkaConsumer('SentEmails', bootstrap_servers=bootstrap_servers,
                                                auto_offset_reset='earliest')

def start(self):
    for message in self.__new_emails_consumer:
        value = message.value.decode('utf-8')
        email = json.loads(value)
        self.send_email(email['content'], email['to_email'], email['title'], email['from_email'])
        print("%s:%d:%d: key=%s value=%s" % (
            message.topic, message.partition, message.offset, message.key, message.value))

我希望该服务只发送一次电子邮件。即使服务重新启动也是如此。

最佳答案

我认为您的问题是您的Kafka-Consumer没有GROUP ID

只需添加:

String groupId = "kafka-new-emails";
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

Your application will start read from the latest email as your consumer group labeled where the last commit you read was. Also, if you have more than one consumer and one of them gets down, consumer group will help you in making a rebalance as to make the consumer that is online to read from the partition that was assigned to the consumer that is down.

关于python - 如何在Kafka中处理一次消息,以便服务重新启动时不会处理所有消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59524775/

相关文章:

ssl - kafka 的密码套件支持

具有常量值类型的python字典

python - CSPRNG Python 2.7 实现

python - 计算网格 netCDF 文件中选定区域中的变量平均值

c# - 用户帮助的所有控件的全局事件处理程序

Python Kafka消费者读取已读消息

Python 和 sharepoint 集成

javascript - 使用 jquery 在顶部元素之前添加一个新元素

c# - 在类中打包事件参数,为什么?

Docker Compose Mac 错误 : Cannot start service zoo1: Mounts denied: