假设,我有一个从 HTTP API 获取数据的方法
public R getResource(String id){
//HTTP call to
return fetch("http://example.com/api/id")
}
但是 http://example.org/api/一次支持多个 ID http://example.org/api/id1,id2,id3
在多线程环境中,我想阻塞,直到收集到“m”个 id,然后一次性从 API 获取数据。
此外,为了避免无限/长 block ,应该有一个等待超时。
对于 m=5 假设有 20 个线程同时到达来调用此方法,那么应该向 HTTP api 发送 4 批请求。
支持此批处理的任何实现建议或现有框架。
欢迎编辑建议。
最佳答案
使用 BlockingQueue
线程正在执行 BlockingQueue::poll(long timeout, TimeUnit unit)
例如计算超时,以便第一个请求等待的时间不会超过某个固定的持续时间。
轮询线程将从自己的列表中的队列中收集 ID,直到它具有 m
已达到 ID 或最长等待时间。这样的线程应该只有一个。
在上面的列表中,应该有包含 ID 和 CompletableFuture<R>
的条目。 ,使用调用结果完成。 future 是你给来电者的。您可能想使用 Map<String, CompletableFuture<R>>
而不是列表。因此,与请求完成相比,您可以轻松完成 future 。实际上,队列也应该包含 future,这样你就可以将它返回给调用者。
粗略的草图:
class ResourceMultigetter<R> {
private final BlockingQueue<Map.Entry<String, CompletableFuture<R>>> newEntries = ...;
private final Map<String, CompletableFuture<R>> collected = ...;
private long millisOfFirstWaitingRequest;
private volatile boolean stopped;
class Processor implements Runnable {
@Override
public void run() { // run by the polling thread
while (!stopped) {
final Map.Entry<String, CompletableFuture<R>> e = newEntries.poll(....);
if (e == null) {
if (!timeHasElapsed()) continue;
} else {
if (collected.isEmpty()) {
millisOfFirstWaitingRequest = System.currentTimeMillis();
}
collected.put(e.getKey(), e.getValue());
if (collected.size() < m && !timeHasElapsed()) continue;
}
final List<String> processedIds = callTheServer();
processedIds.forEach(id -> collected.remove(id));
}
}
}
public CompletableFuture<R> enqueue(String id) {
final CompletableFuture<R> result = new CompletableFuture<>();
newEntries.add(new AbstractMap.SimpleImmutableEntry<>(id, result));
return result;
}
}
你可以像这样初始化它
ResourceMultigetter resourceMultigetter = new ResourceMultigetter();
new Thread(resourceMultigetter.new Processor()).start();
客户端代码会执行类似的操作
R r = resourceMultigetter.enqueue(id); // this blocks
关于java - N+1 HTTP 调用通过同时队列进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58960660/