java - 关闭后如何重新连接kafka生产者?

标签 java multithreading scala apache-kafka

我有一个多线程应用程序,它使用生产者类来生成消息,之前我使用下面的代码为每个请求创建生产者。KafkaProducer 是用每个请求新建的,如下所示:

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {

                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        isValidMsg[0] = false;
                        exception.printStackTrace();
                        saveOrUpdateLog(msgBean, producerType, exception);
                        logger.error("ERROR:Unable to produce message.",exception);
                    }
                }
            });
producer.close();

然后我阅读了关于生产者的 Kafka 文档,了解到我们应该使用单个生产者实例以获得良好的性能。

然后我在单例类中创建了 KafkaProducer 的单个实例。

现在我们应该何时何地关闭生产者。显然,如果我们在第一次发送请求后关闭生产者,它不会找到生产者重新发送消息,因此抛出:

java.lang.IllegalStateException: Cannot send after the producer is closed.

或者我们如何在关闭后重新连接到生产者。 问题是如果程序崩溃或有异常呢?

最佳答案

通常,在 KafkaProducer 上调用 close() 足以确保所有飞行记录已完成:

/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */

如果您的生产者在您的应用程序的整个生命周期中都在使用,请不要关闭它,直到您收到终止信号,然后调用 close()。如文档中所述,生产者在多线程环境中使用是安全的,因此您应该重复使用同一个实例。

如果您在多个线程中共享您的KafkaProducer,您有两个选择:

  1. 在主执行线程中通过 Runtime.getRuntime().addShutdownHook 注册关闭回调时调用 close()
  2. 让您的多线程方法竞争关闭,只允许一个方法获胜。

2 的粗略草图可能如下所示:

object KafkaOwner {
  private var producer: KafkaProducer = ???
  @volatile private var isClosed = false
     
  def close(): Unit = {
    if (!isClosed) {
      kafkaProducer.close()
      isClosed = true
    }
  }
    
  def instance: KafkaProducer = {
    this.synchronized {
      if (!isClosed) producer 
      else {
        producer = new KafkaProducer()
        isClosed = false
      }
    }
  }
}

关于java - 关闭后如何重新连接kafka生产者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39917046/

相关文章:

java - 无法使用 STREAM_VOICE_CALL 通过 Android Java 中的 MediaPlayer 播放音频

java - 使用 MethodInvokingFactoryBean 设置非常规 bean

java - 如何在 Spring 中创建多 Web 模块应用程序?

scala - 与take(10)和limit(10).collect()的性能比较

java - Scala非法继承错误没有意义

scala - 当外部类在 Scala 中声明为 var 时实例化内部类

java - 为什么 Checkstyle 默认不允许 protected 变量?

java - 线程没有通过通知唤醒

python - PyQT 段错误(有时)

java - 使用 5 个线程添加数字