java-8 - java 8 并行流与 ForkJoinPool 和 ThreadLocal

标签 java-8 parallel-processing java-stream forkjoinpool inheritable-thread-local

我们正在使用java 8并行流来处理任务,并且我们通过ForkJoinPool#submit提交任务。我们没有使用 jvm 范围的 ForkJoinPool.commonPool,而是创建自己的自定义池来指定并行性并将其存储为静态变量。

我们有验证框架,我们将表列表置于验证器列表中,然后通过自定义 ForkJoinPool 提交此作业,如下所示:

静态 ForkJoinPool forkJoinPool = new ForkJoinPool(4);

List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
    .getInstance().getTableValidator();

List<ValidationResult> result = forkJoinPool.submit(
    () -> tables.stream()
                .parallel()
                .map(validator)
                .filter(result -> result.getValidationMessages().size() > 0)
                .collect(Collectors.toList())).get();

我们遇到的问题是,在下游组件中,在静态 ForkJoinPool 的单独线程上运行的各个验证器依赖tenant_id,它对于每个请求都是不同的,并存储在 InheritableThreadLocal 变量中。由于我们创建的是静态 ForkJoinPool,因此 ForkJoinPool 池中的线程只会在第一次创建时继承父线程的值。但这些池线程将不知道当前请求的新tenant_id。因此,对于后续执行,这些池线程使用旧的tenant_id。

我尝试创建一个自定义 ForkJoinPool 并在构造函数中指定 ForkJoinWorkerThreadFactory 并重写 onStart 方法以提供新的tenant_id。但这不起作用,因为 onStart 方法仅在创建时调用一次,而不是在单独执行时调用一次。

似乎我们需要像 ThreadPoolExecutor#beforeExecute 这样的东西,但在 ForkJoinPool 的情况下不可用。那么,如果我们想将当前线程本地值传递给静态池线程,我们有什么选择呢?

一种解决方法是为每个请求创建 ForkJoinPool,而不是使其静态,但我们不想这样做,以避免创建线程的昂贵性质。

我们有什么替代方案?

最佳答案

我发现以下解决方案无需更改任何底层代码即可工作。基本上,map 方法采用一个函数接口(interface),我将其表示为 lambda 表达式。此表达式添加了一个 preExecution Hook ,以在当前 ThreadLocal 中设置新的tenantId,并在 postExecution 中清理它。

forkJoinPool.submit(tables.stream()
        .parallel()
        .map((item) -> {
                    preExecution(tenantId);
                    try {
                        return validator.apply(item);
                    } finally {
                        postExecution();
                    }
                }
        )
        .filter(validationResult ->
                validationResult.getValidationMessages()
                        .size() > 0)
        .collect(Collectors.toList())).get();

关于java-8 - java 8 并行流与 ForkJoinPool 和 ThreadLocal,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63227575/

相关文章:

Java 8 映射集合并在集合为空时添加默认值

java - 我该如何解决这个问题?

r - 使用 foreach 包将行附加到数据框

java - 通过 Java 中的元素串联合并字符串列表

sorting - 合并和排序多个流 java 8

java - 通过将数组中的值与现有键相加来构造 HashMap,而不覆盖键

Java 8 forEach Stream() 与旧的 forEach 循环

java - 我应该尽可能使用并行流吗?

bash - 当后台进程结束时, `+` 之前的 `-` 、 ` ` 和 `Done` 符号是什么意思?

java - 删除 Java 8 流的元素