http - 使用 RxJava 使用 Async Jersey HTTP Client 限制传出的 HTTP 请求

标签 http jersey rx-java vert.x throttling

我正在尝试使用 Jersey Client 来限制传出的 http 请求。因为我运行的是 Vertx Verticle,所以我创建了一个特殊的 RateLimiter 类来处理 throttle 。

我的目标是防止 HTTP 调用的速率超过每秒 1 次。我的想法是,提交的可调用对象将使用单线程 ExecutorService 运行,这样我就可以阻塞该单线程,以确保不会以更高的速度处理这些任务。

基本上这个类中唯一的公共(public)方法是“call”:

 public <T> Observable<T> call(Callable<Observable<T>> action) {
    return Observable.create(subscriber -> {

        Observable<Observable<T>> observed =
                Observable.from(executor.submit(() -> {                         
                        return action.call();
                    })
                ).doOnError(throwable -> {
                               logger.error(throwable);
                           }
                );

        observed.subscribe(t -> {
            try {
                Thread.sleep(1000); 
                t.subscribe(data -> {
                    try {
                        subscriber.onNext(data);
                    } catch (Throwable e) {
                        subscriber.onError(e);
                    }
                    subscriber.onCompleted();
                });
            } catch (Exception e) {
                logger.error(e);
            }

        });
    });
} 

这是我当前的实现,无论自上次调用以来经过了多少时间,它都使用 1 秒 sleep 。最初我尝试使用 ScheduledExecutorService 并计算延迟时间,以便我将以每秒 1 次的速率准确提交请求。但是,在这两种情况下,它通常都无法满足速率限制,我收到了两个请求,一个接一个地立即提交。

我的假设是某个地方的请求被传递到一个不同的执行队列,该队列正在被一个不同的线程连续轮询,所以如果由于某种原因该线程很忙并且队列中同时存在两个请求,它们将按顺序执行,但不会延迟。

任何想法如何解决这个问题?也许是不同的方法?

最佳答案

我会使用简单的 Vertx 事件总线和一个队列,你每秒从中轮询:

 public static void main(String[] args) {


    Vertx vertx = Vertx.vertx();

    vertx.deployVerticle(new DebounceVerticle(), (r) -> {

        // Ok, verticle is ready!
        // Request to send 10 events in 1 second
        for (int i = 0; i < 10; i++) {
            vertx.eventBus().publish("call", UUID.randomUUID().toString());
        }
    });


}


private static class DebounceVerticle extends AbstractVerticle {

    HttpClient client;
    @Override
    public void start() {
        client = vertx.createHttpClient();

        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        vertx.eventBus().consumer("call", (payload) -> {
            String message = (String) payload.body();
            queue.add(message);
            System.out.println(String.format("I got %s but I don't know when it will be executed", message));
        });

        vertx.setPeriodic(1000, (l) -> {
            String message = queue.poll();

            if (message != null) {
                System.out.println(String.format("I'm finally sending %s", message));

                //Do your client magic
            }
        });
    }
}

关于http - 使用 RxJava 使用 Async Jersey HTTP Client 限制传出的 HTTP 请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40357996/

相关文章:

python字典键错误无法解决

java - Eclipse Kura http 身份验证失败

javascript - fetch() 不发送标题?

java - 如何为测试类中的所有测试启动一次 Jersey 测试容器(Grizzly)?

java - Jersey /JAX-RS : How to cascade beans-validation recursively with @Valid automatically?

没有实例的 Java 枚举

java - 如何监控 HttpURLConnection PUT 进程

java - Swagger JAX-RS 与 Jersey 1.19 给出 'Conflicting URI templates' 错误

android - RxJava中的特殊注释

rx-java - 使用 RxJava 刷新透明身份验证 token