spring-boot - 无法运行这个简单的流,...需要 serde 配置吗?

标签 spring-boot apache-kafka-streams spring-cloud-stream

是的,我已经阅读了我找到的所有文档,并尝试了配置上的所有替代方案,但只是这个应该记录一行的简单示例不起作用

(这是一个带有 spring-cloud-stream-binder-kafka-streams 的 Spring-Boot-2 应用程序)

Kafka 正在存储一个字符串值(空键)

我的应用程序.yaml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: 'myStreamTopic'
    output:
          producer.keySerde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
      kafka:
        streams:
          binder:
            configuration:
              default.key.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
              default.value.serde: 'org.apache.kafka.common.serialization.Serdes$StringSerde'
            brokers:
              - 'ommited:9092'
              - 'ommited:9092'
              - 'ommited:9092'
            application-id: hack1

只是这个简单的代码作为 POC:

@SpringBootApplication
@Slf4j
public class HackatonApplication {

    public static void main(String[] args) {
SpringApplication.run(HackatonApplication.class, args);
}

  @EnableBinding(KafkaStreamsProcessor.class)
  public static class LineProcessor {

    @StreamListener(Sink.INPUT)
    public void process(KStream<?, String> line) {
      log.info("Received: {}", line);
    }

  }

我无法让它运行!

org.springframework.context.ApplicationContextException:无法启动bean“outputBindingLifecycle”;嵌套异常是java.lang.IllegalArgumentException:尝试调用公共(public)抽象org.apache.kafka.streams.kstream.KStream org.apache.kafka.streams.kstream.KStream.map(org.apache.kafka.streams.kstream.KeyValueMapper )但尚未设置委托(delegate)。 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:184) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar!/:5.0.7.RELEASE] 在

抱歉,如果这是微不足道的,但我已经花了几个小时进行搜索、谷歌搜索并试图找到记录的解决方案。

最佳答案

您正在使用开箱即用的 KafkaStreamsProcessor 进行绑定(bind),它需要一个 KStream 作为输入,另一个 KStream 作为输出。如果您使用此标准,则必须为输出绑定(bind)提供正确的配置(例如目标等)。那么你的方法必须返回一个KStream并且你需要使用spring端的SendTo注解来绑定(bind)。如下所示:

@StreamListener(Sink.INPUT)
@SendTo("output")
public KStream<?,String> process(KStream<?, String> line) {
    log.info("Received: {}", line);
    return line;
}

但是,就您的情况而言,您可以使用自定义处理器并将其用于 EnableBinding

interface CustomKafkaStreamsProcessor {
        @Input("input")
        KStream<?, ?> input();
    } 

然后将其与您的绑定(bind)一起使用。 @EnableBinding(CustomKafkaStreamsProcessor.class)

这样,您就不必更改返回内容的方法。

关于spring-boot - 无法运行这个简单的流,...需要 serde 配置吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51454931/

相关文章:

java - Spring引导-gson单例?

java - Kafka Streams Global Store - 添加更改日志主题

java - @Configuration & @Bean 多个可注入(inject)

sql - jpa多对多查询

java - 如何使用 Micronaut 运行 KafkaStream 应用程序?

apache-kafka - Kafka 的动态消息编排流引擎

spring-cloud-stream 生产者交易性

javascript - 对 Spring Boot 端点的 Ajax 请求无法读取 HTTP MSG

apache-kafka - 如何处理kafka KStream并直接写入数据库而不是向其发送另一个主题