javascript - kafka-node 有没有办法在崩溃后请求收到的消息?

标签 javascript node.js apache-kafka kafka-consumer-api

我一直在查看码头,现在想知道这是否是 kafka 或 node-kafka 的一个功能。

因此,我的消费者收到来自 kafka 的消息,当我处理该消息时,我喜欢将其标记为在处理完成之前我没有收到它。这样,如果服务器在处理时崩溃,我可以重新连接并接收我的应用程序在崩溃之前收到的消息。

高级示例。

  1. Kafka消息是一个用户id;
  2. 收到后,我会从数据库中获取用户信息。
  3. 我向用户发送电子邮件。
  4. 发送电子邮件时。我将其记录到数据库中。
  5. 我将消息标记为完成。 (我不确定这一步是否可行)

这样,如果任何步骤在第五步之前崩溃,当服务器恢复时,它会再次收到失败的消息。

我正在使用 node.js LTS 8.10.0 和 npm 包 kafka-node@2.4.1

最佳答案

我感谢 void,因为他的回答帮助我在 kafka-node 中找到了我需要的东西。这只是 node.js 答案/示例。

const kafkaConfig = config.get('kafka');
const kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaConfig.host});
const consumer = new kafka.Consumer(kafkaClient, [{topic: kafkaConfig.topic}], {autoCommit: false});
const offset = new kafka.Offset(kafkaClient);

// very import to retrieve the offset
offset.fetchLatestOffsets([kafkaConfig.topic], (error, offsets) => {
    const latestOffset = offsets[kafkaConfig.topic][0];
    consumer.setOffset(kafkaConfig.topic, 0, latestOffset);
});

consumer.on('message', message => {
    // do stuff
    Log.insert({message})
        .then(() => {
            // commit the success so that kafka will update the offset
            consumer.commit((err, data) => console.log(err, data));
        })
});

关于javascript - kafka-node 有没有办法在崩溃后请求收到的消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49820799/

相关文章:

javascript - 如何使用jquery ajax和node js在express框架中将数据从客户端传递到服务器?

ios - Apache Kafka - iOS 消费者

apache-kafka - 垂直或水平扩展 Kafka 集群的注意事项?

javascript - Sailsjs CORS 'Access-Control-Allow-Origin' 问题

javascript - jQuery 触发器不执行自定义事件

javascript - 基于javascript中的另一个数组过滤对象数组

node.js - 在 Mongoose 中查找期间添加自定义字段

javascript - 如何在 Electron 中使用 <webview> 方法

具有反应流的 Spring 云流

javascript - 如何使用 Handlebars 排除数组/对象元素?