java - Kafka 流处理器线程安全吗?

标签 java multithreading apache-kafka-streams

我知道这个问题之前在这里被问过:Kafka Streaming Concurrency?

但这对我来说很奇怪。根据文档(或者我可能遗漏了一些东西),每个分区都有一个任务,这意味着处理器的不同实例,并且每个任务都由不同的线程执行。但是当我测试的时候,我看到不同的线程可以获得不同的处理器实例。因此,如果您想在处理器中保留任何内存状态(老式方式),您必须锁定吗?

示例代码:

public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);

输出:

线程 ID:88 ID:26b11094-a094-404b-b610-88b38cc9d1ef

线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda

线程 ID:90 ID:0a43ecb0-26f2-440d-88e2-87e0c9cc4927

线程ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

线程ID:90 ID:c667e669-9023-494b-9345-236777e9dfda

有没有办法为每个实例强制执行线程?

最佳答案

每个实例的线程数是一个配置参数(num.stream.threads,默认值为1)。因此,如果您启动单个 KafkaStreams 实例,您将获得 num.stream.threads 个线程。

任务将工作拆分为并行单元(基于您的输入主题分区)并将分配给线程。因此,如果您有多个任务和一个线程,则所有任务都将分配给该线程。如果您有两个线程(所有 KafkaStreams 实例的总和)每个线程执行大约 50% 的任务。

Note: because a Kafka Streams application is distributed in nature, there is no difference if you run a single KafkaStreams instance with multiple threads, or multiple KafkaStreams instanced with one thread each. Tasks will be distributed over all available threads of your application.

如果你想在任务之间共享任何数据结构并且你有一个以上的线程,那么你有责任同步对该数据结构的访问。请注意,任务到线程的分配在运行时可能会发生变化,因此,所有访问都必须同步。 但是,不推荐使用这种模式,因为它限制了可扩展性。你应该设计你的程序没有共享数据结构!这样做的主要原因是,你的程序通常分布在多台机器上,因此,不同的 KafkaStreams 实例无法访问共享数据结构无论如何数据结构。共享数据结构只能在单个 JVM 中工作,但使用单个 JVM 可以防止应用程序横向扩展。

关于java - Kafka 流处理器线程安全吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47119317/

相关文章:

java - 在 JPane 上正确显示多行

java - 将参数传递给 @BeforeClass 和 @AfterClass 方法

std::endl 的 C++ 多线程程序 I/O 问题

multithreading - 线程/ fork 的效率

c++ - 有没有办法通过模板参数自动解析重载方法?

apache-kafka - Kafka Streams - 减少大型状态存储的内存占用

apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区

java - 处理弹出窗口之外的点击事件

java - 重写嵌套 for 循环以提供更好的格式化输出

apache-kafka-streams - Kafka Streams - 使用处理器 API 实现连接