java - Project Reactor(或 RxJava2)执行程序序列调用

标签 java functional-programming rx-java2 project-reactor

我有以下任务,我想使用 Project Reactor(或 RxJava)来解决它

有事件的来源。每个事件由 serviceId 和一些负载组成。收到事件后,我们需要使用有效负载对指定的 serviceId 执行操作。但我们要保证对同一个serviceId的两次请求之间的时间间隔必须大于或等于1秒。但对不同服务的请求可以并行执行。

我们还应该注意,服务的计数是动态的。

如下图所示

enter image description here

目前我有以下代码:

Flux.create((sink-> eventProvider.listen(new EventListner(){
                public void event(req) {
                    sink.next(req);
                }
            })))
        /* need some logic here */
        .flatMap(req -> requestExecutor.execute(req))
        .doOnNext(res -> responseProcessor.process(res))
        .subscribe();

你有什么想法吗?

最佳答案

如果事件标识了它们发起调用的服务,您可以使用 groupBy() 运算符按服务分隔流。要在每个服务请求后引入延迟,请使用带有参数的 flatMap() 来单线程使用。

在 RxJava 中:

observable
  .groupBy(event -> getServiceId( event )) // 1
  .flatMap(serviceObservable -> // 2
       serviceObservable // 3
         .flatMap( ev -> service(serviceObservable.getKey(), ev), 1) // 4
                           .delay(1, TimeUnit.SECONDS)) // 5
  .subscribe();
  1. 按事件将使用的服务对事件进行分组。该 ID 稍后将作为 key 使用。当遇到新的服务 ID 时,这将发出新的项目。
  2. serviceObservable 是一个 GroupByObservable,将在下面进行处理。
  3. 此可观察对象的每次发射都是一个应该发送到单个服务的事件。
  4. serviceObservable.getKey() 返回要使用的服务的 ID。我发明了一种方法 service(),它通过服务的 ID 向服务发送事件。此外,参数 1 告诉 flatMap() 单线程操作,因此一次只能发生一个服务请求。
  5. delay()(或您想要的任何运算符)将在释放操作之前等待一秒钟。

(免责声明:此代码未经测试,但我在过去的项目中做过类似的调度,因此基本思想是合理的。)

关于java - Project Reactor(或 RxJava2)执行程序序列调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53519820/

相关文章:

java - 我应该如何确保 onCreate(View/Dialog) 已完成运行,以便一切都正确初始化?

scala - Scala中的真实世界函数式编程

haskell - 有没有办法避免在插入时复制二叉树的整个搜索路径?

f# - 如何在 F# 中实现 "return early"逻辑

android - 合并可观察对象后的 Distinct() 方法

java - RXJava - 当其中之一的数据可用时永久切换可观察对象

java - 如何使用Java播放默认Windows OS(不仅是) “finish/completed”声音?

Java 初学者 - 需要 Java 代码错误建议

java - 向根节点添加命名空间会导致它添加命名空间以添加子节点

rx-java - 在 Rxjava 中使用 Flowable 时无法处理错误