node.js - 我可以限制kafka Node 消费者的消费吗?

标签 node.js apache-kafka

好像是我的kafka Node 消费者:

var kafka = require('kafka-node');
var consumer = new Consumer(client, [], {
     ...
    });

获取的消息太多,在某些情况下我无法处理。 有没有办法限制它(例如每秒接受不超过 1000 条消息,可能使用暂停 api?)

  • 我正在使用 kafka-node,与 Java 版本相比,它似乎有一个有限的 api

最佳答案

在 Kafka 中,轮询和处理应该以协调/同步的方式进行。也就是说,在每次轮询之后,您应该先处理所有接收到的数据,然后再进行下一次轮询。此模式会自动将消息数量限制为您的客户端可以处理的最大吞吐量。

像这样的东西(伪代码):

while(isRunning) {
  messages = poll(...)
  for(m : messages) {
    process(m);
  }
}

(这就是为什么没有参数“fetch.max.messages”的原因——您只是不需要它。)

关于node.js - 我可以限制kafka Node 消费者的消费吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38546366/

相关文章:

apache-kafka - 无论如何,对于kafka Streams应用程序中的不同输入主题,是否可以使用不同的 auto.offset.reset 策略?

node.js - 在 npm 中找不到模块

node.js - MongoDB 返回 null,但查询单独工作

node.js - 为什么 Nestjs 不在我的浏览器中设置 cookie?

javascript - MongoDB 批量插入操作而不是 For 循环

java - Spring boot application.yml 中的 Spring Kafka SSL 设置

apache-kafka - Apache Flink 和 Kafka FETCH_SESSION_ID_NOT_FOUND 信息日志

javascript - 如何访问内部生成器函数内部的父类(super class)方法?

python - 如何摆脱 Spark Streaming + Kafka 中的 NoSuchMethodError : org. apache.kafka.clients.consumer.KafkaConsumer.subscribe 错误

python - 卡夫卡超时错误 ('Failed to update metadata after 60.0 secs.' )