java - Axon:如何为单个事件配置 amqp 发布?

标签 java spring-boot amqp spring-amqp axon

我有一个简单的 spring 驱动的服务,它通过 amqp 发布事件。 配置基于bootiful-axon .

现在我希望服务保持一些私有(private)状态。这是一个简单的用例,可以通过 3 个额外的事件来实现。这些事件在服务范围之外没有任何意义,所以我不希望它们“离开”。

我如何指定哪些事件应该通过 amqp 发布,哪些不应该发布?

最佳答案

我是这样解决的:

拦截发送方法的自定义SpringAMQPPublisher:

public class SelectiveAmqpPublisher extends SpringAMQPPublisher {


    static boolean shouldSend (Class<?> pt) {
        return PublicEvent.class.isAssignableFrom(pt);
    }


    public SelectiveAmqpPublisher (
            SubscribableMessageSource<EventMessage<?>> messageSource) {

        super(messageSource);

    }


    @Override
    protected void send (List<? extends EventMessage<?>> events) {

        super.send(events.stream()
                        .filter(e -> shouldSend(e.getPayloadType()))
                        .collect(Collectors.toList()));

    }    

}

配置:

@Autowired
private AMQPProperties amqpProperties;

@Autowired 
private RoutingKeyResolver routingKeyResolver;

@Autowired
private AMQPMessageConverter aMQPMessageConverter;


@Bean(initMethod = "start", destroyMethod = "shutDown")
public SpringAMQPPublisher amqpBridge(
             EventBus eventBus, 
             ConnectionFactory connectionFactory,
             AMQPMessageConverter amqpMessageConverter) {

    SpringAMQPPublisher publisher = new SelectiveAmqpPublisher(eventBus);



    // The rest is from axon-spring-autoconfigure...

    publisher.setExchangeName(amqpProperties.getExchange());
    publisher.setConnectionFactory(connectionFactory);
    publisher.setMessageConverter(amqpMessageConverter);
    switch (amqpProperties.getTransactionMode()) {

        case TRANSACTIONAL:
            publisher.setTransactional(true);
            break;
        case PUBLISHER_ACK:
            publisher.setWaitForPublisherAck(true);
            break;
        case NONE:
            break;
        default:
            throw new IllegalStateException("....");
    }

    return publisher;

}

关于java - Axon:如何为单个事件配置 amqp 发布?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46721560/

相关文章:

symfony - 我应该如何构建 dockerized RabbitMQ?

ssl - 无法与 rabbitmq 建立 ssl 连接

java - Eclipse 自动完成不适用于 lambda 和类型

java - 如何将网页元素与页面对象文件分开

java - 在 Java 中实现接口(interface)实例列表的通用容器

java - 如何在 Spring Cloud Contract stub 上执行 WireMock.verify() 操作?

java - spring-data-rest:是否可以在单个响应中动态嵌入资源的关系?

java - Apache-POI 在 excel 中排序行

spring-boot - 为单元测试禁用 servlet 过滤器

java - 如何阻止 SimpleMessageListenerContainer 陷入关闭/重新启动循环?