java - 当一个 @Around 建议无法继续时,建议优先级问题

标签 java spring annotations spring-aop

更新为使用附加信息重新表述问题

我们有两个注释:

  • CustomLogging
  • PollableStreamListener

两者都是使用 Spring AOP 的方面来实现的。

CustomLogging注释:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {

}

CustomLoggingAspect类:

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class CustomLoggingAspect {

  @Before("@annotation(customLogging)")
  public void addCustomLogging(CustomLogging customLogging) {
    log.info("Logging some information");
  }

}

PollableStreamListener注释:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {

}

PollableStreamListenerAspect类:

@Aspect
@Component
@Slf4j
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(pollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint,
      PollableStreamListener pollableStreamListener, Object dataCapsule) {
    if (dataCapsule instanceof Message) {
      Message<?> message = (Message<?>) dataCapsule;
      AcknowledgmentCallback callback = StaticMessageHeaderAccessor
          .getAcknowledgmentCallback(message);
      callback.noAutoAck();

      if (!paused) {
        // The separate thread is not busy with a previous message, so process this message:
        Runnable runnable = () -> {
          try {
            paused = true;

            // Call method to process this Kafka message
            joinPoint.proceed();

            callback.acknowledge(Status.ACCEPT);
          } catch (Throwable e) {
            callback.acknowledge(Status.REJECT);
            throw new PollableStreamListenerException(e);
          } finally {
            paused = false;
          }
        };

        executor.submit(runnable);
      } else {
        // The separate thread is busy with a previous message, so re-queue this message for later:
        callback.acknowledge(Status.REQUEUE);
        log.info("Re-queue");
      }
    }
  }

}

我们有一个名为 CleanupController 的类它按照时间表定期执行。

CleanupController类:

@Scheduled(fixedDelayString = "${app.pollable-consumer.time-interval}")
  public void pollForDeletionRequest() {
    log.trace("Polling for new messages");
    cleanupInput.poll(cleanupSubmissionService::submitDeletion);
  }

当计划执行时,它会调用另一个类中的方法,该方法同时带有PollableStreamListener注释和CustomLogging 。我添加了 Thread.sleep()模仿需要一段时间执行的方法。

@PollableStreamListener
  @CustomLogging
  public void submitDeletion(Message<?> received) {
    try {
      log.info("Starting processing");
      Thread.sleep(10000);
      log.info("Finished processing");
    } catch (Exception e) {
      log.info("Error", e);
    }
  }

我面临的问题是 CustomLogging 产生的输出每次我们使用 @Schedule 轮询新消息时都会打印,但我只希望它在实际执行带注释的方法时打印(这可能现在发生,或者将来可能发生,具体取决于当前是否正在处理另一条消息)。这会导致日志消息困惑,因为它意味着该消息正在被处理,而实际上它已被重新排队以供将来执行。

有什么方法可以让这些注释很好地协同工作,以便CustomLogging仅当带注释的方法执行时才会发生输出?


更新为使用 @OrderPollableStreamListener

根据@dunni的建议,我对上面的原始示例进行了以下更改。

PollableStreamListenerAspect 的顺序设置为 1 :

@Aspect
@Component
@Slf4j
@Order(value = 1)
public class PollableStreamListenerAspect {
...
}

CustomLoggingAspect 的阶数增加到 2 :

@Aspect
@Component
@Slf4j
@Order(value = 2)
public class CustomLoggingAspect {
...
}

我发现进行这些更改后,轮询根本无法检测到新请求。这是 PollableStreamListenerAspect 上的更改引入了这个问题(我注释掉了该行并重新运行它,事情的表现与之前一样)。


更新为使用 @Order(value = Ordered.HIGHEST_PRECEDENCE)PollableStreamListener

我已经更新了 PollableStreamListener使用HIGHEST_PRECEDENCE并更新@Around值:

@Aspect
@Component
@Slf4j
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {

  private final ExecutorService executor = Executors.newFixedThreadPool(1);

  private volatile boolean paused = false;

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener)")
  public void receiveMessage(ProceedingJoinPoint joinPoint) {
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;

          // Call method to process this Kafka message
          joinPoint.proceed();
        } catch (Throwable e) {
          e.printStackTrace();
          throw new PollableStreamListenerException(e);
        } finally {
          paused = false;
        }
      };

      executor.submit(runnable);
    } else {
      // The separate thread is busy with a previous message, so re-queue this message for later:
      log.info("Re-queue");
    }
  }
}

这现在部分有效。当我发送 Kafka 消息时,它会被处理,并且来自 CustomLogging 的日志记录仅当未处理另一条 Kafka 消息时才会打印注释。到目前为止一切顺利。

下一个挑战是获得@Around接受Message这是通过 Kafka 提供的。我尝试使用上面的示例进行此操作,并更改了以下几行:

  @Around(value = "@annotation(au.com.brolly.commons.stream.PollableStreamListener) && args(dataCapsule,..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) {
...
}

服务器正常启动,但是当我发布 Kafka 消息时,出现以下异常:

2021-04-22 10:38:00,055 ERROR [scheduling-1] org.springframework.core.log.LogAccessor: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation), failedMessage=GenericMessage...
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:330)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.handle(DefaultPollableMessageSource.java:361)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:219)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:200)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.poll(DefaultPollableMessageSource.java:68)
    at xyx.pollForDeletionRequest(CleanupController.java:35)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:596)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:624)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:72)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at xyz.CleanupSubmissionServiceImpl$$EnhancerBySpringCGLIB$$8737f6f8.submitDeletion(<generated>)
    at org.springframework.cloud.stream.binder.DefaultPollableMessageSource.doHandleMessage(DefaultPollableMessageSource.java:327)
    ... 17 more

最佳答案

因为this problem ,您需要在 PollableStreamListenerAspect 上使用 @Order(value = Ordered.HIGHEST_PRECEDENCE)。确实很奇怪,但它会按照你希望的方式工作。不过,IMO 这个问题应该在 Spring 得到解决。必须使用此解决方法是丑陋的,并且仅当异步调用 proceed() 的方面实际上具有最高优先级时才有效,但情况并非总是如此。作为替代方案,您可以使用 native AspectJ 及其自己的声明建议优先级的概念,该概念独立于 Spring 内部。

这是您的应用程序的简化版本,如 MCVE :

注释:

package de.scrum_master.spring.q67155048;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CustomLogging {}
package de.scrum_master.spring.q67155048;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PollableStreamListener {}

带有带有两个注释的方法的组件:

package de.scrum_master.spring.q67155048;

import org.springframework.stereotype.Component;

@Component
public class MyComponent {
  private int counter = 0;

  @PollableStreamListener
  @CustomLogging
  public void submitDeletion() {
    try {
      System.out.println("  Starting processing #" + ++counter);
      Thread.sleep(1000);
      System.out.println("  Finished processing #" + counter);
    }
    catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

方面:

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class CustomLoggingAspect {
  @Before("@annotation(de.scrum_master.spring.q67155048.CustomLogging)")
  public void addCustomLogging() {
    System.out.println("Logging");
  }
}
package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener)")
  public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}

驱动程序应用程序:

应用程序每 500 毫秒调用一次目标方法,但执行需要 1,000 毫秒。因此,在这种情况下,我们预计大约 50% 的调用会重新排队,而不会进行任何日志记录,因为较高优先级方面不会继续执行目标方法。

package de.scrum_master.spring.q67155048;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
      doStuff(appContext);
    }
  }

  private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
    MyComponent myComponent = appContext.getBean(MyComponent.class);
    for (int i = 0; i < 10; i++) {
      myComponent.submitDeletion();
      Thread.sleep(500);
    }
    // This is just to make the application exit cleanly
    PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
  }
}

控制台日志:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.8.RELEASE)

2021-04-20 12:56:03.675  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Starting DemoApplication on Xander-Ultrabook with PID 13560 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
...
2021-04-20 12:56:07.666  INFO 13560 --- [           main] d.s.spring.q67155048.DemoApplication     : Started DemoApplication in 4.65 seconds (JVM running for 7.181)
Receiving message
Logging
  Starting processing #1
Receiving message
  Re-queue
  Finished processing #1
Receiving message
Logging
  Starting processing #2
Receiving message
  Re-queue
  Finished processing #2
Receiving message
Logging
  Starting processing #3
Receiving message
  Re-queue
  Finished processing #3
Receiving message
Logging
  Starting processing #4
Receiving message
  Re-queue
  Finished processing #4
Receiving message
Logging
  Starting processing #5
Receiving message
  Re-queue
  Finished processing #5
2021-04-20 12:56:12.767  INFO 13560 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
...

看到了吗?我们计算了 10 次“接收消息”,但只有 5 次“重新排队”和 5 次“记录”。请注意,我对处理调用进行了编号,因为它们是异步的。这样,在开始和结束时就更容易理解。


根据用户评论进行更新:

我已经更新了我的 MCVE,以便重现您的参数绑定(bind)问题。新的或更改的文件是:

package de.scrum_master.spring.q67155048;

public class Message<T> {
  private T content;

  public Message(T content) {
    this.content = content;
  }

  @Override
  public String toString() {
    return "Message{" +
      "content=" + content +
      '}';
  }
}
package de.scrum_master.spring.q67155048;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication
@Configuration
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    try (ConfigurableApplicationContext appContext = SpringApplication.run(DemoApplication.class, args)) {
      doStuff(appContext);
    }
  }

  private static void doStuff(ConfigurableApplicationContext appContext) throws InterruptedException {
    MyComponent myComponent = appContext.getBean(MyComponent.class);
    Message<String> message = new Message<>("Hi there!");
    for (int i = 0; i < 10; i++) {
      myComponent.submitDeletion(message);
      Thread.sleep(500);
    }
    // This is just to make the application exit cleanly
    PollableStreamListenerAspect.EXECUTOR_SERVICE.shutdown();
  }
}
package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
  public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          System.out.println("dataCapsule = " + dataCapsule);
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}

根据您自己的经验,这会产生:

Exception in thread "main" java.lang.IllegalStateException: Required to bind 2 arguments, but only bound 1 (JoinPointMatch was NOT bound in invocation)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.argBinding(AbstractAspectJAdvice.java:605)
    at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
    at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
    at de.scrum_master.spring.q67155048.MyComponent$$EnhancerBySpringCGLIB$$4baa410d.submitDeletion(<generated>)
    at de.scrum_master.spring.q67155048.DemoApplication.doStuff(DemoApplication.java:21)
    at de.scrum_master.spring.q67155048.DemoApplication.main(DemoApplication.java:13)

您正在点击this problem ,我已经评论了关闭的Spring issue #16956关于它,希望能得到重新处理并有人修复它。

目前,您的解决方法不是使用优雅的 AOP 参数绑定(bind),而是使用 JoinPoint.getArgs() 手动获取参数:

package de.scrum_master.spring.q67155048;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Aspect
@Component
@Order(value = Ordered.HIGHEST_PRECEDENCE)
public class PollableStreamListenerAspect {
  public static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
  private volatile boolean paused = false;

  //@Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && args(dataCapsule, ..)")
  @Around("@annotation(de.scrum_master.spring.q67155048.PollableStreamListener) && execution(* *(*, ..))")
  //public void receiveMessage(ProceedingJoinPoint joinPoint, Object dataCapsule) throws Throwable {
  public void receiveMessage(ProceedingJoinPoint joinPoint) throws Throwable {
    Object dataCapsule = joinPoint.getArgs()[0];
    System.out.println("Receiving message");
    if (!paused) {
      // The separate thread is not busy with a previous message, so process this message:
      Runnable runnable = () -> {
        try {
          paused = true;
          System.out.println("dataCapsule = " + dataCapsule);
          joinPoint.proceed();
        }
        catch (Throwable throwable) {
          throwable.printStackTrace();
        }
        finally {
          paused = false;
        }
      };
      EXECUTOR_SERVICE.submit(runnable);
    }
    else {
      System.out.println("  Re-queue");
    }
  }
}

现在它又像这样工作了:

Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #1, message = Message{content=Hi there!}
Receiving message
  Re-queue
Receiving message
  Re-queue
  Finished processing #1, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #2, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #2, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #3, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #3, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #4, message = Message{content=Hi there!}
Receiving message
  Re-queue
  Finished processing #4, message = Message{content=Hi there!}
Receiving message
dataCapsule = Message{content=Hi there!}
Logging
  Starting processing #5, message = Message{content=Hi there!}

关于java - 当一个 @Around 建议无法继续时,建议优先级问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67155048/

相关文章:

java - 了解 Java 语法错误

java.lang.NoSuchMethodError : org. springframework.core.annotation.AnnotationUtils.clearCache()V

javascript - 在 Acrobat JS API 中将透明颜色设置为 strokeColor 会导致黑色边框

java - Ebean.Update 未将更新写入 DB2 数据库

java - 我可以覆盖方法类型上的@RequestMapping 吗?

java - JAXB异常处理

java - 当bean具有可变参数构造函数时,如何XML配置Spring bean以进行构造函数注入(inject)

java - 用@DataJpaTest 注释的测试不是用@Autowired 注释的 Autowiring 字段

java - 编写java注解用于计时方法调用

java - 删除hibernate实体而不(尝试)删除关联表( View )条目