java - 是否建议在 Kafka Streams 应用程序中启动新线程(使用编程方式)?

标签 java apache-kafka apache-kafka-streams

我们正在使用低级处理器 API 开发 Kafka Streams 应用程序。

根据 Kafka 的文档,所有线程和并行性都由流线程和流任务处理。并行性还可以使用主题分区进行扩展。

当前代码如下:

public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

      //Do processing on the stream thread itself
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}

但是,在任何情况下都建议创建我们自己的线程来进行实际处理吗?这意味着主要使用 Kafka Streams API 来消费来自主题的数据,而不是用于实际处理。实际处理将发生在 Kafka 流线程中初始数据消耗后调用的新线程中。

拓扑中的示例处理器:

public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

  //Spawn new thread to do the processing
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      String threadName = Thread.currentThread().getName();
      System.out.println("Hello " + threadName);

      //Do more processing
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}

我已经为此尝试了最基本的代码,但无法确定它是否干预了 Kafka 提供的自动功能。例如自动提交偏移量、超时等

还是坚持 Kafka 流已经提供的默认行为并利用流线程快速处理数据总是更好?

最佳答案

不建议启动您自己的线程,因为这会破坏 Kafka Streams 的容错保证。如果 process() 返回,Kafka Streams 假定消息已被完全处理并且所有可能的输出消息都通过 forward() 发送。对于这种情况,Kafka Streams 可能会提交输入记录偏移量。

但是,如果您在后台线程中处理消息,并且该线程处理失败,Kafka Streams 将对此一无所知,因此,即使发生故障并且消息会丢失,也可能会提交偏移量。

此外,后台线程不允许在 process() 返回后调用 forward()。如果 forward()process() 的“外部”调用,Kafka Streams 将抛出异常。

使用您自己的后台线程并保留至少一次处理保证并非不可能,但是,它相当复杂,因此不推荐。

关于java - 是否建议在 Kafka Streams 应用程序中启动新线程(使用编程方式)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56350501/

相关文章:

java - 使用 Confluence 的 Spring Cloud Stream Kafka 不会生成与使用 Confluence 的 Spring Kafka 相同的消息

java - 如何在 Android 上使用文件描述符获取文件路径?

java.lang.NoClassDefFoundError : kafka/common/TopicAndPartition 错误

apache-kafka - 用于计算卡夫卡消费者总滞后时间的预定脚本

java - KStream 批处理窗口

java - Kubernetes 上的 Kafka 流 : Long rebalancing after redeployment

如果有焦点组件,则不会执行场景的 JavaFX Key Pressed 事件

java:这个方法返回什么?

apache-kafka - kafka中的Bootstrap服务器与zookeeper?

java - 卡夫卡流DSL : Application lags for windowed aggregation