java - 同时执行两个方法

标签 java multithreading

我正在开发多年来的第一个多线程应用程序。我遇到的问题是我需要同时执行两个方法。这是我的引擎类:

public class ThreadPoolEngine {

    // create BlockingQueue to put fund transfer objects
    private BlockingQueue<GlobalSearchRequest> searchQueue;

    public ThreadPoolExecutor executor;

    private HashMap<String, GlobalSearchProcessorCallable> callableMap;

    private ArrayList<Future<Integer>> futurList;

    Logger logger = Logger.getLogger(ThreadPoolEngine.class);

    private Integer gthreadCount;
    private Integer gjobPerThread;

    public ThreadPoolEngine(Integer threadCount, Integer jobPerThread) {
        gthreadCount = threadCount;
        gjobPerThread = jobPerThread;
        // create a thread pool with the entered no of threads
        executor = new HammerThreadPoolExecutor(threadCount, threadCount, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

        searchQueue = new ArrayBlockingQueue<GlobalSearchRequest>(jobPerThread);

        callableMap = new HashMap<String, GlobalSearchProcessorCallable>();

        // create list to store reference to Future objects
        futurList = new ArrayList<Future<Integer>>();
    }

    public void createAndSubmitTasks() {
        // create Callables
        for (int i = 0; i < gthreadCount; i++) {

            GlobalSearchProcessorCallable callable1 = new GlobalSearchProcessorCallable(
                    "SearchProcessor_" + i, searchQueue);
            callableMap.put(callable1.getThreadName(), callable1);

            // submit callable tasks
            Future<Integer> future;
            future = executor.submit(callable1);
            futurList.add(future);
        }
    }

    public void populateSearchQueue() throws InterruptedException {
        // put orderVO objects in BlockingQueue
        KeywordFactory key = KeywordFactory.getInstance();

        for (int i = 0; i < gjobPerThread*gthreadCount; i++) {
            // this method will put SearchRequest object in the order queue
            try {
                searchQueue.put(new GlobalSearchRequest(key.getRandomPhrase(3)));
            } catch (KeywordNoDataFileException e) {
                e.printStackTrace();
            }
        }
    }

    public void printProcessorStatus() throws InterruptedException {
        // print processor status until all orders are processed
        while (!searchQueue.isEmpty()) {
            for (Map.Entry<String, GlobalSearchProcessorCallable> e : callableMap
                    .entrySet()) {
                logger.debug(e.getKey() + " processed order count: "
                        + e.getValue().getProcessedCount());
            }
            Thread.sleep(1000);
        }
    }

    public void shutDown(boolean forceShutdown) {
        if (!forceShutdown) {
            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdown();
            logger.debug("Executor shutdown status " + executor.isShutdown());
            logger.debug("Executor terninated status "
                    + executor.isTerminated());

            // Mark threads to return threads gracefully.
            for (Map.Entry<String, GlobalSearchProcessorCallable> orderProcessor : callableMap
                    .entrySet()) {
                orderProcessor.getValue().setRunning(false);
            }
        } else {

            for (Future<Integer> f : futurList) {
                f.cancel(true);
            }

            // shutdown() method will mark the thread pool shutdown to true
            executor.shutdownNow();
        }
    }

    public void printWorkersResult() {
        for (Future<Integer> f : futurList) {
            try {
                Integer result = f.get(1000, TimeUnit.MILLISECONDS);
                logger.debug(f + " result. Processed orders " + result);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            } catch (ExecutionException e) {
                logger.error(e.getCause().getMessage(), e);
            } catch (TimeoutException e) {
                logger.error(e.getMessage(), e);
            } catch (CancellationException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

好的,我有一个主类来实例化这个类,并调用该类的两个方法,即 populateSearchQueue 和 createAndSubmitTasks 来运行我的工作类并处理搜索队列中的项目。

问题 populateSearchQueue 方法可能需要很长时间才能构建(我将一次用十亿个查询来锤击系统),并且可能会占用大量内存。 java中有没有一种方法可以让我的主类同时调用populateSearchQueue和createAndSubmitTasks,以便工作线程可以开始在队列上工作,同时仍然由populateSearchQueue方法构建?

最佳答案

我确实解决了。我再次阅读我的代码,意识到创建线程池需要很少的时间。因此,调用 createAndSubmitTasks 创建线程池,并为每个线程池分配一个等待执行某些操作的工作线程类。当该方法完成后,我现在有 1000 个线程池坐在那里什么都不做。然后,当我调用 populateSearchQueue 时,那些空闲了几毫秒才转到下一个方法的工作线程现在开始从队列中抓取作业,我得到了我想要的结果。将内容放入队列的方法正在处理,同时工作线程从该队列中获取作业并运行它们。

所以我颠倒了调用方法的顺序。这是一件美丽的事情。

关于java - 同时执行两个方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13885994/

相关文章:

multithreading - 我如何总结使用 Rust 从 1 到 1000000 的并发性?

multithreading - Web-Wokers,并发还是并行?

python - 使用 python 从终端读取字符串

java - 获取 JPAQuery 结果作为页面

java - Mysql 复杂对象 Hibernate 批量插入

java - 我需要知道控制流是什么以及这段代码到底发生了什么?

java - 为什么我的Java程序中突然出现3个线程?

c++ - 从多个线程使用 stdlib 的 rand()

java - 为什么 Java 产生的时间比卡萨布兰卡实时时间晚一小时?

java - 将条件列表传递给 JOOQ 中的 WHERE 子句