我正在尝试为 lucene 索引创建多线程。假设我有一个从 101 到 999 的数字列表。我想要实现的是将这个列表分成不同的组,并且每个组由不同的线程处理。
我可以将其分区到不同的组,但是,在运行多线程时,在这种情况下所有线程仅采用第一个分区(l[0])。 例如,上面的代码创建了 3 个线程( t1,t2, t3),并且我还有三个分区(p1,p2,p3),其中 p1 [101,400],p2[401,600],p3[601,999]。假设,t1 在 p1 上运行,t2 在 p2 上运行,t3 在 p3 上运行。但现在,所有线程都在 p1 上运行。
有谁知道怎么解决吗?
根据蒂姆的建议添加了更改,但结果仍然相同
final int BLOCK_SIZE = 1;
AtomicInteger nextBlock = new AtomicInteger(0);
int blockToProcess = nextBlock.getAndIncrement();
int endBlocks = (blockToProcess+partitions.size())*BLOCK_SIZE;
for(int i=BLOCK_SIZE*blockToProcess;i<endBlocks;i++)
{
Myclass it=new Myclass(l);
todo.add( Executors.callable(it));
}
taskExecutor.invokeAll(todo);
对于分区,我使用了 Guava 库并通过输出确认,因此分区似乎没问题
我也尝试了以下操作,结果仍然相同。事实上,以下是我最初拥有的。
for(int i=0;i<partitions.size;i++)
{
Myclass it=new Myclass(partitions.get(i));
taskExecutor.execute(it);
}
<小时/>
为了让事情变得更简单、更干净,我刚刚创建了全新的测试文件,如下所示 有两个类:Test 和 TestThreads 对于测试类
public class Test {
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
int numOfthreads=2;
List<String> originalList=new ArrayList<String>();
for(int i=0;i<20;i++)
{
originalList.add(Integer.toString(i));
}
int partitionSize = IntMath.divide(originalList.size(), numOfthreads, RoundingMode.UP);
List<List<String>> partitions=Lists.partition(originalList, partitionSize);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>();
int count=0;
ExecutorService taskExecutor = Executors.newFixedThreadPool(numOfthreads);
for(int i=0;i<partitions.size();i++)
{
TestThreads it=new TestThreads(partitions.get(i));
todo.add( Executors.callable(it));
System.out.println("Created thread " +count+", containing: "
+partitions.get(i).size()+" files\n");
for(String s:partitions.get(i))
{
System.out.print(s+" ");
}
count++;
System.out.println("\n");
}
taskExecutor.invokeAll(todo);
}
}
对于 TestThreads 类:
public class TestThreads implements Runnable {
private static List<String> lis;
public TestThreads(List<String> list)
{
lis=list;
}
public void run()
{
System.out.println("This is thread "+Thread.currentThread().getId());
System.out.println("-----------------------------------------");
for(String s:lis)
{
System.out.println(s);
}
}
}
更新:
这是导致问题的静态列表,删除它后一切正常。感谢 Tim 和 Xiezi 的时间和帮助!
最佳答案
您需要一种方法将 block 分配给不同的线程。
最简单的方法可能是使用 AtomicInteger
来存储需要处理的“下一个 block ”。每个线程对 nextBlock
整数调用 getAndIncrement()
,然后处理相关 block 。
例如:
static final int BLOCK_SIZE = 100;
AtomicInteger nextBlock = new AtomicInteger(0);
private class Processor implements Runnable {
public void run() {
int blockToProcess = nextBlock.getAndIncrement();
int end = (blockToProcess+1)*BLOCK_SIZE;
for (int i=BLOCK_SIZE*blockToProcess;i<end;i++) {
process(data[i]);
}
}
}
所有代码均来自内存,因此可能有一些需要更正的拼写错误/方法名称等。
重要的一点是 AtomicInteger 的使用,这确保了线程安全,每个线程都被分配了一个不同的 block 来处理。
我本想将其添加到您的问题中,但在这里就可以了。您尝试这样做,但这没有满足您的需要:
final int BLOCK_SIZE = 1;
AtomicInteger nextBlock = new AtomicInteger(0);
int blockToProcess = nextBlock.getAndIncrement();
int endBlocks = (blockToProcess+partitions.size())*BLOCK_SIZE;
for(int i=BLOCK_SIZE*blockToProcess;i<endBlocks;i++)
{
Myclass it=new Myclass(l);
todo.add( Executors.callable(it));
}
taskExecutor.invokeAll(todo);
这里实际上有两个选择 - 因为您是提前创建任务,所以您可以告诉每个任务此时要处理什么。我建议每个任务在执行时选择下一个 block 的方式。
要按照您在这里尝试的方式执行此操作,您可以忘记 AtomicInteger 并执行以下操作:
for(int i=0;i<partitions.size;i++)
{
Myclass it=new Myclass(partitions.get(i)); // or just MyClass(i) and then MyClass pulls out the list from partitions
taskExecutor.execute(it);
}
哪里
public class MyClass implements Runnable {
List<String> toProcess;
MyClass(List<String> toProcess) {
this.toProcess = toProcess;
}
@override
public void run() {
// Process the list
}
}
或
public class MyClass implements Runnable {
int toProcess;
MyClass(int toProcess) {
this.toProcess = toProcess;
}
@override
public void run() {
// Process the list
List<String> list = partitions.get(toProcess);
}
}
关于java 多线程ExecutorService,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20675536/