我正在尝试创建一种机制来限制并发网络请求的数量。我的想法是,我想要一个固定的线程池,例如 20 个线程,并使用该池最多只允许 20 个传出 HTTP 请求。
我一直在尝试做的是:
public class HttpClient {
private final Scheduler scheduler;
public HttpClient(int maxRequests) {
this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
}
public Single<...> request() {
return this.httpRequest()
.subscribeOn(this.scheduler);
}
// sends the http request and returns a response
private Single<...> httpRequest() {
return ...
}
}
但这不起作用。我尝试将 maxRequests 设置为 1,发送 5 个请求,然后在接收请求的服务器上设置一个断点,以将第一个请求“卡”在那里,以便查看其他 4 个线程是否等待可用线程。但是所有 5 个请求都执行了,过了一会儿,我在所有 5 个请求上都遇到了超时异常。
我也尝试过使用 observeOn
,但也不起作用。
编辑:我还尝试使用以下代码实现信号量逻辑:
public HttpClient(int maxRequests) {
this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}
public Single<...> request() {
return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
.andThen(this.httpRequest())
.doFinally(concurrentRequestsSemaphore::release);
}
其中Semaphore
是信号量的 native Java实现。该机制按预期工作,如果 maxRequests
为 2,并且我发送了 5 个请求,则 2 个请求将发出,其他 3 个将陷入 fromAction
等待。但这种方法带来了其他意想不到的行为,例如即使在 2 个请求收到响应后,其他 3 个请求也没有执行,因为 .doFinally(concurrentRequestsSemaphore::release)
从未执行。我做了一些测试,它仅在 X 请求收到响应后执行。 X 会是什么样子是完全无法预测的。因此,可能有一个包含 20 个许可的信号量,20 个请求将发出并返回响应,并且不会执行其他请求,因为该信号量从未被任何请求释放。
最佳答案
您没有展示private Single<...> httpRequest()
的 body 。我假设你在那里调用一些异步方法。异步方法仅占用线程来处理响应,并且当请求本身移动到服务器并返回时,不会使用任何线程。这解释了为什么您会看到所有 5 个请求均已到达服务器。
通常,为了限制某种 Activity 的数量,java.util.concurrent.Semaphore使用 s,但它们通过阻塞线程来限制 Activity 。从逻辑上讲,由于您的程序是异步的,因此您需要使用异步信号量,但它是一种罕见的野兽。
所以你有以下选择:
- 根本不限制异步 http 请求的数量,因为它们无论如何都不会占用太多资源
- 启动一个特殊线程,从普通同步信号量获取许可,然后启动异步http请求。当请求完全完成时信号量被释放
- 使用固定线程池同步启动http请求
- 使用异步信号量。我知道的唯一实现在我的库中 DF4J : AsyncSemaphore是标准信号量的扩展,因此具有同步和异步接口(interface),并且 InpSignal仅在异步程序中使用。 InpSignal 的示例使用量为AsyncServerSocketChannel.java ,它用于限制 Echo Server 实现中打开的客户端连接的数量。
关于java - RxJava : Thread pool for network calls,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60399342/