java - N+1 HTTP 调用通过同时队列进行批处理

标签 java multithreading optimization concurrency reactive

假设,我有一个从 HTTP API 获取数据的方法

public R getResource(String id){
      //HTTP call to 
      return fetch("http://example.com/api/id")
}

但是 http://example.org/api/一次支持多个 ID http://example.org/api/id1,id2,id3

在多线程环境中,我想阻塞,直到收集到“m”个 id,然后一次性从 API 获取数据。

此外,为了避免无限/长 block ,应该有一个等待超时。

对于 m=5 假设有 20 个线程同时到达来调用此方法,那么应该向 HTTP api 发送 4 批请求。

支持此批处理的任何实现建议或现有框架。

欢迎编辑建议。

最佳答案

使用 BlockingQueue线程正在执行 BlockingQueue::poll(long timeout, TimeUnit unit)例如计算超时,以便第一个请求等待的时间不会超过某个固定的持续时间。

轮询线程将从自己的列表中的队列中收集 ID,直到它具有 m已达到 ID 或最长等待时间。这样的线程应该只有一个。

在上面的列表中,应该有包含 ID 和 CompletableFuture<R> 的条目。 ,使用调用结果完成。 future 是你给来电者的。您可能想使用 Map<String, CompletableFuture<R>> 而不是列表。因此,与请求完成相比,您可以轻松完成 future 。实际上,队列也应该包含 future,这样你就可以将它返回给调用者。

粗略的草图:

class ResourceMultigetter<R> {
    private final BlockingQueue<Map.Entry<String, CompletableFuture<R>>> newEntries = ...;

    private final Map<String, CompletableFuture<R>> collected = ...;

    private long millisOfFirstWaitingRequest;

    private volatile boolean stopped;

    class Processor implements Runnable {
        @Override
        public void run() { // run by the polling thread
            while (!stopped) {
                final Map.Entry<String, CompletableFuture<R>> e = newEntries.poll(....);
                if (e == null) {
                    if (!timeHasElapsed()) continue;
                } else {
                    if (collected.isEmpty()) {
                        millisOfFirstWaitingRequest = System.currentTimeMillis();
                    }
                    collected.put(e.getKey(), e.getValue());
                    if (collected.size() < m && !timeHasElapsed()) continue;

                }
                final List<String> processedIds = callTheServer();
                processedIds.forEach(id -> collected.remove(id));
            }
        }
    }

    public CompletableFuture<R> enqueue(String id) {
        final CompletableFuture<R> result = new CompletableFuture<>();
        newEntries.add(new AbstractMap.SimpleImmutableEntry<>(id, result));
        return result;
    }
}

你可以像这样初始化它

ResourceMultigetter resourceMultigetter = new ResourceMultigetter();
new Thread(resourceMultigetter.new Processor()).start();

客户端代码会执行类似的操作

R r = resourceMultigetter.enqueue(id); // this blocks

关于java - N+1 HTTP 调用通过同时队列进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58960660/

相关文章:

java - 动态添加按钮到 fingerpaint android api 演示

java - 如何同步围绕方法定义的 Java 方面?

Java多线程解析txt文件

optimization - Haskell 中的严格性优化和内存分配

java - 如何在 HashMap 中查找对象而不引用实际对象? (从文本文件加载有向图)

java - Spring Security 在 Controller 中使用@security 注释

ios - 为什么我的 firebase 数据库不等待更新值

linux - 多线程和 SMP Linux

python - 向量化python中矩阵中所有组合的角度计算

php - 安全使用 PHP ini_set "memory_limit"