kafka-producer-api - xxxxx : 30030 ms has passed since batch creation plus linger time 的 1 条记录到期

标签 kafka-producer-api

我的用例:
使用 Postman,我调用了 Spring boot soap 端点。端点创建一个 KafkaProducer 并向特定主题发送消息。我还有一个 TaskScheduler 来使用这个主题。

问题:
调用soap将消息推送到主题时,出现此错误:

2017-11-14 21:29:31.463 ERROR 6389 --- [ad | producer-3] DomainEntityProducer : Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time 2017-11-14 21:29:31.464 ERROR 6389 --- [nio-8080-exec-6] DomainEntityProducer : org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time



这是我用来推送主题的方法:
public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer wKafkaProducer = new KafkaProducer(wKafkaProperties);
    ProducerRecord wProducerRecord = new ProducerRecord("DomainEntityCommandStream", getJSON(pDomainEntity));
    wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
        if (e != null) {
            logger.log(Level.SEVERE, e.getMessage());
        }
    }).get();
    return pDomainEntity;
}

使用命令 shell 脚本

./kafka-console-producer.sh --broker-list 10.0.1.15:9092 --topic DomainEntityCommandStream





./kafka-console-consumer.sh --boostrap-server 10.0.1.15:9092 --topic DomainEntityCommandStream --from-beginning



效果很好。

通过 Stackoverflow 上的一些相关问题,我试图清除该主题:

./kafka-topics.sh --zookeeper 10.0.1.15:9092 --alter --topic DomainEntityCommandStream --config retention.ms=1000



查看 kafka 日志,我看到保留时间已更改。

但是,不走运,我遇到了同样的错误。

有效载荷小得离谱,那我为什么要更改batch.size?
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
   <soapenv:Header/>
   <soapenv:Body>
      <gs:streamDomainEntityRequest>
         <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
      </gs:streamDomainEntityRequest>
   </soapenv:Body>
</soapenv:Envelope>

最佳答案

使用 Docker 和 Kafka 0.11.0.1 镜像,您需要将以下环境参数添加到容器中:

KAFKA_ZOOKEEPER_CONNECT = X.X.X.X:XXXX (your zookeeper IP or domain : PORT default 2181)

KAFKA_ADVERTISED_HOST_NAME = X.X.X.X (your kafka IP or domain)

KAFKA_ADVERTISED_PORT = XXXX (your kafka PORT number default 9092)


可选:

KAFKA_BROKER_ID = 999 (some value)

KAFKA_CREATE_TOPICS=test:1:1 (some topic name to create at start)


如果它不起作用并且您仍然收到相同的消息(“xxxxx 的 X 条记录到期:XXXXX ms 已通过批处理创建加上逗留时间”),您可以尝试从 Zookeeper 清除 kafka 数据。

关于kafka-producer-api - xxxxx : 30030 ms has passed since batch creation plus linger time 的 1 条记录到期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47299204/

相关文章:

apache-kafka - kafka 异步发送不是真的异步吗?

apache-kafka - 高级 Kafka 消费者 API 不起作用

apache-kafka - Kafka : The message when serialized is larger than the maximum request size you have configured with the max. request.size 配置

scala - 将 Kafka 消费者和生产者集成到一个函数中

java - 使用Kafka翻滚窗口查询时返回空数据

java - 找不到 key.serializer 的类

hadoop - 使用 HiveStorageHandler 的 Kafka 生产者

apache-kafka - Kafka session.timeout.ms 和 max.poll.interval.ms 的区别

apache-kafka - 卡夫卡0.10与卡夫卡0.8的差异

c# - asp.net core依赖注入(inject)配置