java - 不带 Lambda 的 CompletableFuture.supplyAsync()

标签 java completable-future

我正在努力适应 Supplier<U> 的功能风格等并创建可测试的代码。

所以我有一个 InputStream它被分成异步处理的 block ,我想知道它们什么时候完成。为了编写可测试的代码,我将处理逻辑外包给它自己的Runnable :

public class StreamProcessor {
    
    public CompletableFuture<Void> process(InputStream in) {
        List<CompletableFuture> futures = new ArrayList<>();
        while (true) {
            try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
                byte[] data = IOUtils.toByteArray(chunkStream);
                CompletableFuture<Void> f = CompletableFuture.runAsync(createTask(data));
                futures.add(f);
                
            } catch (EOFException ex) {
                // end of stream reached
                break;
            } catch (IOException ex) {
                return CompletableFuture.failedFuture(ex);
            }
        }
        return CompletableFuture.allOf(futures.toArray(CompletableFuture<?>[]::new));
    }
    
    ChunkTask createTask(byte[] data) {
        return new ChunkTask(data);
    }
    
    public class ChunkTask implements Runnable {
        final byte[] data;

        ChunkTask(byte[] data) {
            this.data = data;
        }

        @Override
        public void run() {
            try {
                // do something
            } catch (Exception ex) {
                // checked exceptions must be wrapped
                throw new RuntimeException(ex);
            }
        }
        
    }
}

这很有效,但会带来两个问题:

  1. 处理代码不能返回任何内容;这是 Runnable毕竟。
  2. ChunkTask.run() 内捕获的任何已检查异常必须包含在 RuntimeException 中。解开失败的组合CompletableFuture返回 RuntimeException 需要再次打开才能到达最初的原因 - 与 IOException 相反.

所以我正在寻找一种方法来做到这一点 CompletableFuture.supplyAsync() ,但我不知道如何在没有 lambda 的情况下做到这一点(不好测试)或返回 CompletableFuture.failedFuture()从处理逻辑来看。

最佳答案

我可以想到两种方法:

<强>1。与supplyAsync :

当使用CompletableFuture.supplyAsync时,你需要一个供应商而不是一个可运行的:

    public static class ChunkTask implements Supplier<Object> {
        final byte[] data;

        ChunkTask(byte[] data) {
            this.data = data;
        }

        @Override
        public Object get() {
            Object result = ...;
            // Do something or throw an exception
            return result;
        }
    }

然后:

CompletableFuture
    .supplyAsync( new ChunkTask( data ) )
    .whenComplete( (result, throwable) -> ... );

如果 Supplier.get() 发生异常,它将被传播,您可以在 CompletableFuture.whenComplete 中看到它, CompletableFuture.handleCompletableFuture.exceptionally .

<强>2。通过CompletableFuture到线程

您可以传递CompletableFutureChunkTask :

    public class ChunkTask implements Runnable {
        final byte[] data;
        private final CompletableFuture<Object> future;

        ChunkTask(byte[] data, CompletableFuture<Object> future) {
            this.data = data;
            this.future = future;
        }

        @Override
        public void run() {
            try {
                Object result = null;
                // do something
                future.complete( result );
            } catch (Throwable ex) {
                future.completeExceptionally( ex );
            }
        }
    }

那么逻辑就变成了:

while (true) {
    CompletableFuture<Object> f = new CompletableFuture<>();
    try (SizeLimitInputStream chunkStream = new SizeLimitInputStream(in, 100)) {
        byte[] data = IOUtils.toByteArray(chunkStream);
        startThread(new ChunkTask(data, f));
        futures.add(f);
     } catch (EOFException ex) {
         // end of stream reached
         break;
     } catch (IOException ex) {
         f.completeExceptionally( ex );
         return f;
     }
}

可能,第 2 点可以让您更灵活地管理异常。

关于java - 不带 Lambda 的 CompletableFuture.supplyAsync(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67300567/

相关文章:

java - ReentrantLock 与 CompletableFuture

exception - Java 8 CompletableFuture 异常方法的惊人行为

java - 为什么 CompletableFuture.allOf 声明为 CompletableFuture<Void>?

java - Logger.getLogger ("Classname".class.getName()).log(Level.SEVERE, null, ex);

java - HTTP 状态 415 - 将数据从 ajax 作为 json 传递到 Restfull 服务器时出现不支持的媒体类型错误

java - 如何通过 JFileChooser 将 Icon 对象保存到文件中?

java - CompletableFuture.thenAccept 中使用的垃圾回收对象的可用性

java-8 - 用于 CompletableFuture 中异常处理的 finally block 等效项

java - 在测试中用 ExecutorService 替换 ManagedExecutorService

java - Java字符集转换