java - 异步迭代器

标签 java multithreading asynchronous concurrency

我有以下代码:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

因为迭代器和任务都很慢,所以将它们放在单独的线程中是有意义的。这是对迭代器包装器的快速而肮脏的尝试:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
        return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
  }

但是这个实现缺乏对“hasNext()”的支持。 hasNext() 方法当然可以阻塞,直到它知道是否返回 true 为止。我可以在我的 AsyncIterator 中有一个 peek 对象,我可以更改 hasNext() 以从队列中取出一个对象并让 next() 返回这个 peek。但是,如果已到达委托(delegate)迭代器的末尾,这将导致 hasNext() 无限期阻塞。

我当然可以自己进行线程通信,而不是使用 ArrayBlockingQueue:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public boolean hasNext() {
    synchronized (this) {
      while (queue.size() == 0 && !delegateDone) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      }
    }
    return queue.size() > 0;
  }

  @Override
  public T next() {
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

然而,所有额外的同步、等待和通知并没有真正使代码更具可读性,而且很容易在某处隐藏竞争条件。

有什么更好的主意吗?

更新

是的,我知道常见的观察者/可观察模式。然而,通常的实现不会预见到数据流的结束,它们也不是迭代器。

我在这里特别想要一个迭代器,因为实际上上面提到的循环存在于外部库中,它需要一个迭代器。

最佳答案

这是一个棘手的问题,但我想这次我得到了正确的答案。 (我删除了我的第一个答案。)

答案是使用哨兵。我没有测试这段代码,为了清楚起见,我删除了 try/catches:

public class AsyncIterator<T> implements Iterator<T> {

    private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
    private T sentinel = (T) new Object();
    private T next;

    private AsyncIterator(final Iterator<T> delegate) {
        new Thread() {
            @Override
            public void run() {
                while (delegate.hasNext()) {
                    queue.put(delegate.next());
                }
                queue.put(sentinel);
            }
        }.start();
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        next = queue.take(); // blocks if necessary
        if (next == sentinel) {
            return false;
        }
        return true;
    }

    @Override
    public T next() {
        T tmp = next;
        next = null;
        return tmp;
    }

}

这里的见解是 hasNext() 需要阻塞,直到下一个项目准备就绪。它还需要某种退出条件,并且由于线程问题,它不能为此使用空队列或 boolean 标志。哨兵无需任何锁定或同步即可解决问题。

编辑:缓存“下一个”,因此可以多次调用 hasNext()。

关于java - 异步迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21143996/

相关文章:

java - 我使用 union-find 数据结构实现 Kruskal 算法有什么问题。

asynchronous - 无法使用 `impl Future` 将异步函数存储在向量中

c# - Async TestInitialize 保证测试失败

java - 如何停止长时间运行的函数

java - 如何在java中单击按钮时暂停线程

javascript - 异步并行请求按顺序运行

java - jackson 设置默认 View

java - Android - 无法从 EditText 获取文本并在适配器的 ListView 的 TextViews 中设置

java - 如何通过次对角线遍历二维数组?

c++ - 将 int 和 string 参数传递给线程 C++