java - RxJava : Thread pool for network calls

标签 java multithreading rx-java rx-java2

我正在尝试创建一种机制来限制并发网络请求的数量。我的想法是,我想要一个固定的线程池,例如 20 个线程,并使用该池最多只允许 20 个传出 HTTP 请求。

我一直在尝试做的是:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

但这不起作用。我尝试将 maxRequests 设置为 1,发送 5 个请求,然后在接收请求的服务器上设置一个断点,以将第一个请求“卡”在那里,以便查看其他 4 个线程是否等待可用线程。但是所有 5 个请求都执行了,过了一会儿,我在所有 5 个请求上都遇到了超时异常。

我也尝试过使用 observeOn,但也不起作用。

编辑:我还尝试使用以下代码实现信号量逻辑:


public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

其中Semaphore是信号量的 native Java实现。该机制按预期工作,如果 maxRequests 为 2,并且我发送了 5 个请求,则 2 个请求将发出,其他 3 个将陷入 fromAction 等待。但这种方法带来了其他意想不到的行为,例如即使在 2 个请求收到响应后,其他 3 个请求也没有执行,因为 .doFinally(concurrentRequestsSemaphore::release) 从未执行。我做了一些测试,它仅在 X 请求收到响应后执行。 X 会是什么样子是完全无法预测的。因此,可能有一个包含 20 个许可的信号量,20 个请求将发出并返回响应,并且不会执行其他请求,因为该信号量从未被任何请求释放。

最佳答案

您没有展示private Single<...> httpRequest()的 body 。我假设你在那里调用一些异步方法。异步方法仅占用线程来处理响应,并且当请求本身移动到服务器并返回时,不会使用任何线程。这解释了为什么您会看到所有 5 个请求均已到达服务器。 通常,为了限制某种 Activity 的数量,java.util.concurrent.Semaphore使用 s,但它们通过阻塞线程来限制 Activity 。从逻辑上讲,由于您的程序是异步的,因此您需要使用异步信号量,但它是一种罕见的野兽。 所以你有以下选择:

  • 根本不限制异步 http 请求的数量,因为它们无论如何都不会占用太多资源
  • 启动一个特殊线程,从普通同步信号量获取许可,然后启动异步http请求。当请求完全完成时信号量被释放
  • 使用固定线程池同步启动http请求
  • 使用异步信号量。我知道的唯一实现在我的库中 DF4J : AsyncSemaphore是标准信号量的扩展,因此具有同步和异步接口(interface),并且 InpSignal仅在异步程序中使用。 InpSignal 的示例使用量为AsyncServerSocketChannel.java ,它用于限制 Echo Server 实现中打开的客户端连接的数量。

关于java - RxJava : Thread pool for network calls,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60399342/

相关文章:

java - 使用通用 URL 模式保护 REST 资源

Java8 : How to get the internal Type of java. util.Optional 类?

java - 领域驱动设计中的实体

java - JDK 1.7错误: Could not find or load main class when using command "java -cp" with multiple jars

java - 无法实现不安全线程

java - 使用用户定义的 Spring @Components 进行异步处理

c# - 如何处理 ManualResetEvent

rx-java - 如何在RxJava中将Observable子类化?

android - RxJava 和缓存数据

安卓 map 框 : How to unregister IntentReceiver?