我正在用 Java 构建一个 REST API,我会将其暴露给外界。调用 API 的人必须注册并在请求中发送他们的用户 ID。 最多有 10 个并发线程可用于执行 API 请求。我正在维护一个队列,其中包含所有要服务的请求 ID(数据库条目的主键)。 我需要实现一些公平使用政策如下 - 如果队列中有超过 10 个作业(即超过最大线程数),则用户一次只能执行一个请求(他/她提交的其他请求,如果有的话,将保留在队列中并且只有在他之前的请求完成执行后才会被处理)。如果有空闲线程,即即使将线程分配给不同用户提交的请求后,线程池中剩余的线程也可以分配给剩余的请求(即使提交请求的用户已经持有一个线程)那一刻)。
目前的实现如下——
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
public class APIJobExecutor implements Runnable{
private static PriorityBlockingQueue<Integer> jobQueue = new PriorityBlockingQueue<Integer>();
private static ExecutorService jobExecutor = Executors.newCachedThreadPool();
private static final int MAX_THREADS = 10;
private static Semaphore sem = new Semaphore(MAX_THREADS, true);
private APIJobExecutor(){
}
public static void addJob(int jobId)
{
if(!jobQueue.contains(jobId)){
jobQueue.add(new Integer(jobId));
}
}
public void run()
{
while (true) {
try {
sem.acquire();
}catch (InterruptedException e1) {
e1.printStackTrace();
//unable to acquire lock. retry.
continue;
}
try {
Integer jobItem = jobQueue.take();
jobExecutor.submit(new APIJobService(jobItem));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
sem.release();
}
}
}
}
编辑: 是否有任何开箱即用的 Java 数据结构可以为我提供此功能。如果没有,我该如何实现?
最佳答案
这是一个相当常见的“服务质量”模式,可以使用存储桶 idea 来解决在作业队列中。我不知道此模式的标准 Java 实现和/或数据结构(可能是 PriorityQueue ?),但至少应该有几个实现 available (如果您找到合适的,请告诉我们)。
我确实曾经创建过自己的实现,并且我尝试将其与项目分离,以便您可以修改和使用它(添加单元测试!)。一些注意事项:
- 在不需要 QoS 的情况下使用默认队列(例如,如果执行的作业少于 10 个)。
- 基本思想是根据 QoS 键(例如用户名)将任务存储在列表中,并维护一个单独的“下一个是谁”列表。
- 它旨在用于作业队列(例如,
APIJobExecutor
的一部分,而不是替代品)。作业队列的部分职责是在任务执行后始终调用remove(taskId)
。 - 不应存在内存泄漏:如果队列中没有任务/作业,则所有内部映射和列表都应为空。
代码:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import org.slf4j.*;
/** A FIFO task queue. */
public class QosTaskQueue<TASKTYPE, TASKIDTYPE> {
private static final Logger log = LoggerFactory.getLogger(QosTaskQueue.class);
public static final String EMPTY_STRING = "";
/** Work tasks queued which have no (relevant) QoS key. */
private final ConcurrentLinkedQueue<TASKIDTYPE> defaultQ = new ConcurrentLinkedQueue<TASKIDTYPE>();
private final AtomicInteger taskQSize = new AtomicInteger();
private final Map<TASKIDTYPE, TASKTYPE> queuedTasks = new ConcurrentHashMap<TASKIDTYPE, TASKTYPE>();
/** Amount of tasks in queue before "quality of service" distribution kicks in. */
private int qosThreshold = 10;
/** Indicates if "quality of service" distribution is in effect. */
private volatile boolean usingQos;
/**
* Lock for all modifications to Qos-queues.
* <br>Must be "fair" to ensure adding does not block polling threads forever and vice versa.
*/
private final ReentrantLock qosKeyLock = new ReentrantLock(true);
/*
* Since all QoS modifications can be done by multiple threads simultaneously,
* there is never a good time to add or remove a Qos-key with associated queue.
* There is always a chance that a key is added while being removed and vice versa.
* The simplest solution is to make everything synchronized, which is what qosKeyLock is used for.
*/
private final Map<String, Queue<TASKIDTYPE>> qosQueues = new HashMap<String, Queue<TASKIDTYPE>>();
private final Queue<String> qosTurn = new LinkedList<String>();
public boolean add(TASKTYPE wt, TASKIDTYPE taskId, String qosKey) {
if (queuedTasks.containsKey(taskId)) {
throw new IllegalStateException("Task with ID [" + taskId + "] already enqueued.");
}
queuedTasks.put(taskId, wt);
return addToQ(taskId, qosKey);
}
public TASKTYPE poll() {
TASKIDTYPE taskId = pollQos();
return (taskId == null ? null : queuedTasks.get(taskId));
}
/**
* This method must be called after a task is taken from the queue
* using {@link #poll()} and executed.
*/
public TASKTYPE remove(TASKIDTYPE taskId) {
TASKTYPE wt = queuedTasks.remove(taskId);
if (wt != null) {
taskQSize.decrementAndGet();
}
return wt;
}
private boolean addToQ(TASKIDTYPE taskId, String qosKey) {
if (qosKey == null || qosKey.equals(EMPTY_STRING) || size() < getQosThreshold()) {
defaultQ.add(taskId);
} else {
addSynced(taskId, qosKey);
}
taskQSize.incrementAndGet();
return true;
}
private void addSynced(TASKIDTYPE taskId, String qosKey) {
qosKeyLock.lock();
try {
Queue<TASKIDTYPE> qosQ = qosQueues.get(qosKey);
if (qosQ == null) {
if (!isUsingQos()) {
// Setup QoS mechanics
qosTurn.clear();
qosTurn.add(EMPTY_STRING);
usingQos = true;
}
qosQ = new LinkedList<TASKIDTYPE>();
qosQ.add(taskId);
qosQueues.put(qosKey, qosQ);
qosTurn.add(qosKey);
log.trace("Created QoS queue for {}", qosKey);
} else {
qosQ.add(taskId);
if (log.isTraceEnabled()) {
log.trace("Added task to QoS queue {}, size: " + qosQ.size(), qosKey);
}
}
} finally {
qosKeyLock.unlock();
}
}
private TASKIDTYPE pollQos() {
TASKIDTYPE taskId = null;
qosKeyLock.lock();
try {
taskId = pollQosRecursive();
} finally {
qosKeyLock.unlock();
}
return taskId;
}
/**
* Poll the work task queues according to qosTurn.
* Recursive in case empty QoS queues are removed or defaultQ is empty.
* @return
*/
private TASKIDTYPE pollQosRecursive() {
if (!isUsingQos()) {
// QoS might have been disabled before lock was released or by this recursive method.
return defaultQ.poll();
}
String qosKey = qosTurn.poll();
Queue<TASKIDTYPE> qosQ = (qosKey.equals(EMPTY_STRING) ? defaultQ : qosQueues.get(qosKey));
TASKIDTYPE taskId = qosQ.poll();
if (qosQ == defaultQ) {
// DefaultQ should always be checked, even if it was empty
qosTurn.add(EMPTY_STRING);
if (taskId == null) {
taskId = pollQosRecursive();
} else {
log.trace("Removed task from defaultQ.");
}
} else {
if (taskId == null) {
qosQueues.remove(qosKey);
if (qosQueues.isEmpty()) {
usingQos = false;
}
taskId = pollQosRecursive();
} else {
qosTurn.add(qosKey);
if (log.isTraceEnabled()) {
log.trace("Removed task from QoS queue {}, size: " + qosQ.size(), qosKey);
}
}
}
return taskId;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(this.getClass().getName());
sb.append(", size: ").append(size());
sb.append(", number of QoS queues: ").append(qosQueues.size());
return sb.toString();
}
public boolean containsTaskId(TASKIDTYPE wid) {
return queuedTasks.containsKey(wid);
}
public int size() {
return taskQSize.get();
}
public void setQosThreshold(int size) {
this.qosThreshold = size;
}
public int getQosThreshold() {
return qosThreshold;
}
public boolean isUsingQos() {
return usingQos;
}
}
关于java - API后端如何实现线程的公平使用策略?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26709595/