java - 拦截Spring Cloud Stream SubscribableChannel的传入消息

标签 java spring-integration spring-cloud-stream spring-messaging

由于org.springframework.messaging.support.ChannelInterceptor的方法postReceive未在org.springframework.messaging.SubscribableChannel中调用。有没有办法拦截带注释的方法的所有传入消息@StreamListener(Sink.INPUT)

例如:

在进入handle方法之前拦截消息

@StreamListener(Sink.INPUT)
public void handle(Foo foo) {
    // ...
}

以下是我对 Spring Cloud Stream 的设置

public interface EventSink {

    String INPUT1 = "input1";
    String INPUT2 = "input2";

    @Input(INPUT1)
    SubscribableChannel input1();

    @Input(INPUT2)
    SubscribableChannel input2();   
}

public interface EventSource {

    String OUTPUT1 = "output1";
    String OUTPUT2 = "output2";

    @Output(OUTPUT1)
    MessageChannel output1();

    @Output(OUTPUT2)
    MessageChannel output2()';
}

spring:
  cloud:
    stream:
      bindings:
        input1:
          destination: input1
        input2:
          destination: input2     
        output1:
          destination: output1
        output2:
          destination: output2

public class EventHandler {

    @StreamListener(EventSink.INPUT1)
    public void handle(Foo1 foo) {
        // ...
    }

    @StreamListener(EventSink.INPUT2)
    public void handle(Foo2 foo) {
        // ...
    }

}

@Service
public class Bar1Service {

    @Autowired
    private EventSource source;

    public void bar1() {
        source.output1().send(MessageBuilder.withPayload("bar1").build());
    }

}

@Service
public class Bar2Service {

    @Autowired
    private EventSource source;

    public void bar2() {
        source.output2().send(MessageBuilder.withPayload("bar2").build());
    }

}

最佳答案

使用DirectChannel,绑定(bind)器会在同一线程上调用监听器,因此 preSend 在这里是合适的。

但是,您不必搞乱 ThreadLocal,您可以使用方法签名访问 header ...

@StreamListener(Processor.INPUT)
public void handle(Foo foo, @Header("bar") String bar) {
    ...
}

编辑

@EnableBinding(Processor.class)
public class So41459187Application {

    public static void main(String[] args) {
        SpringApplication.run(So41459187Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String in) {
        return in.toUpperCase();
    }

    @Configuration
    public static class Config {

        @Bean
        public BeanPostProcessor channelConfigurer() {
            return new BeanPostProcessor() {

                @Override
                public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                    return bean;
                }

                @Override
                public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                    if ("input".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on INPUT: " + message);
                                return message;
                            }

                        });
                    }
                    else if ("output".equals(beanName)) {
                        ((AbstractMessageChannel) bean).addInterceptor(new ChannelInterceptorAdapter() {

                            @Override
                            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                                System.out.println("PreSend on OUTPUT: " + message);
                                return message;
                            }

                        });
                    }
                    return bean;
                }

            };
        }
    }

}

关于java - 拦截Spring Cloud Stream SubscribableChannel的传入消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41459187/

相关文章:

java - 我可以使用 BigInteger 中的哪些操作

spring-boot - 如果我使用 @EnableAutoConfiguration,IntelliJ 无法 Autowiring @ServiceActivator 方法中的参数

java - 窗口存储未填满(Spring Cloud Stream Kafka)

java - 有没有办法在 Play! 中使用 MyBatis?框架?

java - Memcached 的 Java UDP 套接字客户端接收超时

java - 使用 spring 集成从 wmq 获取 JMS 目标

spring-boot - 如何使用Spring Boot配置JMX

java - Spring Cloud Stream RabbitMQ Binder - Spring Cloud函数错误处理

Spring Netflix eureka、zuul vs Spring 云数据流

java - 如何在Spring Boot MVC中更改View上的数据?