我有以下任务,我想使用 Project Reactor(或 RxJava)来解决它
有事件的来源。每个事件由 serviceId 和一些负载组成。收到事件后,我们需要使用有效负载对指定的 serviceId 执行操作。但我们要保证对同一个serviceId的两次请求之间的时间间隔必须大于或等于1秒。但对不同服务的请求可以并行执行。
我们还应该注意,服务的计数是动态的。
如下图所示
目前我有以下代码:
Flux.create((sink-> eventProvider.listen(new EventListner(){
public void event(req) {
sink.next(req);
}
})))
/* need some logic here */
.flatMap(req -> requestExecutor.execute(req))
.doOnNext(res -> responseProcessor.process(res))
.subscribe();
你有什么想法吗?
最佳答案
如果事件标识了它们发起调用的服务,您可以使用 groupBy()
运算符按服务分隔流。要在每个服务请求后引入延迟,请使用带有参数的 flatMap()
来单线程使用。
在 RxJava 中:
observable
.groupBy(event -> getServiceId( event )) // 1
.flatMap(serviceObservable -> // 2
serviceObservable // 3
.flatMap( ev -> service(serviceObservable.getKey(), ev), 1) // 4
.delay(1, TimeUnit.SECONDS)) // 5
.subscribe();
- 按事件将使用的服务对事件进行分组。该 ID 稍后将作为 key 使用。当遇到新的服务 ID 时,这将发出新的项目。
serviceObservable
是一个GroupByObservable
,将在下面进行处理。- 此可观察对象的每次发射都是一个应该发送到单个服务的事件。
serviceObservable.getKey()
返回要使用的服务的 ID。我发明了一种方法service()
,它通过服务的 ID 向服务发送事件。此外,参数1
告诉flatMap()
单线程操作,因此一次只能发生一个服务请求。delay()
(或您想要的任何运算符)将在释放操作之前等待一秒钟。
(免责声明:此代码未经测试,但我在过去的项目中做过类似的调度,因此基本思想是合理的。)
关于java - Project Reactor(或 RxJava2)执行程序序列调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53519820/