java - 如何使用 Rx 运行并发任务队列?

标签 java concurrency rx-java rx-java2

我找到了很多关于它的示例,但不知道什么是“正确”的实现。

基本上,我有一个对象(我们称之为 NBAManager),并且该对象有一个方法 public CompletablegenerateGame()。这个想法是 generateGame 方法被调用很多次,我想以顺序方式生成游戏:我正在考虑并发队列。我提出了以下设计:我将为 NBAManager 创建一个 NBAService: service 的单例实例,并且 generateGame() 的主体将看起来像这样:

公共(public)可完成的generateGame(RequestInfo信息) 返回 service.generateGame(info);

所以基本上我会放弃那个 Completable 结果。在该 NBAService 对象内部,我将有一个队列(一个并发队列,因为我希望有机会 poll()add (请求)(如果在 NBAManager 正在处理较早的请求之一时调用了 generateGame())。我被这个问题困扰了:

  • 以 Rx 方式编写这样的作业队列的正确方法是什么?这方面的例子有很多。您能给我发送一个良好实现的链接吗?
  • 如何处理队列执行的逻辑?我相信,如果只有一项工作,我们就必须执行,如果有很多工作,那么我们只需添加它即可。在没有可运行的情况下如何控制它?我正在考虑使用主题

谢谢!

最佳答案

有多种方法可以实现这一点,您可以选择调用多少 RxJava。最少的参与可以使用单线程 ExecutorService 作为“队列”和 CompletableSubject 来延迟完成:

class NBAService {
    static ExecutorService exec = Executors.newSingleThreadedExecutor();

    public static Completable generateGame(RequestInfo info) {
        CompletableSubject result = CompletableSubject.create();
        exec.submit(() -> {
            // do something with the RequestInfo instance
            f(info).subscribe(result);
        });
        return result;
    }
}

如果您想在订阅 Completable 时触发执行,则需要更复杂的解决方案。在这种情况下,您可以使用 create()subscribeOn():

class NBAService {

    public static Completable generateGame(RequestInfo info) {
        return Completable.create(emitter -> {
            // do something with the RequestInfo instance
            emitter.setDisposable(
                f(info).subscribe(emitter::onComplete, emitter::onError)
            );
        })
        .subscribeOn(Schedulers.single());
    }
}

关于java - 如何使用 Rx 运行并发任务队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48509017/

相关文章:

go - 将上下文与取消一起使用,Go 例程不会终止

android - Realm DB Realm.asObservable() 发出结果两次

android - 更新到 RxJava 2.x.x 后 Observable.combineLatest 导致错误。 - 无法推断类型

java - Hibernate DTD 未从类路径加载,jar 在那里,仍然出现 FileNotFoundException

java - 从嵌入式 Tomcat/Jetty jsf-bean 调用应用程序方法

scala - 你如何处理 Akka Flow 中的 futures 和 mapAsync?

c# - ASP.NET Web API C# 并发请求导致数据库重复

algorithm - 异步、提前退出、串联 Observable

java - 使用 vertx 在 mongoDB 中批量写入

java - 执行Java -cp