java - 如何实现异步队列?

标签 java concurrency java.util.concurrent

给定以下队列变化:

interface AsyncQueue<T> {
    //add new element to the queue
    void add(T elem); 
    //request single element from the queue via callback
    //callback will be called once for single polled element when it is available
    //so, to request multiple elements, poll() must be called multiple times with (possibly) different callbacks
    void poll(Consumer<T> callback);
}

我发现我不知道如何使用 java.util.concurrent 原语来实现它!所以问题是:

  • 使用 java.util.concurrent 包实现它的正确方法是什么?
  • 是否可以不使用额外的线程池来做到这一点?

最佳答案

您的 AsyncQueueBlockingQueue 非常相似,例如 ArrayBlockingQueue。返回的 Future 将简单地委托(delegate)给 ArrayBlockingQueue 方法。例如,Future.get 会调用 blockingQueue.poll


至于你的更新,我假设调用 add 的线程应该在有一个等待时调用回调?如果是这样,那么为元素创建一个队列和为回调创建一个队列就是一项简单的任务。

  • 添加时,检查是否有回调等待,然后调用它,否则将元素放入元素队列
  • 轮询时,检查是否有元素在等待,然后用该元素调用回调,否则将回调放入回调队列

代码大纲:

class AsyncQueue<E> {

    Queue<Consumer<E>> callbackQueue = new LinkedList<>();
    Queue<E> elementQueue = new LinkedList<>();

    public synchronized void add(E e) {
        if (callbackQueue.size() > 0)
            callbackQueue.remove().accept(e);
        else
            elementQueue.offer(e);
    }

    public synchronized void poll(Consumer<E> c) {
        if (elementQueue.size() > 0)
            c.accept(elementQueue.remove());
        else
            callbackQueue.offer(c);
    }
}

关于java - 如何实现异步队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28720291/

相关文章:

java - 类似于创建对象而不是方法的匿名函数?

concurrency - 什么是Phusion旅客PassengerMaxRequestQueueSize的最佳值

java - 在java并发环境中使用什么是最好的队列缓存

java - Java HashSet 是只读的线程安全的吗?

java - 何时在 java 多线程中重置 CyclicBarrier

java - 执行器 - 需要 LinkedBlockingQueue

java - 不会说英语以外的语言

java - 如何判断类的类型并通过fromJson方法括号传递给gson

java - 附加控件+字符串java中的一个字符

Java 7 : How to execute parallel tasks in batches?