java 多线程ExecutorService

标签 java multithreading performance parallel-processing executorservice

我正在尝试为 lucene 索引创建多线程。假设我有一个从 101 到 999 的数字列表。我想要实现的是将这个列表分成不同的组,并且每个组由不同的线程处理。

我可以将其分区到不同的组,但是,在运行多线程时,在这种情况下所有线程仅采用第一个分区(l[0])。 例如,上面的代码创建了 3 个线程( t1,t2, t3),并且我还有三个分区(p1,p2,p3),其中 p1 [101,400],p2[401,600],p3[601,999]。假设,t1 在 p1 上运行,t2 在 p2 上运行,t3 在 p3 上运行。但现在,所有线程都在 p1 上运行。

有谁知道怎么解决吗?

根据蒂姆的建议添加了更改,但结果仍然相同

final int BLOCK_SIZE = 1; 
AtomicInteger nextBlock = new AtomicInteger(0);
 int blockToProcess =  nextBlock.getAndIncrement(); 
 int endBlocks = (blockToProcess+partitions.size())*BLOCK_SIZE; 
 for(int i=BLOCK_SIZE*blockToProcess;i<endBlocks;i++)
 { 
  Myclass it=new Myclass(l);
  todo.add( Executors.callable(it));
  } 
 taskExecutor.invokeAll(todo);

对于分区,我使用了 Guava 库并通过输出确认,因此分区似乎没问题

我也尝试了以下操作,结果仍然相同。事实上,以下是我最初拥有的。

 for(int i=0;i<partitions.size;i++)
{ 
Myclass it=new Myclass(partitions.get(i)); 
taskExecutor.execute(it);
 } 
<小时/>

为了让事情变得更简单、更干净,我刚刚创建了全新的测试文件,如下所示 有两个类:Test 和 TestThreads 对于测试类

    public class Test {

public static void main(String[] args) throws InterruptedException {
    // TODO Auto-generated method stub

    int numOfthreads=2;
    List<String> originalList=new ArrayList<String>();

    for(int i=0;i<20;i++)
    {
        originalList.add(Integer.toString(i));
    }

    int partitionSize = IntMath.divide(originalList.size(), numOfthreads, RoundingMode.UP);
    List<List<String>> partitions=Lists.partition(originalList, partitionSize);
    List<Callable<Object>> todo = new ArrayList<Callable<Object>>();
    int count=0;
    ExecutorService taskExecutor = Executors.newFixedThreadPool(numOfthreads);

      for(int i=0;i<partitions.size();i++)
      { 
          TestThreads  it=new TestThreads(partitions.get(i));

            todo.add( Executors.callable(it));
             System.out.println("Created thread " +count+", containing: "
             +partitions.get(i).size()+" files\n");
             for(String s:partitions.get(i))
             {
                 System.out.print(s+" ");
             }
             count++;
             System.out.println("\n");
         }

        taskExecutor.invokeAll(todo);

}

 }

对于 TestThreads 类:

   public class TestThreads implements Runnable {
private static List<String> lis;
public TestThreads(List<String> list)
{
    lis=list;
}
   public void run()
   {
System.out.println("This is thread "+Thread.currentThread().getId());

System.out.println("-----------------------------------------");
for(String s:lis)
{
    System.out.println(s);
}
   }
   }

更新:

这是导致问题的静态列表,删除它后一切正常。感谢 Tim 和 Xiezi 的时间和帮助!

最佳答案

您需要一种方法将 block 分配给不同的线程。

最简单的方法可能是使用 AtomicInteger 来存储需要处理的“下一个 block ”。每个线程对 nextBlock 整数调用 getAndIncrement(),然后处理相关 block 。

例如:

static final int BLOCK_SIZE = 100;
AtomicInteger nextBlock = new AtomicInteger(0);


private class Processor implements Runnable {
     public void run() {
          int blockToProcess = nextBlock.getAndIncrement();
          int end = (blockToProcess+1)*BLOCK_SIZE;

          for (int i=BLOCK_SIZE*blockToProcess;i<end;i++) {
              process(data[i]);
          }
     }
}

所有代码均来自内存,因此可能有一些需要更正的拼写错误/方法名称等。

重要的一点是 AtomicInteger 的使用,这确保了线程安全,每个线程都被分配了一个不同的 block 来处理。

我本想将其添加到您的问题中,但在这里就可以了。您尝试这样做,但这没有满足您的需要:

final int BLOCK_SIZE = 1; 
AtomicInteger nextBlock = new AtomicInteger(0);
 int blockToProcess =  nextBlock.getAndIncrement(); 
 int endBlocks = (blockToProcess+partitions.size())*BLOCK_SIZE; 
 for(int i=BLOCK_SIZE*blockToProcess;i<endBlocks;i++)
 { 
  Myclass it=new Myclass(l);
  todo.add( Executors.callable(it));
  } 
 taskExecutor.invokeAll(todo);

这里实际上有两个选择 - 因为您是提前创建任务,所以您可以告诉每个任务此时要处理什么。我建议每个任务在执行时选择下一个 block 的方式。

要按照您在这里尝试的方式执行此操作,您可以忘记 AtomicInteger 并执行以下操作:

 for(int i=0;i<partitions.size;i++)
 { 
    Myclass it=new Myclass(partitions.get(i)); // or just MyClass(i) and then MyClass pulls out the list from partitions
    taskExecutor.execute(it);
  } 

哪里

 public class MyClass implements Runnable {
       List<String> toProcess;
       MyClass(List<String> toProcess) {
           this.toProcess = toProcess;
       }


       @override
       public void run() {
           // Process the list
       }
 }

 public class MyClass implements Runnable {
       int toProcess;
       MyClass(int toProcess) {
           this.toProcess = toProcess;
       }


       @override
       public void run() {
           // Process the list
           List<String> list = partitions.get(toProcess);
       }
 }

关于java 多线程ExecutorService,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20675536/

相关文章:

javascript - JS手机: how many memory I have available

java - 尝试将实例转换为 PersistenceCapable 失败。确保它已得到增强

multithreading - Scala Stream尾部懒惰和同步

mongodb - 批量读取 Mongo DB 的最佳方式 - 有这样的东西吗?

algorithm - 比较海量数据的最佳算法

multithreading - .NET 线程面试问题

java - 在玻璃板上喷漆,无需重新喷漆其他部件

java - 从查询中填充 Jtable

java - 为 Spring AOP 切面提供超时执行

c# - 在 C# 控制台应用程序中更改线程上下文