java - 为响应式(Reactive)管道编写方面

标签 java rx-java aop spring-aop project-reactor

我正在为返回 promise 的方法编写方面。考虑以下方法:

public Mono<Stream> publishToKafka(Stream s) {
    //publishToKafka is asynchronous
    return Mono.just(s).flatMap(worker::publishToKafka);
}

我想缓存发布是否成功。由于这是一个横切关注点,因此方面看起来是最好的设计。这是我的看法。

@Around("@annotation....")
public Object cache() {
    //get the data to cache from the annotation
    Object result = pjp.proceed();
    cache.cache("key","data");
    return result;
}

现在,由于 publishToKafka 是异步的,一旦发生线程切换并调用 cache.cache() ,目标方法就会返回。这不是我想要的。我想要的是,如果事件已成功发布到 Kafka,则应缓存结果。以下建议有效。

@Around("@annotation....")
public <T extends Stream<T>> Mono<T> cache() {
    //get the data to cache from the annotation
    return ((Mono<T>)pjp.proceed()).doOnNext(a -> cache.cache(key, data));
}

我想了解这里发生了什么。这种情况是否发生在管道组装期间?或者在执行期间(pjp.proceed() 返回一个 promise ),我的建议会添加 doOnNext 运算符?

我需要在此示例的上下文中了解汇编与执行时间。

最佳答案

Spring AOP 和 AspectJ 方面始终在与拦截的连接点相同的线程中同步执行。因此,如果您拦截的方法立即返回,并且返回值类似于 Promise、Future 或空(void)与回调的组合,则您不能指望在方面的建议中神奇地获得异步结果。您确实需要让方面了解异步情况。

话虽如此,我还想提一下,我以前从未使用过响应式编程,我只知道这个概念。从我在您的建议中看到的,该解决方案应该有效,但有一件事不太好:您让建议返回一个 new Mono 实例 returned by your doOnNext(..) call 。也许在注册了缓存回调后返回从 proceed() 获得的 原始 Mono 会更干净,只是为了避免任何副作用。

我不知道还要解释什么,情况已经很清楚了。如果我的解释还不够,请随时提出直接相关的后续问题。

关于java - 为响应式(Reactive)管道编写方面,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60630848/

相关文章:

java - 为什么当我第二次输入内容时会循环中断

java - 即使没有异常,数据也不会插入到表中

android - 如何在 RxJava 中处理 `IllegalArgumentException`?

java - spectj 切入点/建议仅适用于 *

c# - 需要 PostSharp 的免费模拟

java - Spring外部客户端超时

java - FileOutputStream 到 FileInputStream

rx-java - RxJava : CountDownLatch never stop

rx-java - 防止 Observable 为每个订阅运行

spring - 没有找到匹配的工厂方法 : factory method 'aspectOf()'