java - API后端如何实现线程的公平使用策略?

标签 java multithreading algorithm concurrency producer-consumer

我正在用 Java 构建一个 REST API,我会将其暴露给外界。调用 API 的人必须注册并在请求中发送他们的用户 ID。 最多有 10 个并发线程可用于执行 API 请求。我正在维护一个队列,其中包含所有要服务的请求 ID(数据库条目的主键)。 我需要实现一些公平使用政策如下 - 如果队列中有超过 10 个作业(即超过最大线程数),则用户一次只能执行一个请求(他/她提交的其他请求,如果有的话,将保留在队列中并且只有在他之前的请求完成执行后才会被处理)。如果有空闲线程,即即使将线程分配给不同用户提交的请求后,线程池中剩余的线程也可以分配给剩余的请求(即使提交请求的用户已经持有一个线程)那一刻)。

目前的实现如下——

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;


public class APIJobExecutor implements Runnable{
    private static PriorityBlockingQueue<Integer> jobQueue = new PriorityBlockingQueue<Integer>();
    private static ExecutorService jobExecutor = Executors.newCachedThreadPool();
    private static final int MAX_THREADS = 10;
    private static Semaphore sem = new Semaphore(MAX_THREADS, true);

    private APIJobExecutor(){

    }

    public static void addJob(int jobId)
    {
        if(!jobQueue.contains(jobId)){
            jobQueue.add(new Integer(jobId));
        }
    }

    public void run()
    {
        while (true) {
            try {
                sem.acquire();
            }catch (InterruptedException e1) {
                e1.printStackTrace();
                //unable to acquire lock. retry.
                continue;
            }
            try {
                Integer jobItem = jobQueue.take();
                jobExecutor.submit(new APIJobService(jobItem));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                sem.release();
            }
        }
    }
}

编辑: 是否有任何开箱即用的 Java 数据结构可以为我提供此功能。如果没有,我该如何实现?

最佳答案

这是一个相当常见的“服务质量”模式,可以使用存储桶 idea 来解决在作业队列中。我不知道此模式的标准 Java 实现和/或数据结构(可能是 PriorityQueue ?),但至少应该有几个实现 available (如果您找到合适的,请告诉我们)。

我确实曾经创建过自己的实现,并且我尝试将其与项目分离,以便您可以修改和使用它(添加单元测试!)。一些注意事项:

  • 在不需要 QoS 的情况下使用默认队列(例如,如果执行的作业少于 10 个)。
  • 基本思想是根据 QoS 键(例如用户名)将任务存储在列表中,并维护一个单独的“下一个是谁”列表。
  • 它旨在用于作业队列(例如,APIJobExecutor 的一部分,而不是替代品)。作业队列的部分职责是在任务执行后始终调用 remove(taskId)
  • 不应存在内存泄漏:如果队列中没有任务/作业,则所有内部映射和列表都应为空。

代码:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;

import org.slf4j.*;

/** A FIFO task queue. */
public class QosTaskQueue<TASKTYPE, TASKIDTYPE> {
private static final Logger log = LoggerFactory.getLogger(QosTaskQueue.class);

public static final String EMPTY_STRING = "";

/** Work tasks queued which have no (relevant) QoS key. */
private final ConcurrentLinkedQueue<TASKIDTYPE> defaultQ = new ConcurrentLinkedQueue<TASKIDTYPE>();

private final AtomicInteger taskQSize = new AtomicInteger();

private final Map<TASKIDTYPE, TASKTYPE> queuedTasks = new ConcurrentHashMap<TASKIDTYPE, TASKTYPE>();

/** Amount of tasks in queue before "quality of service" distribution kicks in. */
private int qosThreshold = 10;

/** Indicates if "quality of service" distribution is in effect. */
private volatile boolean usingQos;

/**
 * Lock for all modifications to Qos-queues.
 * <br>Must be "fair" to ensure adding does not block polling threads forever and vice versa. 
 */
private final ReentrantLock qosKeyLock = new ReentrantLock(true); 

/*
 * Since all QoS modifications can be done by multiple threads simultaneously, 
 * there is never a good time to add or remove a Qos-key with associated queue. 
 * There is always a chance that a key is added while being removed and vice versa.
 * The simplest solution is to make everything synchronized, which is what qosKeyLock is used for.
 */
private final Map<String, Queue<TASKIDTYPE>> qosQueues = new HashMap<String, Queue<TASKIDTYPE>>();
private final Queue<String> qosTurn = new LinkedList<String>();

public boolean add(TASKTYPE wt, TASKIDTYPE taskId, String qosKey) {

    if (queuedTasks.containsKey(taskId))  {
        throw new IllegalStateException("Task with ID [" + taskId + "] already enqueued.");
    }
    queuedTasks.put(taskId, wt);
    return addToQ(taskId, qosKey);
}

public TASKTYPE poll() {

    TASKIDTYPE taskId = pollQos();
    return (taskId == null ? null : queuedTasks.get(taskId));
}

/**
 * This method must be called after a task is taken from the queue
 * using {@link #poll()} and executed.
 */
public TASKTYPE remove(TASKIDTYPE taskId) {

    TASKTYPE wt = queuedTasks.remove(taskId);
    if (wt != null) {
        taskQSize.decrementAndGet();
    }
    return wt;
}

private boolean addToQ(TASKIDTYPE taskId, String qosKey) {

    if (qosKey == null || qosKey.equals(EMPTY_STRING) || size() < getQosThreshold()) {
        defaultQ.add(taskId);
    } else {
        addSynced(taskId, qosKey);
    }
    taskQSize.incrementAndGet();
    return true;
}

private void addSynced(TASKIDTYPE taskId, String qosKey) {

    qosKeyLock.lock();
    try {
        Queue<TASKIDTYPE> qosQ = qosQueues.get(qosKey); 
        if (qosQ == null) {
            if (!isUsingQos()) {
                // Setup QoS mechanics
                qosTurn.clear();
                qosTurn.add(EMPTY_STRING);
                usingQos = true;
            }
            qosQ = new LinkedList<TASKIDTYPE>();
            qosQ.add(taskId);
            qosQueues.put(qosKey, qosQ);
            qosTurn.add(qosKey);
            log.trace("Created QoS queue for {}", qosKey);
        } else { 
            qosQ.add(taskId);
            if (log.isTraceEnabled()) {
                log.trace("Added task to QoS queue {}, size: " + qosQ.size(), qosKey);
            }
        }
    } finally {
        qosKeyLock.unlock();
    }
}

private TASKIDTYPE pollQos() {

    TASKIDTYPE taskId = null;
    qosKeyLock.lock();
    try {
        taskId = pollQosRecursive();
    } finally {
        qosKeyLock.unlock();
    }
    return taskId;
}

/**
 * Poll the work task queues according to qosTurn.
 * Recursive in case empty QoS queues are removed or defaultQ is empty.  
 * @return
 */
private TASKIDTYPE pollQosRecursive() {

    if (!isUsingQos()) {
        // QoS might have been disabled before lock was released or by this recursive method.
        return defaultQ.poll();
    }
    String qosKey = qosTurn.poll();
    Queue<TASKIDTYPE> qosQ = (qosKey.equals(EMPTY_STRING) ? defaultQ : qosQueues.get(qosKey));
    TASKIDTYPE taskId = qosQ.poll();
    if (qosQ == defaultQ) {
        // DefaultQ should always be checked, even if it was empty
        qosTurn.add(EMPTY_STRING);
        if (taskId == null) {
            taskId = pollQosRecursive();
        } else {
            log.trace("Removed task from defaultQ.");
        }
    } else {
        if (taskId == null) {
            qosQueues.remove(qosKey);
            if (qosQueues.isEmpty()) {
                usingQos = false;
            }
            taskId = pollQosRecursive();
        } else {
            qosTurn.add(qosKey);
            if (log.isTraceEnabled()) {
                log.trace("Removed task from QoS queue {}, size: " + qosQ.size(), qosKey);
            }
        }
    }
    return taskId;
}

@Override
public String toString() {

    StringBuilder sb = new StringBuilder(this.getClass().getName());
    sb.append(", size: ").append(size());
    sb.append(", number of QoS queues: ").append(qosQueues.size());
    return sb.toString();
}

public boolean containsTaskId(TASKIDTYPE wid) {
    return queuedTasks.containsKey(wid);
}

public int size() { 
    return taskQSize.get(); 
}

public void setQosThreshold(int size) {
    this.qosThreshold = size;
}

public int getQosThreshold() {
    return qosThreshold;
}

public boolean isUsingQos() {
    return usingQos;
}

}

关于java - API后端如何实现线程的公平使用策略?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26709595/

相关文章:

java - 找不到项目 'org.springframework.boot:spring-boot-starter-parent:2.4.0'

java - 是否会陷入僵局?

c# - 如果在 C# 中花费的时间太长,如何终止线程?

java - 在 Java 中处理 alpha 混合

algorithm - 这个断词算法的时间复杂度是多少? (动态规划)

java - 解释 JMX URL

java - 用字符串android中的下划线替换所有句号

java - JPA:删除的外键约束

java - 线程数增加时的高延迟

algorithm - 递增地计算图中的桥