更新为使用附加信息重新表述问题
我们有两个注释:
-
CustomLogging
-
PollableStreamListener
两者都是使用 Spring AOP 的方面来实现的。
CustomLogging
注释:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {
}
CustomLoggingAspect
类:
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {
@Before("@annotation(customLogging)")
public void addCustomLogging(CustomLogging customLogging) {
log.info("Logging some information");
}
}
PollableStreamListener
注释:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {
}
PollableStreamListenerAspect
类:
@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint,
PollableStreamListener pollableStreamListener, Object dataCapsule) {
if (dataCapsule instanceof Message) {
Message<?> message = (Message<?>) dataCapsule;
AcknowledgmentCallback callback = StaticMessageHeaderAccessor
.getAcknowledgmentCallback(message);
callback.noAutoAck();
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
callback.acknowledge(Status.ACCEPT);
} catch (Throwable e) {
callback.acknowledge(Status.REJECT);
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
callback.acknowledge(Status.REQUEUE);
log.info("Re-queue");
}
}
}
}
我们有一个名为 CleanupController
的类它按照时间表定期执行。
CleanupController
类:
@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
public void pollForDeletionRequest() {
log.trace("Polling for new messages");
cleanupInput.poll(cleanupSubmissionService::submitDeletion);
}
当计划执行时,它会调用另一个类中的方法,该方法同时带有PollableStreamListener
注释和CustomLogging
。我添加了 Thread.sleep()
模仿需要一段时间执行的方法。
@PollableStreamListener
@CustomLogging
public void submitDeletion(Message<?> received) {
try {
log.info("Starting processing");
Thread.sleep(10000);
log.info("Finished processing");
} catch (Exception e) {
log.info("Error", e);
}
}
我面临的问题是 CustomLogging
产生的输出每次我们使用 @Schedule
轮询新消息时都会打印,但我只希望它在实际执行带注释的方法时打印(这可能现在发生,或者将来可能发生,具体取决于当前是否正在处理另一条消息)。这会导致日志消息困惑,因为它意味着该消息正在被处理,而实际上它已被重新排队以供将来执行。
有什么方法可以让这些注释很好地协同工作,以便CustomLogging
仅当带注释的方法执行时才会发生输出?
更新为使用 @Order
上PollableStreamListener
根据@dunni的建议,我对上面的原始示例进行了以下更改。
将 PollableStreamListenerAspect
的顺序设置为 1 :
@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}
将 CustomLoggingAspect
的阶数增加到 2 :
@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}
我发现进行这些更改后,轮询根本无法检测到新请求。这是 PollableStreamListenerAspect
上的更改引入了这个问题(我注释掉了该行并重新运行它,事情的表现与之前一样)。
更新为使用 @Order(value = Ordered.HIGHEST_PRECEDENCE)
上PollableStreamListener
我已经更新了 PollableStreamListener
使用HIGHEST_PRECEDENCE
并更新@Around
值:
@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
private final ExecutorService executor = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
public void receiveMessage(ProceedingJoinPoint joinPoint) {
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
// Call method to process this Kafka message
joinPoint.proceed();
} catch (Throwable e) {
e.printStackTrace();
throw new PollableStreamListenerException(e);
} finally {
paused = false;
}
};
executor.submit(runnable);
} else {
// The separate thread is busy with a previous message, so re-queue this message for later:
log.info("Re-queue");
}
}
}
这现在部分有效。当我发送 Kafka 消息时,它会被处理,并且来自 CustomLogging
的日志记录仅当未处理另一条 Kafka 消息时才会打印注释。到目前为止一切顺利。
下一个挑战是获得@Around
接受Message
这是通过 Kafka 提供的。我尝试使用上面的示例进行此操作,并更改了以下几行:
@Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}
服务器正常启动,但是当我发布 Kafka 消息时,出现以下异常:
2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
at xyx.pollForDeletionRequest(CleanupController.java:35)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$$8737f6f8.submitDeletion(<generated>)
at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
... 17 more
最佳答案
因为this problem ,您需要在 PollableStreamListenerAspect
上使用 @Order(value = Ordered.HIGHEST_PRECEDENCE)
。确实很奇怪,但它会按照你希望的方式工作。不过,IMO 这个问题应该在 Spring 得到解决。必须使用此解决方法是丑陋的,并且仅当异步调用 proceed()
的方面实际上具有最高优先级时才有效,但情况并非总是如此。作为替代方案,您可以使用 native AspectJ 及其自己的声明建议优先级的概念,该概念独立于 Spring 内部。
这是您的应用程序的简化版本,如 MCVE :
注释:
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {}
package de.scrum_master.spring.q67155048;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {}
带有带有两个注释的方法的组件:
package de.scrum_master.spring.q67155048;
import org.springframework.stereotype.Component;
@Component
public class MyComponent {
private int counter = 0;
@PollableStreamListener
@CustomLogging
public void submitDeletion() {
try {
System.out.println(" Starting processing #" + ++counter);
Thread.sleep(1000);
System.out.println(" Finished processing #" + counter);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
方面:
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class CustomLoggingAspect {
@Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
public void addCustomLogging() {
System.out.println("Logging");
}
}
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
驱动程序应用程序:
应用程序每 500 毫秒调用一次目标方法,但执行需要 1,000 毫秒。因此,在这种情况下,我们预计大约 50% 的调用会重新排队,而不会进行任何日志记录,因为较高优先级方面不会继续执行目标方法。
package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
doStuff(appContext);
}
}
private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
MyComponent myComponent = appContext.getBean(MyComponent.class);
for (int i = 0; i < 10; i++) {
myComponent.submitDeletion();
Thread.sleep(500);
}
// This is just to make the application exit cleanly
PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
}
}
控制台日志:
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.1.8.RELEASE)
2021-04-20 12:56:03.675 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
...
2021-04-20 12:56:07.666 INFO 13560 --- [ main] d.s.spring.q67155048.DemoApplication : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
Receiving message
Logging
Starting processing #1
Receiving message
Re-queue
Finished processing #1
Receiving message
Logging
Starting processing #2
Receiving message
Re-queue
Finished processing #2
Receiving message
Logging
Starting processing #3
Receiving message
Re-queue
Finished processing #3
Receiving message
Logging
Starting processing #4
Receiving message
Re-queue
Finished processing #4
Receiving message
Logging
Starting processing #5
Receiving message
Re-queue
Finished processing #5
2021-04-20 12:56:12.767 INFO 13560 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
...
看到了吗?我们计算了 10 次“接收消息”,但只有 5 次“重新排队”和 5 次“记录”。请注意,我对处理调用进行了编号,因为它们是异步的。这样,在开始和结束时就更容易理解。
根据用户评论进行更新:
我已经更新了我的 MCVE,以便重现您的参数绑定(bind)问题。新的或更改的文件是:
package de.scrum_master.spring.q67155048;
public class Message<T> {
private T content;
public Message(T content) {
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"content=" + content +
'}';
}
}
package de.scrum_master.spring.q67155048;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@Configuration
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
doStuff(appContext);
}
}
private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
MyComponent myComponent = appContext.getBean(MyComponent.class);
Message<String> message = new Message<>("Hi there!");
for (int i = 0; i < 10; i++) {
myComponent.submitDeletion(message);
Thread.sleep(500);
}
// This is just to make the application exit cleanly
PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
}
}
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
System.out.println("dataCapsule = " + dataCapsule);
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
根据您自己的经验,这会产生:
Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$$4baa410d.submitDeletion(<generated>)
at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)
您正在点击this problem ,我已经评论了关闭的Spring issue #16956关于它,希望能得到重新处理并有人修复它。
目前,您的解决方法不是使用优雅的 AOP 参数绑定(bind),而是使用 JoinPoint.getArgs()
手动获取参数:
package de.scrum_master.spring.q67155048;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private volatile boolean paused = false;
//@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
//public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
Object dataCapsule = joinPoint.getArgs()[0];
System.out.println("Receiving message");
if (!paused) {
// The separate thread is not busy with a previous message, so process this message:
Runnable runnable = () -> {
try {
paused = true;
System.out.println("dataCapsule = " + dataCapsule);
joinPoint.proceed();
}
catch (Throwable throwable) {
throwable.printStackTrace();
}
finally {
paused = false;
}
};
EXECUTOR_SERVICE.submit(runnable);
}
else {
System.out.println(" Re-queue");
}
}
}
现在它又像这样工作了:
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #1, message = Message{content=Hi there!}
Receiving message
Re-queue
Receiving message
Re-queue
Finished processing #1, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #2, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #2, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #3, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #3, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #4, message = Message{content=Hi there!}
Receiving message
Re-queue
Finished processing #4, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
Starting processing #5, message = Message{content=Hi there!}
关于java - 当一个 @Around 建议无法继续时,建议优先级问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67155048/