java - Ratpack's Promise.cache with multiple downstream promises in ParallelBatch

Tags: java concurrency promise ratpack

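I'm hitting a NullPointerException deep inside Ratpack when I use Ratpack's Promise.cache together with multiple downstream promises and ParallelBatch. From the documentation it is not clear to me whether my usage is incorrect or whether this is a bug in Ratpack.

Here is a reduced test case that demonstrates the problem: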

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
            ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Running this test 10000 times locally gives a failure rate of roughly 10/10000, with a NullPointerException that looks like this:
java.lang.NullPointerException
    at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
    at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
    at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
    at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
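
A failure rate like the one quoted above can be estimated with a small repeat loop. The following is only a sketch: `RepeatHarness` and `countFailures` are hypothetical names, not part of Ratpack or JUnit, and the task in `main` is a stand-in that fails on every 1000th call rather than the real test body.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Ratpack or JUnit): runs a task repeatedly
// and counts how many runs threw, to estimate an intermittent failure rate.
final class RepeatHarness {

    static int countFailures(int runs, Callable<?> task) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                task.call();
            } catch (Throwable t) {
                // Throwable rather than Exception, so assertion errors count too.
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Stand-in for the real test body: fails on every 1000th call.
        AtomicInteger calls = new AtomicInteger();
        int failures = countFailures(10_000, () -> {
            if (calls.incrementAndGet() % 1000 == 0) {
                throw new NullPointerException("simulated intermittent failure");
            }
            return null;
        });
        System.out.println(failures + "/10000 runs failed");
    }
}
```

In a real project the lambda would invoke the test method instead of the simulated task.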

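The problem disappears if I don't use cache in this test case, and also if I don't subscribe twice to each cached promise.

My question is: is this incorrect usage of Ratpack's API, or is it a bug in the framework? If it is the former, can you point me to the part of the documentation that explains why this usage is wrong?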

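Best answer

Even though your example is not the best use case for caching promises (recreating and caching a promise that holds the same value in every iteration doesn't make much sense), you have actually found a race condition bug in the CachingUpstream<T> class.

I did some experiments to figure out what is going on, and here are my findings. First, I created a promise of the value 12 that supplies a custom (more verbose) implementation of the CachingUpstream<T> object. I took the body of Promise.value(12) and overrode the inlined method cacheResultIf(Predicate<? super ExecResult<T>> shouldCache), which by default returns a CachingUpstream<T> instance: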

Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
        continuation.resume(() -> down.success(12))
)) {
    @Override
    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
        return transform(up -> {
            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
        });
    }
};

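Next I created a class TestCachingUpstream<T> by simply copying the body of the original class, and I added a few things:
  • Every TestCachingUpstream<T> has an internal id (a random UUID) to make it easier to trace the execution of a promise.
  • I added a few verbose log messages that are printed when specific things happen during promise execution.

I did not change the implementation of any method; I only wanted to trace the execution flow while keeping the original implementation as it was. My custom class looks like this: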
    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();
    
        private Upstream<? extends T> upstream;
    
        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;
    
        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
    
        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }
    
        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }
    
        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();
    
                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
    
                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }
    
            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }
    
        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }
    
        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }
    
                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }
    
                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }
    
        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }
    
        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }
    
            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }
    
            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);
    
            downstream.accept(result);
    
            tryDrain();
        }
    
        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;
    
            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
    

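I reduced the number of iterations in the for loop from 25 to 3 to keep the console output concise.

Successful test execution (no race condition)

Let's take a look at what the flow of a correct execution looks like: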
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...  
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...  
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...  
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
    

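As you can see, each iteration produces 5 console log lines for the cached promise:
  • tryDrain is called for the first time; there is no cached result, so it falls through to the yield(downstream); call.
  • The call to yield(downstream) completes successfully, and receiveResult(downstream, ExecResult.of(Result.success(value))); is called from inside the success callback.
  • Promise.cache() uses an infinite expiration date by passing a negative duration, which is why receiveResult() releases the upstream object by setting it to null.
  • Before completing, receiveResult() stores the cached result in the internal ref object and calls tryDrain() just before returning.
  • tryDrain() sees the previously cached result on the next call for the cached promise (p.map(v -> v + 2)), so it passes the cached result directly to the downstream.

This scenario repeats for all 3 promises created inside the for loop.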
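
The way receiveResult() turns a TTL into an expiry instant can be looked at in isolation. The sketch below copies only those three branches into a standalone helper; `TtlModel` and `expiryFor` are hypothetical names used for illustration, and the fixed clock value is arbitrary.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Simplified model of the TTL handling in receiveResult(); illustration only.
final class TtlModel {

    // Returns null for "never expires", mirroring the branches in receiveResult().
    static Instant expiryFor(Duration ttl, Clock clock) {
        if (ttl.isNegative()) {
            return null; // eternal: this is also the branch that releases the upstream
        } else if (ttl.isZero()) {
            return clock.instant().minus(Duration.ofSeconds(1)); // already expired
        } else {
            return clock.instant().plus(ttl);
        }
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.parse("2018-06-12T00:00:00Z"), ZoneOffset.UTC);
        // Duration.ofSeconds(-1) is the TTL passed by the cacheResultIf override above.
        System.out.println(expiryFor(Duration.ofSeconds(-1), clock));
        System.out.println(expiryFor(Duration.ZERO, clock));
        System.out.println(expiryFor(Duration.ofMinutes(5), clock));
    }
}
```

Only the negative-TTL branch sets upstream to null, which is why the NullPointerException can only show up for results that are cached forever.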

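Failing test execution (race condition)

Running the test with these System.out.printf() calls makes it fail somewhat less often, mostly because the I/O consumes some CPU cycles and the unsynchronized part of the code gets a bit more time to avoid the race. It still happens, though, so let's look at the output of a failing test: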
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...  
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null... 
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...  
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...  
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null}) 
    
    java.lang.NullPointerException
        at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
        at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
        at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
        at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
        at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
        at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
        at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
        at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
        at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
        at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    

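This is the output of a failing test. I ran it in IntelliJ IDEA and configured the run to repeat until failure. It took a while to make the test fail, but after a few attempts it finally failed at around iteration 1500. In this case the race condition hit the first promise created in the for loop. You can see that after the upstream object was released inside receiveResult()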
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    

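and tryDrain was called just before leaving that method, the next execution of the cached promise had not yet seen the previously cached result, so it ran into yield(downstream) again. By then the upstream object had already been released by setting it to null. yield(downstream) expects the upstream object to be properly initialized; otherwise it throws an NPE.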
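
One interleaving that would be consistent with the failing log is a stale read: the second thread reads ref (still null) in tryDrain(), the first thread then finishes receiveResult(), and the second thread wins the pending flag and calls yield() with a released upstream. This is an assumption drawn from the log, not something confirmed in the Ratpack source. The sketch below replays that ordering on a single thread using a heavily simplified model; `StaleReadReplay` and its methods are hypothetical, and the `recheck` variant is only an illustrative mitigation, not Ratpack's actual fix.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Single-threaded replay of one possible interleaving; a simplified model,
// not the Ratpack class. "Thread A" and "thread B" are labels for the steps.
final class StaleReadReplay {
    private Supplier<Integer> upstream = () -> 12;                 // released after an eternal result
    private final AtomicReference<Integer> ref = new AtomicReference<>();
    private final AtomicBoolean pending = new AtomicBoolean(true); // thread A's fetch is in flight

    // Thread A: receiveResult() for a result that is cached forever.
    void receiveResult(int value) {
        upstream = null;
        ref.set(value);
        pending.set(false);
    }

    // Thread B: the part of tryDrain() that runs after `ref` has already been read.
    // With recheck=true it re-reads `ref` after winning the `pending` flag.
    Integer continueDrain(Integer staleCached, boolean recheck) {
        if (staleCached == null && pending.compareAndSet(false, true)) {
            Integer fresh = recheck ? ref.get() : null;
            if (fresh != null) {
                pending.set(false);
                return fresh;
            }
            return upstream.get(); // NullPointerException when upstream was released
        }
        return staleCached; // cached value, or null meaning "another fetch is pending"
    }

    static String replay(boolean recheck) {
        StaleReadReplay s = new StaleReadReplay();
        Integer stale = s.ref.get();  // B: reads ref and sees null
        s.receiveResult(12);          // A: releases upstream, publishes result, clears pending
        try {
            return "value " + s.continueDrain(stale, recheck); // B: continues with the stale read
        } catch (NullPointerException e) {
            return "NullPointerException";
        }
    }

    public static void main(String[] args) {
        System.out.println("as in the log:   " + replay(false));
        System.out.println("with a re-check: " + replay(true));
    }
}
```

The model leaves out the draining flag and the waiting queue, so it shows only the check-then-act gap between reading ref and acquiring pending.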

I tried to debug this method:
    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }
    

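This is the method that decides whether the cached promise needs to be fetched. However, as soon as I added any log statement to it, it started causing a StackOverflowError. My guess is that in rare cases cached.expireAt.isBefore(clock.instant()) returns false, because the cached object comes from an AtomicReference and should therefore be passed correctly between method executions.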
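
The predicate itself is easy to exercise outside Ratpack, without adding log statements to the class. The sketch below is a standalone copy in which the cached entry is reduced to its expiry instant; `NeedsFetchModel` and the `hasEntry` parameter (standing in for `cached != null`) are hypothetical names for illustration.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Standalone copy of the needsFetch() predicate, with the cached entry reduced
// to its expiry instant; `hasEntry` stands in for `cached != null`.
final class NeedsFetchModel {

    static boolean needsFetch(boolean hasEntry, Instant expireAt, Clock clock) {
        return !hasEntry || (expireAt != null && expireAt.isBefore(clock.instant()));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-06-12T00:00:00Z");
        Clock clock = Clock.fixed(now, ZoneOffset.UTC);
        System.out.println(needsFetch(false, null, clock));               // nothing cached yet
        System.out.println(needsFetch(true, null, clock));                // entry cached forever
        System.out.println(needsFetch(true, now.minusSeconds(1), clock)); // expired entry
        System.out.println(needsFetch(true, now.plusSeconds(60), clock)); // entry still valid
    }
}
```

For an entry cached forever, expireAt is null, so the isBefore comparison is not evaluated in that case.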

Here is the full test class I used in my experiments:
    import com.google.common.annotations.VisibleForTesting;
    import io.netty.util.internal.PlatformDependent;
    import org.junit.Test;
    import ratpack.exec.*;
    import ratpack.exec.internal.CompleteExecResult;
    import ratpack.exec.internal.DefaultExecution;
    import ratpack.exec.internal.DefaultPromise;
    import ratpack.exec.util.ParallelBatch;
    import ratpack.func.Function;
    import ratpack.func.Predicate;
    import ratpack.test.exec.ExecHarness;
    
    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    
    public class AnotherPromiseTest {
    
        @Test
        public void foo() throws Exception {
            List<Promise<Integer>> promises = new ArrayList<>();
    
            for (int i = 0; i < 3; i++) {
                Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                        continuation.resume(() -> down.success(12))
                )) {
                    @Override
                    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                        return transform(up -> {
                            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                        });
                    }
                };
    
                p = p.cache();
                promises.add(p.map(v -> v + 1));
                promises.add(p.map(v -> v + 2));
            }
    
            ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
        }
    
        private static class TestCachingUpstream<T> implements Upstream<T> {
            private final String id = UUID.randomUUID().toString();
    
            private Upstream<? extends T> upstream;
    
            private final Clock clock;
            private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
            private final Function<? super ExecResult<T>, Duration> ttlFunc;
    
            private final AtomicBoolean pending = new AtomicBoolean();
            private final AtomicBoolean draining = new AtomicBoolean();
            private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
    
            public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
                this(upstream, ttl, Clock.systemUTC());
            }
    
            @VisibleForTesting
            TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
                this.upstream = upstream;
                this.ttlFunc = ttl;
                this.clock = clock;
            }
    
            private void tryDrain() {
                if (draining.compareAndSet(false, true)) {
                    try {
                        TestCachingUpstream.Cached<? extends T> cached = ref.get();
                        if (needsFetch(cached)) {
                            if (pending.compareAndSet(false, true)) {
                                Downstream<? super T> downstream = waiting.poll();
    
                                System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
    
                                if (downstream == null) {
                                    pending.set(false);
                                } else {
                                    try {
                                        yield(downstream);
                                    } catch (Throwable e) {
                                        System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                        receiveResult(downstream, ExecResult.of(Result.error(e)));
                                    }
                                }
                            }
                        } else {
                            System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                            Downstream<? super T> downstream = waiting.poll();
                            while (downstream != null) {
                                downstream.accept(cached.result);
                                downstream = waiting.poll();
                            }
                        }
                    } finally {
                        draining.set(false);
                    }
                }
    
                if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                    tryDrain();
                }
            }
    
            private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
                return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
            }
    
            private void yield(final Downstream<? super T> downstream) throws Exception {
                System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
                upstream.connect(new Downstream<T>() {
                    public void error(Throwable throwable) {
                        System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                    }
    
                    @Override
                    public void success(T value) {
                        System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.success(value)));
                    }
    
                    @Override
                    public void complete() {
                        System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, CompleteExecResult.get());
                    }
                });
            }
    
            @Override
            public void connect(Downstream<? super T> downstream) throws Exception {
                TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
                if (needsFetch(cached)) {
                    Promise.<T>async(d -> {
                        waiting.add(d);
                        tryDrain();
                    }).result(downstream::accept);
                } else {
                    downstream.accept(cached.result);
                }
            }
    
            private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
                Duration ttl = Duration.ofSeconds(0);
                try {
                    ttl = ttlFunc.apply(result);
                } catch (Throwable e) {
                    if (result.isError()) {
                        //noinspection ThrowableResultOfMethodCallIgnored
                        result.getThrowable().addSuppressed(e);
                    } else {
                        result = ExecResult.of(Result.error(e));
                    }
                }
    
                Instant expiresAt;
                if (ttl.isNegative()) {
                    expiresAt = null; // eternal
                    System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                    upstream = null; // release
                } else if (ttl.isZero()) {
                    expiresAt = clock.instant().minus(Duration.ofSeconds(1));
                } else {
                    expiresAt = clock.instant().plus(ttl);
                }
    
                ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
                pending.set(false);
    
                downstream.accept(result);
    
                tryDrain();
            }
    
            static class Cached<T> {
                final ExecResult<T> result;
                final Instant expireAt;
    
                Cached(ExecResult<T> result, Instant expireAt) {
                    this.result = result;
                    this.expireAt = expireAt;
                }
            }
        }
    }
    

I hope it helps.

Source: Stack Overflow, https://stackoverflow.com/questions/50824081/
