java - 将代码重构为响应式风格

标签 java reactive-programming rx-java

我正在开发的应用程序大量使用异步处理,我正在寻找更好的方法来组织代码。

系统的外部输入由 servlet 接收。该 servlet 收集的原始数据被存入队列中。线程池针对该队列运行,并将原始数据解析为结构化记录,然后将其存入一组 N 个队列中的一个。选择队列时,所有同类记录都会进入同一队列。这 N 个队列分别由一个线程提供服务,该线程将同类记录收集到一个集合中。计划任务每​​分钟都会醒来,并将前一分钟收集的每种类型的所有记录写入文件。

目前,这段代码是使用一堆队列、线程池和不断运行的可运行对象来组织的,这使得逻辑难以理解。我想将此代码重构为使上述数据流更加明确的代码。我正在寻找实现这一目标的工具和方法。

  • 使用类似 RxJava 的工具帮忙解决这个问题吗?如果是这样,怎么办?
  • 我还应该考虑其他方法吗?

最佳答案

根据您的描述,这是一个 RxJava 的示例。希望对您有帮助。

public class TestServlet extends HttpServlet {

private static final PublishSubject<String> o = PublishSubject.<String>create();

public static Observable<String> getServletObservable() {
    return o.observeOn(Schedulers.computation());
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {
    synchronized (TestServlet.class) {
        o.onNext("value");
    }
}

}

class Foo {
public static void main(String[] args) {
    TestServlet.getServletObservable().map(new Func1<String, String>() {

        @Override
        public String call(String t1) {
            // do something
            return null;
        }

    }).groupBy(new Func1<String, String>() {

        @Override
        public String call(String t1) {
            // do something
            return null;
        }

    }).subscribe(new Observer<GroupedObservable<String, String>>() {

        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(GroupedObservable<String, String> group) {
            group.observeOn(Schedulers.io()).subscribe(new Observer<String>() {

                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String t) {
                    // store t
                }

            });
        }

    });
}
}

关于java - 将代码重构为响应式风格,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22468409/

相关文章:

java - 如何在 RxJava 中进行递归 Observable 调用?

reactive-programming - FRP本质 : functional reactive programming as programming with (discrete) differential equations?

java - 在 RxJava 中将 Observable<List<Car>> 转换为 Observable<Car> 序列

java - Clojure gen-class this 关键字

java - 如何通过 1 个按钮设置 2 个或更多图像

java - 在Java中,如何避免HTTP 404获取文件://URL with special characters in it?

java - 将文件从 R 传递到 Java

java - react 堆项目 : Reactively transform mutable object's state with another Publisher in same sequence

android - 如何使用rxandroid监听gps位置更新

java - RxJava Observable "Iteration"是如何工作的?