我知道这个问题之前在这里被问过:Kafka Streaming Concurrency?
但这对我来说很奇怪。根据文档(或者我可能遗漏了一些东西),每个分区都有一个任务,这意味着处理器的不同实例,并且每个任务都由不同的线程执行。但是当我测试的时候,我看到不同的线程可以获得不同的处理器实例。因此,如果您想在处理器中保留任何内存状态(老式方式),您必须锁定吗?
示例代码:
public class SomeProcessor extends AbstractProcessor<String, JsonObject> {
private final String ID = UUID.randomUUID().toString();
@Override
public void process(String key, JsonObject value) {
System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);
输出:
线程 ID:88 ID:26b11094-a094-404b-b610-88b38cc9d1ef
线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda
线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda
线程 ID:90 ID:0a43ecb0-26f2-440d-88e2-87e0c9cc4927
线程ID:90 ID:c667e669-9023-494b-9345-236777e9dfda
线程ID:90 ID:c667e669-9023-494b-9345-236777e9dfda
有没有办法为每个实例强制执行线程?
最佳答案
每个实例的线程数是一个配置参数(num.stream.threads
,默认值为1
)。因此,如果您启动单个 KafkaStreams
实例,您将获得 num.stream.threads
个线程。
任务将工作拆分为并行单元(基于您的输入主题分区)并将分配给线程。因此,如果您有多个任务和一个线程,则所有任务都将分配给该线程。如果您有两个线程(所有 KafkaStreams
实例的总和)每个线程执行大约 50% 的任务。
Note: because a Kafka Streams application is distributed in nature, there is no difference if you run a single
KafkaStreams
instance with multiple threads, or multipleKafkaStreams
instanced with one thread each. Tasks will be distributed over all available threads of your application.
如果你想在任务之间共享任何数据结构并且你有一个以上的线程,那么你有责任同步对该数据结构的访问。请注意,任务到线程的分配在运行时可能会发生变化,因此,所有访问都必须同步。 但是,不推荐使用这种模式,因为它限制了可扩展性。你应该设计你的程序没有共享数据结构!这样做的主要原因是,你的程序通常分布在多台机器上,因此,不同的 KafkaStreams
实例无法访问共享数据结构无论如何数据结构。共享数据结构只能在单个 JVM 中工作,但使用单个 JVM 可以防止应用程序横向扩展。
关于java - Kafka 流处理器线程安全吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47119317/