java - 如何在同一个盒子上运行多个相互独立的kafka消费者?

标签 java multithreading design-patterns apache-kafka abstract-class

我有两个 Kafka 消费者 ConsumerAConsumerB。我想在同一台机器上运行这两个相互独立的 kafka 消费者。他们之间根本没有关系。这两个 kafka 消费者将在同一台机器上处理不同的主题。

  • 每个消费者都应该有不同的 Properties 对象。
  • 每个消费者都应该有不同的线程池配置,因为如果需要独立于其他消费者,它们可以以多线程方式(消费者组)运行。

下面是我的设计:

消费者类(摘要):

 public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName, Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName, Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName, consumerProps);
    }
}

ConsumerA类:

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[], byte[]> consumer;

    public ConsumerA(String consumerName, Properties consumerProps) {
        super(consumerName, consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName, Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map<String, Object> config = new HashMap<>();
        config.put(Config.URLS, TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                                      record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= ", ex);
            if (!closed.get()) throw e;             
        } catch (Exception ex) {
            System.out.println("error= ", ex);      
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

ConsumerA B类:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler 类:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }
  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

下面是我的一个项目中的主类,如果我启动我的服务器,调用将首先自动进行,从这个地方我启动我所有的 kafka 消费者,在那里我执行我的 ConsumerA消费者B。一旦调用关闭,我就会通过对所有 Kafka 消费者调用关闭来释放所有资源。

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @PostConstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA", getConsumerPropsA()), 3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB", getConsumerPropsB()), 3);
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

对于这种我想在同一个盒子上运行多个 kafka 消费者的问题,这是正确的设计吗?让我知道是否有更好、更有效的方法来解决这个问题。一般来说,我会在同一个盒子上最多运行三个或四个 Kafka 消费者,如果需要,每个消费者都可以有自己的消费者组。

这是 KafkaConsumer 的 Javadoc我在我的两个消费者中都使用它。并以此为基础article我已经创建了我的消费者,只是我使用了抽象类来扩展它。 在该链接中搜索“Putting it all Together”

在文档中提到消费者不是线程安全的,但看起来我的代码正在为池中的每个线程重用相同的消费者实例。

public ConsumerHandler(Consumer consumer, int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }

解决这个线程安全问题并仍然实现相同功能的最佳方法是什么?

最佳答案

一个简短的建议,如果您已经知道,我们深表歉意。类级变量永远不是线程安全的。如果您需要为每个线程使用不同的 Properties 对象,最好在方法级别声明它们,并将它们作为参数提供给您需要访问 Properties 对象的其他方法。

关于java - 如何在同一个盒子上运行多个相互独立的kafka消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41072666/

相关文章:

multithreading - 在Perl中使用多线程时,应如何更新哈希哈希?

design-patterns - 您能解释一下 Context 设计模式吗?

Java 设计问题

java - 使用 Spring 3 MVC 的问题

java - 为什么 AIDL/Messenger 绑定(bind)到服务?

java - 如何将并行进程同步到 Web 服务中?

asp.net - 将动态表单字段添加到数据库

java - WeakHashMap 是在不断增长,还是会清除垃圾键?

JavaFX:如何将图像添加到已在另一种方法中初始化的矩形中?

c++ - 线程没有从 sleep 中醒来