java - CompletableFuture、Future 和 RxJava 的 Observable 的区别

标签 java multithreading asynchronous java-8 rx-java

我想知道两者的区别 CompletableFutureFutureObservable RxJava.

我所知道的是所有都是异步的,但是

Future.get() 阻塞线程

CompletableFuture给出回调方法

RxJava Observable --- 类似于 CompletableFuture 其他好处(不确定)

例如:如果客户端需要进行多个服务调用,而当我们使用 Futures (Java) Future.get() 将按顺序执行...想要想知道它在 RxJava 中的表现如何。

还有文档 http://reactivex.io/intro.html

很难使用 Futures 来优化组合条件异步执行流(或者是不可能的,因为每个请求的延迟在运行时会有所不同)。当然可以这样做,但它很快就会变得复杂(因此容易出错),或者它会过早地阻塞 Future.get(),从而消除了异步执行的好处。

真的很想知道 RxJava 是如何解决这个问题的。我发现从文档中很难理解。

最佳答案

future

Futures在 Java 5 (2004) 中引入。它们基本上是尚未完成的操作结果的占位符。操作完成后,Future 将包含该结果。例如,操作可以是 RunnableCallable提交给 ExecutorService 的实例.操作的提交者可以使用Future 对象来检查操作是否isDone() ,或使用阻塞 get() 等待它完成方法。

例子:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures在 Java 8 (2014) 中引入。它们实际上是受 Google 的 Listenable Futures 启发的常规 Futures 的演变。 ,部分 Guava图书馆。它们是 future ,还允许您将任务串联在一起。您可以使用它们告诉某个工作线程“去做一些任务 X,完成后,使用 X 的结果去做其他事情”。使用 CompletableFutures,您可以对操作的结果做一些事情,而无需实际阻塞线程来等待结果。这是一个简单的例子:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJavareactive programming 的整个库在 Netflix 创建。乍一看,它似乎类似于 Java 8's streams .确实如此,只是它更强大。

与 Futures 类似,RxJava 可用于将一堆同步或异步操作串在一起以创建处理管道。与一次性使用的 Futures 不同,RxJava 可以处理零个或多个项目的。包括具有无限数量的项目的永无止境的流。由于令人难以置信的丰富set of operators,它也更加灵活和强大。 .

与 Java 8 的流不同,RxJava 也有一个 backpressure机制,它允许它处理处理管道的不同部分在不同线程中以不同速率运行的情况。

RxJava 的缺点是,尽管有可靠的文档,但由于涉及到范式转换,学习它是一个具有挑战性的库。 Rx 代码也可能是调试的噩梦,尤其是在涉及多个线程的情况下,甚至更糟 - 如果需要背压。

如果你想进入它,有一个完整的page官网各种教程,加官方documentationJavadoc .您还可以观看一些视频,例如 this one它简要介绍了 Rx,并讨论了 Rx 和 Futures 之间的区别。

奖励:Java 9 响应式(Reactive)流

Java 9's Reactive Streams又名 Flow API是由各种 reactive streams 实现的一组接口(interface)库,例如 RxJava 2 , Akka Streams , 和 Vertx .它们允许这些响应式(Reactive)库互连,同时保留所有重要的背压。

关于java - CompletableFuture、Future 和 RxJava 的 Observable 的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35329845/

相关文章:

java - 即使存在实例,Room DB 也始终运行 if(INSTANCE==null) block

java - JTable 过滤数据无法正常工作

python - 在运行线程中显示 chaco 图

java - 在android中管理多个异步任务

c# - 如何使 IObservable<T> 的实现成为多线程的?

javascript - 是否可以在 Java 或 JavaScript 中发出 HTTP (POST) 请求而不等待响应

java - 如何使用标准数据为 Realm 数据库提供种子?

java - GC 花费了大量时间

java - 我的代码中有一个第三方异步 block 。我怎么知道它什么时候结束?

java - MultitClient 服务器 - 允许持续通信