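I am running into a NullPointerException in the guts of Ratpack when using Ratpack's Promise.cache in combination with multiple downstream promises and ParallelBatch, and it is not clear to me from the documentation whether my usage is incorrect or whether this is a bug in Ratpack.
Here is a reduced test case that demonstrates the problem: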
@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }
    final List<Integer> results = ExecHarness.yieldSingle(c ->
        ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}
Running this test 10000 times locally results in a failure rate of roughly 10/10000, with a NullPointerException that looks like this:
java.lang.NullPointerException
at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
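Not using cache in this test case makes the problem go away, as does not subscribing to each cached promise twice. My question is: is this incorrect usage of Ratpack's API, or is it a bug in the framework? If the former, can you point me to something in the docs that explains why this usage is wrong?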
Best answer
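Even though your example is not the best use case for caching promises (re-creating and caching a promise that holds the same value in every iteration step does not make much sense), you have actually found a race condition bug in the CachingUpstream<T> class.
I did some experiments to figure out what is happening, and here are my findings. First, I created a promise of the value 12 that provides a custom (more verbose) implementation of the CachingUpstream<T> object. I took the body of Promise.value(12) and overrode the inlined method cacheResultIf(Predicate<? super ExecResult<T>> shouldCache), which by default returns a CachingUpstream<T> instance: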
Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
    continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};
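Next I created a class TestCachingUpstream<T> by simply copying the body of the original class, and I added a few things. For example, TestCachingUpstream<T> has an internal ID (a random UUID) to make it easier to trace a promise's execution. I did not change the implementation of any method; I only wanted to trace the flow of execution and keep the original implementation as is. My custom class looks like this: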
private static class TestCachingUpstream<T> implements Upstream<T> {
    private final String id = UUID.randomUUID().toString();
    private Upstream<? extends T> upstream;
    private final Clock clock;
    private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
    private final Function<? super ExecResult<T>, Duration> ttlFunc;
    private final AtomicBoolean pending = new AtomicBoolean();
    private final AtomicBoolean draining = new AtomicBoolean();
    private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

    public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
        this(upstream, ttl, Clock.systemUTC());
    }

    @VisibleForTesting
    TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
        this.upstream = upstream;
        this.ttlFunc = ttl;
        this.clock = clock;
    }

    private void tryDrain() {
        if (draining.compareAndSet(false, true)) {
            try {
                TestCachingUpstream.Cached<? extends T> cached = ref.get();
                if (needsFetch(cached)) {
                    if (pending.compareAndSet(false, true)) {
                        Downstream<? super T> downstream = waiting.poll();
                        System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
                        if (downstream == null) {
                            pending.set(false);
                        } else {
                            try {
                                yield(downstream);
                            } catch (Throwable e) {
                                System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                receiveResult(downstream, ExecResult.of(Result.error(e)));
                            }
                        }
                    }
                } else {
                    System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                    Downstream<? super T> downstream = waiting.poll();
                    while (downstream != null) {
                        downstream.accept(cached.result);
                        downstream = waiting.poll();
                    }
                }
            } finally {
                draining.set(false);
            }
        }
        if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
            tryDrain();
        }
    }

    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

    private void yield(final Downstream<? super T> downstream) throws Exception {
        System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
        upstream.connect(new Downstream<T>() {
            public void error(Throwable throwable) {
                System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
            }

            @Override
            public void success(T value) {
                System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                receiveResult(downstream, ExecResult.of(Result.success(value)));
            }

            @Override
            public void complete() {
                System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                receiveResult(downstream, CompleteExecResult.get());
            }
        });
    }

    @Override
    public void connect(Downstream<? super T> downstream) throws Exception {
        TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
        if (needsFetch(cached)) {
            Promise.<T>async(d -> {
                waiting.add(d);
                tryDrain();
            }).result(downstream::accept);
        } else {
            downstream.accept(cached.result);
        }
    }

    private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
        Duration ttl = Duration.ofSeconds(0);
        try {
            ttl = ttlFunc.apply(result);
        } catch (Throwable e) {
            if (result.isError()) {
                //noinspection ThrowableResultOfMethodCallIgnored
                result.getThrowable().addSuppressed(e);
            } else {
                result = ExecResult.of(Result.error(e));
            }
        }
        Instant expiresAt;
        if (ttl.isNegative()) {
            expiresAt = null; // eternal
            System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
            upstream = null; // release
        } else if (ttl.isZero()) {
            expiresAt = clock.instant().minus(Duration.ofSeconds(1));
        } else {
            expiresAt = clock.instant().plus(ttl);
        }
        ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
        pending.set(false);
        downstream.accept(result);
        tryDrain();
    }

    static class Cached<T> {
        final ExecResult<T> result;
        final Instant expireAt;

        Cached(ExecResult<T> result, Instant expireAt) {
            this.result = result;
            this.expireAt = expireAt;
        }
    }
}
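I reduced the number of steps in the for loop from 25 to 3 to make the console output more concise.
Successful test execution (no race condition)
Let's look at what the flow of a correct execution looks like: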
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
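As you can see, each iteration produces 5 console log lines for a single cached promise.
When tryDrain is called for the first time there is no cached result, so it goes down to the yield(downstream); call.
The yield(downstream) call completes successfully, and receiveResult(downstream, ExecResult.of(Result.success(value))); is called from inside the success callback.
Promise.cache() uses an infinite expiration date by passing a negative duration, which is why receiveResult() releases the upstream object by setting its value to null.
Before completing, receiveResult() stores the cached result in the internal ref object and calls tryDrain() right before exiting.
tryDrain() sees the previously cached result for the next call on the cached promise (p.map(v -> v + 2)), so it passes the cached result directly to the downstream.
This scenario repeats for all 3 promises created inside the for loop.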
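The expiry rule described above can be tried out without Ratpack. The following is only a minimal sketch that mirrors the TTL branches of receiveResult() and the check in needsFetch() from the class above; the TtlSketch class, its cache() helper and the fixed clock date are hypothetical names and values chosen for illustration, not part of Ratpack.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical stand-alone model of the TTL handling; not Ratpack code.
final class TtlSketch {

    // Simplified stand-in for the Cached holder: only the expiry matters here.
    static final class Cached {
        final Instant expireAt; // null means "never expires"

        Cached(Instant expireAt) {
            this.expireAt = expireAt;
        }
    }

    // Mirrors the TTL branches of receiveResult():
    // negative -> eternal, zero -> already expired, positive -> now + ttl.
    static Cached cache(Duration ttl, Clock clock) {
        if (ttl.isNegative()) {
            return new Cached(null);
        } else if (ttl.isZero()) {
            return new Cached(clock.instant().minus(Duration.ofSeconds(1)));
        } else {
            return new Cached(clock.instant().plus(ttl));
        }
    }

    // Same condition as needsFetch() in the class above.
    static boolean needsFetch(Cached cached, Clock clock) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }

    public static void main(String[] args) {
        // Arbitrary fixed instant so the result does not depend on wall-clock time.
        Clock clock = Clock.fixed(Instant.parse("2020-01-01T00:00:00Z"), ZoneOffset.UTC);
        System.out.println(needsFetch(null, clock));                                // true: nothing cached yet
        System.out.println(needsFetch(cache(Duration.ofSeconds(-1), clock), clock)); // false: eternal entry
        System.out.println(needsFetch(cache(Duration.ZERO, clock), clock));          // true: expired on arrival
        System.out.println(needsFetch(cache(Duration.ofSeconds(60), clock), clock)); // false: still fresh
    }
}
```

With a negative duration, as used by Promise.cache() in the code above, the entry never needs fetching again once it is stored, which is the only case in which the upstream reference is released.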
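Failed test execution (race condition)
Running the test with these System.out.printf() calls made it fail somewhat less often, mostly because this I/O operation consumes some CPU cycles, which gives the unsynchronized part of the code a few more cycles to avoid the race condition. It still happens, though, so let's look at what the output of a failed test looks like:
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...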
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12})
[8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null})
java.lang.NullPointerException
at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
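This is the output of a failing test - I ran it in IntelliJ IDEA and configured the test to repeat until failure. It took some time to make it fail, but after running it a few times it finally failed around iteration number 1500. In this case we can see that the race condition happened for the first promise created in the for loop. You can see that after the upstream object was released inside the receiveResult() method
[088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12})
and tryDrain was called before exiting that method, the next execution of the cached promise had not yet seen the previously cached result, so it ran down to the yield(downstream) method again. By then the upstream object had already been released by setting its value to null, and yield(downstream) expects the upstream object to be correctly initialized; otherwise it throws an NPE. I tried to debug this method: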
private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
    return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
}
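needsFetch() is the method that decides whether the cached promise needs to be fetched. However, as soon as I added any log statement to it, it started causing a StackOverflowError. My guess is that in rare cases cached.expireAt.isBefore(clock.instant()) returns false, because the cached object comes from an AtomicReference, so this object should be passed correctly between method executions. Here is the full test class I used in my experiments: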
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import ratpack.exec.*;
import ratpack.exec.internal.CompleteExecResult;
import ratpack.exec.internal.DefaultExecution;
import ratpack.exec.internal.DefaultPromise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.test.exec.ExecHarness;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AnotherPromiseTest {

    @Test
    public void foo() throws Exception {
        List<Promise<Integer>> promises = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                continuation.resume(() -> down.success(12))
            )) {
                @Override
                public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                    return transform(up -> {
                        return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                    });
                }
            };
            p = p.cache();
            promises.add(p.map(v -> v + 1));
            promises.add(p.map(v -> v + 2));
        }
        ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
    }

    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();
        private Upstream<? extends T> upstream;
        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;
        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();

        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }

        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }

        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();
                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }
            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }

        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }

        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }

                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }

                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }

        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }

        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }
            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }
            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);
            downstream.accept(result);
            tryDrain();
        }

        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;

            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
}
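One interleaving that is consistent with the failing log can be replayed deterministically without Ratpack. The sketch below is only a model under my own assumptions: the RaceReplaySketch class is hypothetical, it keeps just the upstream, ref and pending fields, and it runs the steps of the two compute threads (labelled A and B in the comments) in a fixed order on a single thread. The recheck flag shows one possible mitigation, re-reading ref after winning pending; I am not claiming that this is how Ratpack fixed or should fix the problem.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, Ratpack-free model; field names mirror TestCachingUpstream.
final class RaceReplaySketch {
    Supplier<Integer> upstream = () -> 12; // set to null once the value is cached forever
    final AtomicReference<Integer> ref = new AtomicReference<>();
    final AtomicBoolean pending = new AtomicBoolean();

    // Models the tail of receiveResult(): release upstream, publish the result, clear pending.
    void receiveResult(Integer value) {
        upstream = null;
        ref.set(value);
        pending.set(false);
    }

    // Replays the suspected interleaving in a fixed order.
    // recheck=false models the code as posted; recheck=true re-reads ref after winning pending.
    static String replay(boolean recheck) {
        RaceReplaySketch s = new RaceReplaySketch();

        s.pending.set(true);                  // A: tryDrain() wins pending, fetch is in flight
        Integer value = s.upstream.get();     // A: yield(downstream) connects to the upstream
        Integer cachedSeenByB = s.ref.get();  // B: tryDrain() reads ref before A publishes -> null
        s.receiveResult(value);               // A: releases upstream, sets ref, clears pending

        // B: still holds the stale null and now wins pending.
        if (cachedSeenByB == null && s.pending.compareAndSet(false, true)) {
            if (recheck && s.ref.get() != null) {
                s.pending.set(false);
                return "served from cache: " + s.ref.get();
            }
            try {
                return "fetched: " + s.upstream.get(); // upstream is null at this point
            } catch (NullPointerException e) {
                return "NullPointerException";
            }
        }
        return "not reached in this replay";
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // prints "NullPointerException"
        System.out.println(replay(true));  // prints "served from cache: 12"
    }
}
```

The point of the replay is that the null check on the cached value and the acquisition of pending are two separate steps, so a result published in between is missed by the second thread.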
Hope it helps.
Regarding java - Ratpack's Promise.cache with multiple downstream promises in ParallelBatch, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50824081/