java - I cannot send messages with spring-cloud-stream, but I can receive them

Tags: java spring-cloud-stream

StreamInput.java

public interface StreamInput {
    String QUEUE = "cherryQueue";
    @Input(StreamInput.QUEUE)
    SubscribableChannel input();
}

StreamOutput.java

public interface StreamOutput {
    @Output(StreamInput.QUEUE)
    MessageChannel output();
}

StreamReceiver.java

@Slf4j
@Component
@EnableBinding(StreamInput.class)
public class StreamReceiver {
    @StreamListener(StreamInput.QUEUE)
    public void process(Object message) {
        log.info("receive message:{}", message);
        log.info("--------------------------------------------------");
    }
}

The Spring Boot application starts normally and the queue is created successfully.

However, as soon as I use StreamOutput, it stops working.

Here is the test case:

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding(StreamOutput.class)
public class MessageTest {

    @Autowired
    private StreamOutput streamOutput;

    @Test
    public void sentMessageTest() {
        streamOutput.output()
                .send(MessageBuilder.withPayload("now time is :" + new Date()).build());
        log.info("the message send OK !!!");
    }

}

Here is the exception:

java.lang.IllegalStateException: Failed to load ApplicationContext

    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:108)
    at org.springframework.test.context.web.ServletTestExecutionListener.setUpRequestContextIfNecessary(ServletTestExecutionListener.java:190)
    at org.springframework.test.context.web.ServletTestExecutionListener.prepareTestInstance(ServletTestExecutionListener.java:132)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'cherryQueue' defined in com.cherry.order.message.StreamInput: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.cherry.order.message.MessageTest$StreamOutput; factoryMethodName=output; initMethodName=null; destroyMethodName=null
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:64)
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerInputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:47)
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:80)
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:562)
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:541)
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:76)
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:45)
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:358)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:357)
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145)
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117)
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:328)
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:233)
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:273)
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:93)
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:694)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:532)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327)
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:139)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 24 more

Best answer

Caused by: org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'cherryQueue' defined in com.cherry.order.message.StreamInput: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.cherry.order.message.MessageTest$StreamOutput; factoryMethodName=output; initMethodName=null; destroyMethodName=null

I think this error is quite clear.

@Output(StreamInput.QUEUE)

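You cannot use the same channel name for two channels. Each channel is registered as a bean under its name, so the @Output channel collides with the 'cherryQueue' bean already registered for the @Input channel.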
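To illustrate why the second registration is rejected, here is a minimal plain-Java model of a registry keyed by name. It is only a sketch: `ChannelRegistryModel` and its methods are hypothetical names, and this is not Spring's actual implementation, just the same "one definition per name" rule that the exception message reports.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified stand-in for a bean registry (hypothetical; not Spring's code). */
class ChannelRegistryModel {
    // Maps a bean name to the factory method that declared it.
    private final Map<String, String> definitions = new LinkedHashMap<>();

    /** Registers a channel under its name and rejects a second use of that name. */
    public void register(String beanName, String declaredBy) {
        String existing = definitions.putIfAbsent(beanName, declaredBy);
        if (existing != null) {
            throw new IllegalStateException("bean definition with name '" + beanName
                    + "' already exists (declared by " + existing + ")");
        }
    }

    /** Number of definitions registered so far. */
    public int size() {
        return definitions.size();
    }

    public static void main(String[] args) {
        ChannelRegistryModel registry = new ChannelRegistryModel();
        registry.register("cherryQueue", "StreamInput.input");
        try {
            // Same name again, as with @Output(StreamInput.QUEUE) in the question.
            registry.register("cherryQueue", "StreamOutput.output");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        // A distinct name is accepted.
        registry.register("cherryOutput", "StreamOutput.output");
        System.out.println("registered definitions: " + registry.size());
    }
}
```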

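If you want to produce to and consume from the same destination within the same application (which is a bit unusual), you must use two channels with different names and configure both bindings to reference the same destination.

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/51438311/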
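A sketch of what that could look like for the interfaces in the question. The channel names `cherryInput` and `cherryOutput` are hypothetical, chosen only for illustration; the `spring.cloud.stream.bindings.<channelName>.destination` property is the standard way to point a binding at a destination, shown here as comments because it belongs in the application's configuration file.

```java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// StreamInput.java
public interface StreamInput {
    String INPUT = "cherryInput";   // hypothetical channel name, unique in the context

    @Input(INPUT)
    SubscribableChannel input();
}

// StreamOutput.java
public interface StreamOutput {
    String OUTPUT = "cherryOutput"; // hypothetical channel name, different from INPUT

    @Output(OUTPUT)
    MessageChannel output();
}

// application.properties: both bindings reference the same destination
// spring.cloud.stream.bindings.cherryInput.destination=cherryQueue
// spring.cloud.stream.bindings.cherryOutput.destination=cherryQueue
```

With this change the listener would be annotated with `@StreamListener(StreamInput.INPUT)` instead of `StreamInput.QUEUE`, and the test can keep sending through `streamOutput.output()`.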
