spring-cloud - Producing Avro messages with Spring Cloud Stream and the Confluent Schema Registry

Tags: spring-cloud spring-cloud-stream

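I am new to Spring Cloud Stream and am trying to produce Avro messages backed by the Confluent Schema Registry.

I can get the very basic example at https://cloud.spring.io/spring-cloud-stream/ to work, but when I try to extend it to use Avro, I run into exceptions.

As a common test environment, I am running the Docker image: https://github.com/Landoop/fast-data-dev/

The test project was generated with Spring Initializr and slightly modified: https://start.spring.io/

pom.xml: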

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo.kafka</groupId>
  <artifactId>kafka-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka-demo</name>
  <description>Demo project for Spring Boot</description>


  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Dalston.SR1</spring-cloud.version>
    <project-avro.version>7.0.0-SNAPSHOT</project-avro.version>
    <apache.avro.version>1.8.2</apache.avro.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-schema</artifactId>
      <version>1.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${apache.avro.version}</version>
    </dependency>
    <dependency>
      <groupId>project.foundation</groupId>
      <artifactId>maritz-avro</artifactId>
      <version>${project-avro.version}</version>
    </dependency>
  </dependencies>
</project>

I have published the following schema to the registry:

{
  "type": "record",
  "name": "RoutingSlipMsg",
  "fields": [
    {
      "name": "projectNumber",
      "type": "string"
    },
    {
      "name": "clientName",
      "type": "string"
    },
    {
      "name": "programTheme",
      "type": "string"
    },
    {
      "name": "databaseName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "actionType",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

Original test application.yml:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/avro
#          contentType: application/*+avro
          contentType: application/com.project.RoutingSlipMsg.v1+avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

Edited test application.yml:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/*+avro
#          contentType: application/avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

Configuration class

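import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DemoConfiguration {

  // Points the schema registry client at the Confluent Schema Registry endpoint.
  @Bean
  public ConfluentSchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient registry = new ConfluentSchemaRegistryClient();
    registry.setEndpoint("http://192.168.99.100:8081");
    return registry;
  }

}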

Automatic topic creation seems to work fine. Each time I change the destination: value to something else, a new topic is created.

Test

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {

  @Autowired
  private KafkaRoutingSlipSender kafkaRoutingSlipSender;

  @Test
  public void testSend() throws Exception {

    RoutingSlipMsg routingSlip = new RoutingSlipMsg();
    routingSlip.setActionType("FAKE_ACTION");
    routingSlip.setClientName("TEST_CLIENT");
    routingSlip.setDatabaseName("NONE");
    routingSlip.setProgramTheme("THEME");
    routingSlip.setProjectNumber("ABC123");

    kafkaRoutingSlipSender.send(routingSlip);
  }
}

Boot application class

package com.project.kafka.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaDemoApplication.class, args);
  }
}

Original sender class

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {


  @InboundChannelAdapter(value = Source.OUTPUT)
  public MessageSource<RoutingSlipMsg> send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    return () -> {
      return new GenericMessage<>(data);
    };
  }
}

Edited sender class:

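package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

  @Autowired
  private Source source;

  // Sends the payload directly to the bound output channel instead of relying on a poller.
  public void send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    source.output().send(MessageBuilder.withPayload(data).build());
  }
}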

Original stack trace/exception, caused by improper use of @InboundChannelAdapter

2017-07-06 12:20:48.645 ERROR 69011 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:119)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:201)
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:116)
    ... 20 more

I have looked at several different posts here, but nothing seems to help. What might I be missing?

Update 1: I have updated the KafkaRoutingSlipSender class and application.yml as shown above. With contentType removed, these changes publish a message like the following to the topic:

ÿcontentType?"application/x-java-object;type=com.maritz.avro.RoutingSlipMsg"FAKE_ACTIOÎTEST_CLIENÔNONÅTHEMÅABC12³
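A note on reading that payload: each field value has its final character replaced by an accented one (N becomes Î, T becomes Ô, E becomes Å, 3 becomes ³). That pattern is what you get when the high bit of the last ASCII byte is set, which is how Kryo marks the end of an ASCII string. That the binder fell back to Kryo serialization here is an assumption drawn from the output, not something confirmed above. The sketch below only reproduces the byte pattern; the class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it does not use Kryo or Spring, only the JDK.
class AsciiEndMarkerDemo {

  // Sets the high bit on the last byte of a non-empty ASCII string and decodes
  // the bytes as ISO-8859-1, roughly how a console would display them.
  static String markLastByte(String ascii) {
    byte[] bytes = ascii.getBytes(StandardCharsets.US_ASCII);
    bytes[bytes.length - 1] |= (byte) 0x80;
    return new String(bytes, StandardCharsets.ISO_8859_1);
  }

  public static void main(String[] args) {
    // Field values taken from the test in the question.
    String[] values = {"FAKE_ACTION", "TEST_CLIENT", "NONE", "THEME", "ABC123"};
    for (String value : values) {
      System.out.println(value + " -> " + markLastByte(value));
    }
  }
}
```

If this reading is right, the message on the topic is a serialized Java object rather than Avro, which matches the application/x-java-object content type embedded at the start of the payload.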

However, when I set contentType to application/*+avro, the @Autowired Source seems to run into a NullPointerException.

New stack trace/exception:

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=9a9ebc45-9431-8de5-1cb0-40d191082599, timestamp=1499438830456}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:22)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:57)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

Update 2 (07-10-2017): I have fixed the null Source issue thanks to the following post:

Spring Boot: Configuration Class is simply ignored and not loaded

ConfluentSchemaRegistryClient is now being used. I can step through the code in the debugger, but it cannot find the schema.

ConfluentSchemaRegistryClient.register(String subject, String format, String schema)

ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request, Map.class, new Object[0]);
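For comparison while stepping through that call, the Confluent Schema Registry REST API documents schema registration as a POST to /subjects/{subject}/versions with a JSON body of the form {"schema": "..."}. The sketch below builds that URL and body with plain JDK string handling so they can be checked against the values of this.endpoint + path and request in the debugger. The subject name routingslipmsg and the helper names are hypothetical, and whether the Spring client produces exactly this path is something to verify rather than assume.

```java
// Hypothetical helper; it performs no HTTP call and only builds strings.
class RegisterRequestSketch {

  // Builds the URL that a register call would be expected to POST to.
  static String registerUrl(String endpoint, String subject) {
    return endpoint + "/subjects/" + subject + "/versions";
  }

  // Wraps a single-line Avro schema (itself JSON) in the {"schema": "..."} envelope,
  // escaping backslashes and double quotes so the result stays valid JSON.
  // Newlines are not handled, so the schema must be on one line.
  static String registerBody(String avroSchemaJson) {
    String escaped = avroSchemaJson.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"schema\":\"" + escaped + "\"}";
  }

  public static void main(String[] args) {
    String endpoint = "http://192.168.99.100:8081"; // endpoint from application.yml
    String subject = "routingslipmsg";              // hypothetical subject name
    String schema = "{\"type\":\"record\",\"name\":\"RoutingSlipMsg\",\"fields\":[]}";
    System.out.println("POST " + registerUrl(endpoint, subject));
    System.out.println(registerBody(schema));
  }
}
```

If the URL seen in the debugger differs from this shape, for example because of a missing or doubled path segment in the endpoint, that would be one place to look for the cause of the 404.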

New stack trace/exception (07-10-2017):

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is org.springframework.web.client.HttpClientErrorException: 404 Not Found
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=73365130-a91b-ba9c-aaf0-77cfaa26f73d, timestamp=1499700409997}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:29)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.web.client.HttpClientErrorException: 404 Not Found
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:700)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:653)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:531)
    at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:73)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

Should register() create a new schema if one does not exist, or is a 404 the expected behavior? I am quite confused.

Accepted answer

Caused by: java.lang.IllegalArgumentException: wrong number of arguments

A method annotated with @InboundChannelAdapter cannot have any parameters...

/**
 * Indicates that a method is capable of producing a {@link org.springframework.messaging.Message}
 * or {@link org.springframework.messaging.Message} {@code payload}.
 * <p>
 * A method annotated with {@code @InboundChannelAdapter} can't accept any parameters.
 * <p>    
 * Return values from the annotated method may be of any type. If the return
 * value is not a {@link org.springframework.messaging.Message}, a {@link org.springframework.messaging.Message}
 * will be created with that object as its {@code payload}.

...

The method is polled for a message or a message payload.
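The "wrong number of arguments" message in the first stack trace comes from the reflective call itself: the poller invokes the annotated method with no arguments, and Java reflection rejects that when the method declares a parameter. A minimal JDK-only sketch of that failure follows; the class and method names are made up for illustration and no Spring classes are involved.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the sender bean; not the real KafkaRoutingSlipSender.
class ReflectiveInvokeDemo {

  // Same shape as the failing method: it declares one parameter.
  public String withParameter(String data) {
    return "payload: " + data;
  }

  // Shape a poller can call: no parameters at all.
  public String withoutParameter() {
    return "payload: polled";
  }

  // Invokes the named method with zero arguments, as a poller would, and
  // returns either the result or the simple name of the exception thrown.
  static String invokeWithNoArgs(String methodName, Class<?>... parameterTypes) {
    try {
      Method method = ReflectiveInvokeDemo.class.getMethod(methodName, parameterTypes);
      Object result = method.invoke(new ReflectiveInvokeDemo()); // no arguments supplied
      return String.valueOf(result);
    } catch (IllegalArgumentException e) {
      return e.getClass().getSimpleName();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invokeWithNoArgs("withParameter", String.class)); // IllegalArgumentException
    System.out.println(invokeWithNoArgs("withoutParameter"));            // payload: polled
  }
}
```

This is why the edited sender in the question, which calls source.output().send(...) directly, avoids the error: it no longer asks a poller to call a method that needs an argument.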

Source: Stack Overflow, https://stackoverflow.com/questions/44955707/
