java - Intercept all JDBC calls within a request scope using AspectJ and return them in the response

Tags: java multithreading spring-boot aspectj executorservice

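I'm having trouble implementing a query profiler for my data service (a Spring Boot application) for debugging purposes. Any help would be appreciated.

Problem statement:

I need to return all database queries executed for a specific endpoint along with the response.

My approach:

I created a request-scoped component and autowired it into an aspect. The aspect records each query in the request-scoped object and injects that object into the response. All the relevant files are provided below.

Issue:

Some endpoints execute queries in multiple threads. I was getting an error, which I fixed by switching to simpleThreadScope. However, I cannot see any of the queries executed by those threads (I can see the queries executed outside the threads). Could you help me get the queries executed inside the threads into the response?

AspectJ configuration: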

<plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <executions>
                <execution>
                    <id>default-compile</id>
                    <phase>none</phase>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>aspectj-maven-plugin</artifactId>
            <version>1.7</version>
            <configuration>
                <aspectDirectory>aspect</aspectDirectory>
                <complianceLevel>1.8</complianceLevel>
            </configuration>
            <dependencies>
                <dependency>
                    <groupId>org.aspectj</groupId>
                    <artifactId>aspectjtools</artifactId>
                    <version>1.8.10</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <!-- Compile and weave aspects after all classes compiled by javac -->
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

Aspect:

@Aspect
@Component
public class QueryProfilerAspect {
    private static final Logger logger = LoggerFactory
            .getLogger(QueryProfilerAspect.class);
    private static String TIME_FORMAT = "HH:mm:ss.SSS";
    private static String QUERY_PROFILER = "QueryProfiler";

    @Autowired
    QueryProfile queryProfile;

    @Pointcut("(call(* org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations.query*(..)) && args(query,params,..))")
    public void anyJDBCOperations(String query, Map params) {
    }

    @Pointcut("execution(* *(..)) && (@annotation(org.springframework.web.bind.annotation.PostMapping) || @annotation(org.springframework.web.bind.annotation.PutMapping) || @annotation(org.springframework.web.bind.annotation.DeleteMapping) || @annotation(org.springframework.web.bind.annotation.GetMapping))")
    private void anyGetPutPostDeleteMappingMethodPointCut() {
        // pointcut
    }

    @Pointcut("execution(* *(..)) && @annotation(org.springframework.web.bind.annotation.RequestMapping)")
    private void anyRequestMappingMethodPointCut() {
        // pointcut
    }

    @Around("anyJDBCOperations(sqlQuery, params)")
    public Object log(ProceedingJoinPoint jp, String sqlQuery, Map params)
            throws Throwable {
            long start = System.currentTimeMillis();
            Object output = jp.proceed();
            long elapsedTime = System.currentTimeMillis() - start;

            DataSource dataSource = ((JdbcTemplate) ((NamedParameterJdbcOperations) jp
                    .getTarget()).getJdbcOperations()).getDataSource();

            if (params instanceof Map && !params.isEmpty()) {
                logger.debug("inside instance of MAP!!!! ::param {}", params);
                sqlQuery = replaceMap(sqlQuery, (Map<?, ?>) params);
            }

            queryProfile.getQuery().add(sqlQuery);
            logger.info("Intercepted Query is::: {}", sqlQuery);
            return output;
    }

    @AfterReturning(value = "anyRequestMappingMethodPointCut() || anyGetPutPostDeleteMappingMethodPointCut()", returning = "returnVal")
    public void anyPublicControllerMethod(JoinPoint jp,
            ResponseEntity returnVal)
            throws Throwable {
            HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder
                    .currentRequestAttributes()).getRequest();

            QueryProfile qp = new QueryProfile(queryProfile);
            qp.setRequestURL(request.getRequestURL().toString());
            qp.setHostName(getHostName());

            Object responseBody = returnVal.getBody();
            if (responseBody instanceof ResponseDTO) {
                List<QueryProfile> profileList = new ArrayList<>();
                if (((ResponseDTO) responseBody).getMeta().get(QUERY_PROFILER)
                        != null) {
                    profileList.add((QueryProfile) ((ResponseDTO) responseBody)
                            .getMeta().get(QUERY_PROFILER));
                }
                profileList.add(qp);
                ((ResponseDTO) responseBody)
                        .addMeta(QUERY_PROFILER, profileList);
            }
    }
}

Request-scoped object:

@Component
@Scope(value = "simpleThreadScope", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class QueryProfile {
    private String hostName;
    private String requestURL;
    private Long duration;
    private String time;
    private String dataSource;
    private List<String> query = new ArrayList<>();

    public QueryProfile() {
        //Default constructor
    }

    public QueryProfile(QueryProfile qp) {
        setHostName(qp.getHostName());
        setRequestURL(qp.getRequestURL());
        setDataSource(qp.getDataSource());
        setDuration(qp.getDuration());
        setTime(qp.getTime());
        setQuery(qp.getQuery());
    }

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public String getRequestURL() {
        return requestURL;
    }

    public void setRequestURL(String requestURL) {
        this.requestURL = requestURL;
    }

    public Long getDuration() {
        return duration;
    }

    public void setDuration(Long duration) {
        this.duration = duration;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getDataSource() {
        return dataSource;
    }

    public void setDataSource(String dataSource) {
        this.dataSource = dataSource;
    }

    public List<String> getQuery() {
        return query;
    }

    public void setQuery(List<String> query) {
        this.query = query;
    }
}

SimpleThreadScope configuration:

@Configuration
public class MainConfig implements BeanFactoryAware {

    private static final Logger logger = LoggerFactory.getLogger(MainConfig.class);

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        if (beanFactory instanceof ConfigurableBeanFactory) {

            logger.info("MainConfig is backed by a ConfigurableBeanFactory");
            ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;

            /*Notice:
             *org.springframework.beans.factory.config.Scope
             * !=
             *org.springframework.context.annotation.Scope
             */
            org.springframework.beans.factory.config.Scope simpleThreadScope = new SimpleThreadScope();
            cbf.registerScope("simpleThreadScope", simpleThreadScope);

            /*why the following? Because "Spring Social" gets the HTTP request's username from
             *SecurityContextHolder.getContext().getAuthentication() ... and this 
             *by default only has a ThreadLocal strategy...
             *also see http://stackoverflow.com/a/3468965/923560 
             */
            SecurityContextHolder.setStrategyName(SecurityContextHolder.MODE_INHERITABLETHREADLOCAL);
        }
        else {
            logger.info("MainConfig is not backed by a ConfigurableBeanFactory");
        } 
    }
}
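
For context on why the worker-thread queries go missing: Spring's SimpleThreadScope keeps scoped beans in a ThreadLocal, so every thread resolves its own QueryProfile instance. The plain-Java sketch below reproduces that behavior without Spring. The class ThreadScopedQueries and the query on table3 are hypothetical stand-ins, not part of the original application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-scoped bean: each thread gets its own list.
final class ThreadScopedQueries {
    private static final ThreadLocal<List<String>> QUERIES =
            ThreadLocal.withInitial(ArrayList::new);

    static void record(String sql) {
        QUERIES.get().add(sql);
    }

    static List<String> recordedOnThisThread() {
        return new ArrayList<>(QUERIES.get());
    }

    // Records one query on the calling thread and one on a worker thread,
    // then returns what the calling thread can see.
    static List<String> simulateRequest() {
        QUERIES.remove(); // start from a clean state on the calling thread
        record("SELECT * FROM table1");
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The worker writes into its own thread-local list, not the caller's.
            CompletableFuture.runAsync(() -> record("SELECT * FROM table3"), pool).join();
        } finally {
            pool.shutdown();
        }
        return recordedOnThisThread();
    }

    public static void main(String[] args) {
        System.out.println(simulateRequest()); // prints [SELECT * FROM table1]
    }
}
```

Only the query recorded on the calling thread comes back, which matches the symptom described in the question.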

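My thread executor class: I build a list of Runnable tasks (each one calls a DAO method that executes a query) and pass it to the method below to run the tasks in parallel.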

public class ThreadUtils {
    protected static final Logger logger = LoggerFactory
            .getLogger(ThreadUtils.class);

    public static void executeInParallel(List<Runnable> runnableTasks) {
        ExecutorService executorService = new DelegatingSecurityContextExecutorService(
                Executors.newFixedThreadPool(5),
                SecurityContextHolder.getContext());

        CompletableFuture<?>[] futures = runnableTasks.stream()
                .map(task -> CompletableFuture.runAsync(task, executorService))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        executorService.shutdown();
    }
}

My response is:

"data":{},
"meta": {
        "QueryProfiler": [
            {
                "hostName": "xxx.xx.com",
                "requestURL": "http://localhost:7010/abc/5",
                "duration": null,
                "time": null,
                "dataSource": null,
                "query": [
                    "SELECT * FROM table1",
                    "SELECT COUNT(1) FROM table2 WHERE abc = 1 AND def = 2"
                ]
            }
        ]
    }

Accepted answer

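I solved the problem by dropping the request-scoped object and using an InheritableThreadLocal instead. Such a variable also passes its value on to child threads. So, by using an InheritableThreadLocal variable in my aspect and adding the queries to it, I was able to inject it into the response when returning.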
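
As a rough illustration of that inheritance, here is a minimal sketch under some assumptions: InheritedProfileDemo is a hypothetical class, the list of strings stands in for QueryProfile, and the queries on table3 and table4 are made up. The list is wrapped with Collections.synchronizedList because several workers may add to it concurrently, and a plain ArrayList is not safe for that.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InheritedProfileDemo {
    // Hypothetical stand-in for the per-request QueryProfile.
    private static final InheritableThreadLocal<List<String>> QUERIES =
            new InheritableThreadLocal<>();

    static void record(String sql) {
        QUERIES.get().add(sql);
    }

    static List<String> simulateRequest() {
        // Set the value on the request thread BEFORE any worker thread is created.
        QUERIES.set(Collections.synchronizedList(new ArrayList<>()));
        record("SELECT * FROM table1");
        // Worker threads are created by the request thread, so they inherit
        // a reference to the same list.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<?>[] futures = {
                CompletableFuture.runAsync(() -> record("SELECT * FROM table3"), pool),
                CompletableFuture.runAsync(() -> record("SELECT * FROM table4"), pool)
            };
            CompletableFuture.allOf(futures).join();
            return new ArrayList<>(QUERIES.get());
        } finally {
            pool.shutdown();
            QUERIES.remove(); // avoid leaking the value into the next request on this thread
        }
    }

    public static void main(String[] args) {
        System.out.println(simulateRequest().size()); // prints 3
    }
}
```

All three queries end up in the list seen by the request thread. The order of the two worker queries is not deterministic.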

In addition to the code in the question, I added the following to make it work:

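// Holds the current profile. A thread created by a thread that has already
// set a value inherits a reference to the same QueryProfile instance.
private static final InheritableThreadLocal<QueryProfile> queryProfile = new InheritableThreadLocal<>();

// Returns the profile for the current thread, creating one if none is set yet.
public static QueryProfile getQueryProfile() {
    if (queryProfile.get() == null) {
        queryProfile.set(new QueryProfile());
        logger.info("Profiler is null. Setting with new value");
    }
    return queryProfile.get();
}

public static void setQueryProfile(QueryProfile qp) {
    queryProfile.set(qp);
}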
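
One caveat worth illustrating: an InheritableThreadLocal value is copied when a thread is created, not when a task is submitted. The approach fits the ThreadUtils class from the question because that class builds a new pool, and therefore new threads, on every call from the request thread. The sketch below shows that a worker started before the value was set does not see it. The class PoolReuseCaveat and the value request-42 are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PoolReuseCaveat {
    private static final InheritableThreadLocal<String> CONTEXT =
            new InheritableThreadLocal<>();

    // Reads CONTEXT on a worker thread of the given pool.
    static String readOnWorker(ExecutorService pool) {
        return CompletableFuture.supplyAsync(CONTEXT::get, pool).join();
    }

    // Returns { value seen by a worker started before set(),
    //           value seen by a worker started after set() }.
    static String[] run() {
        CONTEXT.remove();
        ExecutorService longLived = Executors.newFixedThreadPool(1);
        ExecutorService perCall = null;
        try {
            readOnWorker(longLived);          // forces the single worker thread to start now
            CONTEXT.set("request-42");        // hypothetical per-request value
            String fromOldWorker = readOnWorker(longLived);

            perCall = Executors.newFixedThreadPool(1);
            String fromNewWorker = readOnWorker(perCall);
            return new String[] {fromOldWorker, fromNewWorker};
        } finally {
            CONTEXT.remove();
            longLived.shutdown();
            if (perCall != null) {
                perCall.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[0] + " / " + seen[1]); // prints null / request-42
    }
}
```

For the same reason, getQueryProfile() should be called once on the request thread before tasks are handed to workers. Otherwise each worker lazily creates its own QueryProfile and the request thread never sees those queries. With a shared, long-lived pool, the profile would have to be passed to the tasks explicitly.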

Source: Stack Overflow, https://stackoverflow.com/questions/52610512/
