我正在尝试编写多线程java程序来并行获取mongo数据并存储它。下面是CallBack的代码,使用它创建的70个线程的线程池 worker 。我正在使用 Callable 来回调 CallBack。
问题是获取的项目多于返回到回调列表的项目。不知道出了什么问题。有人可以帮忙吗?即使“FETCHED ....”打印的数字也比“INDEXED ....”更大。线程是否相互跨过?
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;
import com.chegg.migrator.question.entity.TbsProblem;
public class CallBack {
List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
final int NO_OF_THREAD = 70;
public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
problemsToBeIndex.addAll(result);
System.out.println(" Data Indexed "+problemsToBeIndex.size());
}
public List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
int ctr=0;
while(ctr <= 100000) {
CallingBackWorker worker = new CallingBackWorker();
worker.setCallBack(this);
final Future future = es.submit( worker);
ctr +=100;
}
while(!es.isTerminated()) {}
es.shutdown();
System.out.println(" finished the retrival ");
System.out.println("try to do something while the work is being done....");
System.out.println(""End work" "+ new java.util.Date());
return problemsToBeIndex;
}
public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
new CallBack().andAction();
}
}
package com.chegg.migrator.question.parallel.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import com.chegg.migrator.question.entity.TbsProblem;
public class CallingBackWorker implements Callable<Object>{
CallBack callBack;
static int calls = 0;
static int fetched =0;
static int indexed =0;
List<TbsProblem> problems = new ArrayList<TbsProblem>();
public CallingBackWorker() {
super();
}
@Override
public Object call() throws Exception {
System.out.println(" fetching the data ....."+calls++);
List<TbsProblem> problems = new ArrayList<TbsProblem>();
for(int i=0;i<50;i++) {
TbsProblem problem = new TbsProblem();
problem.setId("fetched"+fetched);
problems.add(problem);
}
Thread.sleep(500);
fetched +=problems.size();
System.out.println(" FETCHED ^^^^^^"+fetched);
List<String> lists = new ArrayList<String>();
for(TbsProblem tbs : problems) {
lists.add(tbs.getId());
}
Thread.sleep(500);
indexed += lists.size();
System.out.println(" committed, exiting.");
System.out.println(" INDEXED $$$$"+indexed);
callBack.returnResult(problems);
return null;
}
public CallBack getCallBack() {
return callBack;
}
public void setCallBack(CallBack callBack) {
this.callBack = callBack;
}
}
最佳答案
fetched 是否在每个可调用对象之外声明?并且您在多个线程中增加它?如果是这样那就有问题了。递增整数不是线程安全的。如果是这种情况,请用 AtomicInteger 替换 fetched 或在同步块(synchronized block)内递增它。
为什么在多线程中递增整数会出现问题?每个线程都会执行以下操作:
STEP 1: read current value of fetched
STEP 2: calculate current value + problems.size()
STEP 3: assign new value to fetched
图像线程 (1) 完成步骤 1 和 2,将 fetched 的新值计算为 10。然后线程 (2) 到 (50) 完成步骤 1,2 和 3。fetched 现在的值为 1000。最后,线程 (1) 完成步骤 3,再次为 fetched 分配值 10。
关于Java线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17476171/