Spring Cloud 流 MessageChannel send() 总是返回 true

标签 spring spring-boot spring-cloud-stream spring-kafka

我正在使用 Spring 云流,我想保存消息并在 Kafka 服务器消失时重新尝试将它们发布到主题上,但即使 Kafka/Zookeeper 服务器停止,MessageChannel send() 方法也始终返回 true。

有人可以帮忙吗?

更新 application.yml 内容:

spring:
    cloud:
        stream:
            kafka:
                binder:
                    brokers: localhost
                    zk-nodes: localhost
                    mode: raw
                bindings:
                    output:
                        producer:
                            sync: true
            bindings:
                output:
                    destination: topic-notification
                    content-type: application/json

代码:
@Service
public class SendToKafka {
    private Logger log = LoggerFactory.getLogger(SendToKafka.class);

    @Autowired
    Source source;

    @Autowired
    NotificationFileService notificationFileService;

    public void send(NotificationToResendDTO notification){
        try {
            CompletableFuture.supplyAsync(() -> notification)
                .thenAcceptAsync(notif -> {
                    boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
                    log.info(" ======== kafka server response === " + resp);

                    if (!resp){
                        log.info(" ======== failed to send the notification" + notification);
                        // save failed notification
                        notificationFileService.writeTofile(notification);
                    }
                }).get();
        } catch (InterruptedException | ExecutionException e) {
            log.info(" ======== failed to send the notification with exception" + notification);
            // save failed notification
            notificationFileService.writeTofile(notification);
            e.printStackTrace();
        }
    }
}

最佳答案

Kafka 默认是异步的;您需要设置 synctrue ;见 binder producer properties .

sync

Whether the producer is synchronous.

Default: false.

关于Spring Cloud 流 MessageChannel send() 总是返回 true,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47678318/

相关文章:

spring - @Qualifier 无效

java - 将 YML/Java 配置 Autowiring 到 Spring Boot Bean

java - 多模块项目的 Spring Boot 组件扫描问题

java - 当 Spring Cloud Stream 响应式消费者遇到异常时,为什么我会收到 onComplete 信号?

apache-kafka - 发送到 kafka 主题时序列化消息时出错

java - Spring代理是否请求属性?

java - mocoServer - 代码解释

java - 独立消费者 (SpringJMS) 在 ActiveMQ 上创建了另一个队列

java - Spring boot 2.0.2,使用Aop拦截Cloud Stream注释不再起作用

java - 如何在 Spring 样本宠物诊所中使用JPA?