java - Kafka 消费者在可完成的 future 内抵消提交

标签 java multithreading java-8 apache-kafka kafka-consumer-api

我在线程内创建了一个 Kafka 消费者实例,作为构造函数的一部分,并且在 run 方法内的线程内,我确实调用了不同的 Web 服务,并保持调用非阻塞,我正在使用完整的 future。我的问题是,我无法通过调用 thenApply 方法并传递 Kafka 消费者实例来发出提交,因为它给我一个错误,表明 Kafka 消费者不是线程安全的。虽然在我的提交方法中我已经编写了代码

synchronized(consumer) {
  commitResponse();
}

我仍然得到ConcurrentModificationException

class KafkaConsumerThread implements Runnable {

  KafkaConsumer<String, String> consumer;

  public KafkaConsumerThread(Properties properties) {
    consumer = new KafkaConsumer<String, String>(properties);    
    ...
  }

  @Override
  public void run() {
    try {
      // synchronized (consumer) {
      consumer.subscribe(topics);
      while (true) {
        if (closed.get()) {
          consumer.close();
        }
        ConsumerRecords<String, String> records = consumer.poll(120000);
        for (ConsumerRecord<String, String> record : records) {
          getAsyncClient().prepareGet(webServiceUrl)
              .execute()
              .toCompletableFuture()
              .thenApply(resp -> callAnotherService1(resp))
              .thenApply(resp -> callAnotherService2(resp))
              .thenApply(resp -> commitResponse(resp, consumer));
          }
        }
      }
    } catch (Exception ex) {
      ...
    }

在上面的代码中,我在 commitResponse 方法中收到异常“KafkaConsumer 对于多线程访问不安全”。尽管在我的提交响应中,如果我将提交包含在同步(消费者)中,我仍然会收到错误。

最佳答案

很可能是因为 poll方法不同步,并且在您的异步 GET 执行提交时执行(同时仍然保持内部 kafka 锁运行)。

查看私有(private)方法的引用: org.apache.kafka.clients.consumer.KafkaConsumer.acquire()org.apache.kafka.clients.consumer.KafkaConsumer.release()org.apache.kafka.clients.consumer.KafkaConsumer

关于java - Kafka 消费者在可完成的 future 内抵消提交,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46369286/

相关文章:

Java : How to implement Multi-Threading into a recursive folder size finder algorithm?

java - 数组作为局部变量 - 不遵循 Lambda 中的明确赋值规则

java - Java 方法引用中类型参数的使用

java - SpringMVC HTTP 状态 405 - 不支持请求方法 'POST'

java - 在 jBoss AS 7 中部署 EAR,其中包含 WAR 中的 Web 服务

java - 使用java获取XML的某些值

java - 在 Java 中,是否存在可与 Python 的 socketserver 媲美的东西?

java - 不同线程写入/读取的原始类字段变量

java - LocalDateTime 的长时间戳

objective-c - zlib:线程安全的zalloc和zfree在C中?