我们正在使用低级处理器 API 开发 Kafka Streams 应用程序。
根据 Kafka 的文档,所有线程和并行性都由流线程和流任务处理。并行性还可以使用主题分区进行扩展。
当前代码如下:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
但是,在任何情况下都建议创建我们自己的线程来进行实际处理吗?这意味着主要使用 Kafka Streams API 来消费来自主题的数据,而不是用于实际处理。实际处理将发生在 Kafka 流线程中初始数据消耗后调用的新线程中。
拓扑中的示例处理器:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Spawn new thread to do the processing
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
//Do more processing
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
我已经为此尝试了最基本的代码,但无法确定它是否干预了 Kafka 提供的自动功能。例如自动提交偏移量、超时等
还是坚持 Kafka 流已经提供的默认行为并利用流线程快速处理数据总是更好?
最佳答案
不建议启动您自己的线程,因为这会破坏 Kafka Streams 的容错保证。如果 process()
返回,Kafka Streams 假定消息已被完全处理并且所有可能的输出消息都通过 forward()
发送。对于这种情况,Kafka Streams 可能会提交输入记录偏移量。
但是,如果您在后台线程中处理消息,并且该线程处理失败,Kafka Streams 将对此一无所知,因此,即使发生故障并且消息会丢失,也可能会提交偏移量。
此外,后台线程不允许在 process()
返回后调用 forward()
。如果 forward()
在 process()
的“外部”调用,Kafka Streams 将抛出异常。
使用您自己的后台线程并保留至少一次处理保证并非不可能,但是,它相当复杂,因此不推荐。
关于java - 是否建议在 Kafka Streams 应用程序中启动新线程(使用编程方式)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56350501/