我正在使用 Spring 云流,我想保存消息并在 Kafka 服务器消失时重新尝试将它们发布到主题上,但即使 Kafka/Zookeeper 服务器停止,MessageChannel send() 方法也始终返回 true。
有人可以帮忙吗?
更新 application.yml 内容:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
zk-nodes: localhost
mode: raw
bindings:
output:
producer:
sync: true
bindings:
output:
destination: topic-notification
content-type: application/json
代码:
@Service
public class SendToKafka {
private Logger log = LoggerFactory.getLogger(SendToKafka.class);
@Autowired
Source source;
@Autowired
NotificationFileService notificationFileService;
public void send(NotificationToResendDTO notification){
try {
CompletableFuture.supplyAsync(() -> notification)
.thenAcceptAsync(notif -> {
boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
log.info(" ======== kafka server response === " + resp);
if (!resp){
log.info(" ======== failed to send the notification" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
}
}).get();
} catch (InterruptedException | ExecutionException e) {
log.info(" ======== failed to send the notification with exception" + notification);
// save failed notification
notificationFileService.writeTofile(notification);
e.printStackTrace();
}
}
}
最佳答案
Kafka 默认是异步的;您需要设置 sync
至 true
;见 binder producer properties .
sync
Whether the producer is synchronous.
Default: false.
关于Spring Cloud 流 MessageChannel send() 总是返回 true,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47678318/