java - 如何从每个线程的数组中读取唯一元素?

标签 java multithreading concurrency scalability

我有一个基于数组的对象,它实现了以下接口(interface):

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}

我想在它上面创建一个特定的迭代器:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}

在方法next 中,我想从PairSupplier 返回一些元素。

这个元素对于线程应该是唯一的,其他线程不应该有这个元素。

由于 PairSupplier 有最终大小,这种情况并不总是可能的,但我想接近它。

元素的顺序无关紧要,线程可以在不同时间获取相同的元素。

示例:2 个线程5 个元素 - {1,2,3,4,5}

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5

我的解决方案:

我创建了 AtomicInteger 索引,我会在每次下一次 调用时增加它。

PairSupplier pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     int position = index.incrementAndGet() % pairs.size;
     if (position < 0) {
          position *= -1;
          position = pairs.size - position;
     }
     return pairs.get(position);
}

索引在所有线程之间共享。

我发现这个解决方案不可扩展(因为所有线程都是递增的),也许有人有更好的主意?

50-1000 个线程 将使用此迭代器。

最佳答案

您的问题详细信息不明确 - 您的示例表明两个线程可以处理相同的 Pair但你在描述中另有说明。

由于比较难实现,我会提供一个Iterable<Pair<Q,E>>这将交付 Pair每个线程一个,直到供应商循环 - 然后它会重复。

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a NEW iterator for each thread while they all share supplier
     * and Will therefore distribute each Pair between different threads.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same pair twice unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
      int actual = pick % supplier.size();
      if (pick != actual) {
        // So long as someone has a success before we overflow int we're good.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}

注意:我对代码进行了一些调整以适应这两种情况。你可以拍一个Iterator每个线程或共享单个 Iterator跨线程。

关于java - 如何从每个线程的数组中读取唯一元素?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19382345/

相关文章:

java - 更新 Android 应用中的原始资源

java - 如何将非并行junit参数化测试转为并行运行

java - Spring Boot @Value 没有被填充..为什么?

java - @PostConstruct 和 "No Hibernate Session bound to thread"异常

java - Spring Security - 在重定向到登录时保留 URL 参数

Python:如何在其中一个线程因错误而中断后添加新线程

java - Java游戏滞后

java - 500个Worker Threads,什么样的线程池?

java - 并发应用程序不如单线程快

database - 具有并发写入功能的 SQLite 替代方案(Delphi)