Kafka客户应用程序具有严重的延迟(在高峰时段无法足够快地消耗kafka事件)。 kafka主题有120个分区,使用者组总共有30个主机,每个主机有两个使用者,因此每个使用者都使用2个kafka分区。我们使用的主机是具有32核的AWS C5.9xlarge实例。每个使用者都放在一个java.lang.Thread中,并且在每个线程中创建了一个带有250个线程的ThreadPool。
我们已经验证了CPU/内存/IO都不是瓶颈。然后,我们将250名 worker 增加到500名 worker ,但延迟保持不变。然后,我们改回了250名工作人员,但每台主机从2个增加到4个消费者。结果,每个消费者从一个kafka分区进行消费。现在问题解决了,延迟降低到了非常低的水平。
我的问题是,为什么在Threadpool中从250增加到500并没有帮助,而每台主机从2到4的消费者增加却有帮助呢?
private class ConsumerThread extends Thread {
public ConsumerThread(StremProcessor processor) {
this.processor = processor;
this.consumer = new KafkaConsumer()
}
@Override
public void run() {
ExecutorService executor = Executors.newFixedThreadPool(250);
while (true) {
Data data = consumer.poll()
executor.invokeAll(getTasks(data, processor)); //processor is
}
}
}
最佳答案
首先:您应该在每个循环之间的while循环中包括一些延迟,以防止应用程序淹没您的内存。
基本上,ExecutorService.invokeAll()
方法返回Future
的列表。您可以使用它们来“控制”线程。
How are the threads in ThreadPool different from the java.lang.Thread?
它们没有什么不同,但是您可以得到一个包装器(
Future
),该包装器使您可以在执行时控制线程。底层的Thread
像普通的Java线程一样工作。Is it because all the threads in the ThreadPool use a single processor core?
不
关于multithreading - java.lang.Thread和ThreadPool中的线程的性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55411596/