java - 限制 RxJava/Micronaut 项目中不同目的地的连接

标签 java rx-java reactive-programming micronaut

我有一个项目,可以从许多不同的提供商读取数据;有些通过 SOAP,有些通过 HTTP 等。其中一些提供程序还对并发连接数有限制。例如,提供商 A 可能允许无限个连接,提供商 B 可能只允许 2 个连接,提供商 C 可能允许 5 个连接。

我对 Micronaut 很熟悉,但我不知道它内置了任何东西,可以让我根据需要限制对特定 URL 的连接。因此,我的第一个想法是创建每个提供者的线程限制(也许使用 RxJava 的调度程序系统?我相信您可以使用 Java 的 Executor 类创建自定义线程限制)并让它为我完成排队工作。我想我也可以采用更手动的方式创建 ConcurrentMap 并在其中存储 Activity 连接的数量,但这似乎更困惑且更容易出错。

如有任何建议,我们将不胜感激!谢谢!

最佳答案

仅当网络连接由线程(即同步)进行时,限制线程数才适用。但Micronaut也可以进行异步连接,然后限制线程数就不起作用了。最好直接限制连接数。创建一个与 Micronaut 具有相同接口(interface)的中间代理对象,并将所有传入请求传递给真正的 Micronaut。它还有一个参数 - limit,当请求通过时,减少 limit。当限制变为 0 时,代理对象停止传递请求,将它们保留在输入队列中。一旦请求完成,它就会向代理对象发出信号,并从输入队列传递一个请求(如果有),或者只是增加限制。

代理最简单的实现是一个线程,使用 BlockingQueue 处理输入请求,使用 Semaphore 进行限制。但如果有很多提供者并且为每​​个提供者创建一个线程的成本很高,则可以将代理实现为异步对象。

关于java - 限制 RxJava/Micronaut 项目中不同目的地的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59674025/

相关文章:

java - 从 Elasticsearch 在 Elasticsearch Painless 脚本参数中传递动态值

android - 将 RxAndroid 导入 Android Studio

rx-java - 为什么 Observable.empty() 不向下游传播 onComplete() ?

javascript - 如何为 RxJS 中的两个可观察对象设置 'wait'

java - GWT 中的 MYSQL 驱动程序错误

java - 如果用户在警报对话框中按"is"按钮,如何在列表中添加项目,而如果按“否”则不添加该项目

java - Android 以编程方式接听电话

rx-java - 如果 retryWhen :s retries runs out 则捕获错误

Angular2 + RxJS - 无法读取未定义的下一个属性

junit - 无法调用 "reactor.core.publisher.Mono.thenReturn(Object)",因为返回值为空