Java thread pool

Tags: java threadpoolexecutor

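I'm trying to write a multithreaded Java program that fetches Mongo data in parallel and stores it. Below is the code for CallBack and for the worker that runs in the 70-thread pool it creates. I'm using Callable to call back into CallBack.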

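The problem is that more items are fetched than end up in the callback list, and I can't tell what's going wrong. Can anyone help? Even the number printed by "FETCHED ...." is larger than the one printed by "INDEXED ....". Are the threads stepping on each other?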

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.solr.client.solrj.SolrServerException;
import org.xml.sax.SAXException;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallBack {
    List<TbsProblem> problemsToBeIndex = new ArrayList<TbsProblem>();
    final int NO_OF_THREAD = 70;

    public void returnResult(List<TbsProblem> result) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        problemsToBeIndex.addAll(result);
        System.out.println(" Data Indexed "+problemsToBeIndex.size());
    }
    public  List<TbsProblem> andAction() throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(NO_OF_THREAD);
            int ctr=0;
            while(ctr <= 100000) {
                CallingBackWorker worker = new CallingBackWorker();
                worker.setCallBack(this);
                final Future future = es.submit( worker);
                ctr +=100;
            }

            while(!es.isTerminated()) {}
            es.shutdown();
            System.out.println(" finished the retrival ");
        System.out.println("try to do something while the work is being done....");
        System.out.println("\"End work\" "+ new java.util.Date());
        return problemsToBeIndex;
    }

    public static void main(String[] argv) throws IOException, SAXException, ParserConfigurationException, SolrServerException {
        new CallBack().andAction();
    }
}

package com.chegg.migrator.question.parallel.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import com.chegg.migrator.question.entity.TbsProblem;

public class CallingBackWorker implements Callable<Object>{
    CallBack callBack;

    static int calls = 0;
    static int fetched =0;
    static int indexed =0;
    List<TbsProblem> problems = new ArrayList<TbsProblem>();

    public CallingBackWorker() {
        super();
    }

    @Override
    public Object call() throws Exception {
        System.out.println("  fetching the data ....."+calls++);
        List<TbsProblem> problems = new ArrayList<TbsProblem>();
        for(int i=0;i<50;i++) {
            TbsProblem problem = new TbsProblem();
            problem.setId("fetched"+fetched);
            problems.add(problem);
        }
        Thread.sleep(500);
        fetched +=problems.size();
        System.out.println(" FETCHED ^^^^^^"+fetched);

        List<String> lists = new ArrayList<String>();
        for(TbsProblem tbs : problems) {
            lists.add(tbs.getId());
        }
        Thread.sleep(500);
        indexed += lists.size();
        System.out.println("   committed, exiting.");
        System.out.println(" INDEXED $$$$"+indexed);
        callBack.returnResult(problems);
        return null;
    }

      public CallBack getCallBack() {
        return callBack;
    }

    public void setCallBack(CallBack callBack) {
        this.callBack = callBack;
    }
}

Best answer

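Is fetched declared outside of each callable, and are you incrementing it from multiple threads? If so, that's the problem: incrementing an integer is not thread-safe. In that case, replace fetched with an AtomicInteger, or increment it inside a synchronized block.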
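A minimal sketch of both fixes, assuming each task's work can be reduced to "add 50 items". The class name `SafeCounters`, the placeholder item strings and the task count 1001 (what the question's `while(ctr <= 100000)` loop with `ctr +=100` submits) are illustrative, not the asker's code. It also wraps the shared result list, because calling `ArrayList.addAll` from several threads has the same lost-update problem as `fetched` (elements can go missing, or an exception can be thrown). Finally, it calls `shutdown()` before waiting: `isTerminated()` is never true until the pool has been shut down, so the question's `while(!es.isTerminated()) {}` loop would spin forever.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SafeCounters {
    // Option 1: AtomicInteger turns read-add-write into one atomic operation.
    static final AtomicInteger fetched = new AtomicInteger();

    // Option 2: a plain int guarded by a lock; every access must hold LOCK.
    static final Object LOCK = new Object();
    static int indexed = 0;

    // The shared result list is written by many threads, so make it thread-safe too.
    static final List<String> results = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(70);
        int tasks = 1001; // same number of tasks as the question's submit loop
        for (int t = 0; t < tasks; t++) {
            es.submit(() -> {
                List<String> batch = new ArrayList<>();
                for (int i = 0; i < 50; i++) {
                    batch.add("item"); // hypothetical placeholder for a fetched record
                }
                fetched.addAndGet(batch.size());
                synchronized (LOCK) {
                    indexed += batch.size();
                }
                results.addAll(batch);
            });
        }
        // shutdown() first: the pool cannot report termination before this call.
        es.shutdown();
        if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        int indexedTotal;
        synchronized (LOCK) {
            indexedTotal = indexed;
        }
        // prints fetched=50050 indexed=50050 results=50050
        System.out.println("fetched=" + fetched.get() + " indexed=" + indexedTotal
                + " results=" + results.size());
    }
}
```

AtomicInteger is the lighter choice for a single counter; a synchronized block is handier when several fields have to change together under the same lock.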

Why is incrementing an integer from multiple threads a problem? Each thread does the following:

STEP 1: read current value of fetched
STEP 2: calculate current value + problems.size()
STEP 3: assign new value to fetched

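Imagine each thread adds 10 to fetched, starting from 0. Thread (1) completes steps 1 and 2: it reads 0 and computes the new value of fetched as 10. Then threads (2) through (50) complete steps 1, 2 and 3, so fetched now has the value 490. Finally, thread (1) completes step 3 and assigns fetched the value 10 again, wiping out the other 49 updates.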
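The three steps can also be made safe without a lock. `AtomicInteger.compareAndSet(expect, update)` performs step 3 only if the value still equals what was read in step 1; otherwise the thread re-reads and retries, so thread (1) can no longer overwrite the other threads' work. `addAndGet` gives the same guarantee in a single call; the explicit loop below just makes the steps visible. The class name `CasIncrement` and the "50 threads each adding 10" setup mirror the scenario above and are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CasIncrement {
    static final AtomicInteger fetched = new AtomicInteger();

    // STEP 1-3 from the answer, but STEP 3 only succeeds if no other
    // thread changed fetched since STEP 1; otherwise we retry.
    static int addSafely(int delta) {
        while (true) {
            int current = fetched.get();                 // STEP 1: read current value
            int next = current + delta;                  // STEP 2: compute current + delta
            if (fetched.compareAndSet(current, next)) {  // STEP 3: write only if unchanged
                return next;
            }
            // Another thread updated fetched in between: loop and re-read.
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(50);
        for (int t = 0; t < 50; t++) {
            es.submit(() -> addSafely(10));
        }
        es.shutdown();
        if (!es.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("tasks did not finish in time");
        }
        // 50 threads x 10 = 500; no update is lost
        System.out.println("fetched = " + fetched.get());
    }
}
```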

A similar question about Java thread pools on Stack Overflow: https://stackoverflow.com/questions/17476171/
