Java ExecutorService - 扩展

标签 java multithreading executorservice threadpoolexecutor forkjoinpool

我正在尝试使用ExecutorService及其函数invokeAll在Java中编写程序。我的问题是:invokeAll 函数是否同时解决这些任务?我的意思是,如果我有两个处理器,那么同时会有两个 worker ?因为我无法使其正确缩放。如果我给出 newFixedThreadPool(2) 或 1,则需要相同的时间来完成该问题。

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
    tasks.add(new Map(ps, keyWords));
}
list = executor.invokeAll(tasks);

Map是一个实现了Callable的类,wp是一个Partial Solutions的 vector ,这个类保存了不同时间的一些信息。

为什么它不能扩展?可能是什么问题?

这是 PartialSolution 的代码:

import java.util.HashMap;
import java.util.Vector;

public class PartialSolution 
{
    public String fileName;//the name of a file
    public int b, e;//the index of begin and end of the fragment from the file
    public String info;//the fragment
    public HashMap<String, Word> hm;//here i retain the informations
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce

    public PartialSolution(String name, int b, int e, String i, boolean ok)
    {
        this.fileName = name;
        this.b = b;
        this.e = e;
        this.info = i;
        hm = new HashMap<String, Word>();
        if(ok == true)
        {
            hmt = new HashMap<String, Vector<Word>>();
        }
        else
        {
             hmt = null;
        }    
    }
}

这是 map 的代码:

public class Map implements Callable<PartialSolution>
{
    private PartialSolution ps;
    private Vector<String> keyWords;

    public Map(PartialSolution p, Vector<String> kw)
    {
        this.ps = p;
        this.keyWords = kw;
    }

    @Override
    public PartialSolution call() throws Exception 
    {
        String[] st = this.ps.info.split("\\n");
        for(int j = 0 ; j < st.length ; j++)
        {
            for(int i = 0 ; i < keyWords.size() ; i++)
            {
                if(keyWords.elementAt(i).charAt(0) != '\'')
                {
                    int k = 0;
                    int index = 0;
                    int count = 0;

                    while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1)
                    {
                        k = index + keyWords.elementAt(i).length();
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                } 
                else
                {
                    String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1);
                    StringBuffer sb = new StringBuffer(regex);
                    regex = sb.toString();
                    Pattern pt = Pattern.compile(regex);
                    Matcher m = pt.matcher(st[j]);
                    int count = 0;
                    while(m.find())
                    {
                        count++;
                    }
                    if(count != 0)
                    {
                        Word wr = this.ps.hm.get(keyWords.elementAt(i));
                        if(wr != null)
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = wr.nrap + count;
                            nw.lines = wr.lines;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                        else
                        {
                            Word nw = new Word(this.ps.fileName);
                            nw.nrap = count;
                            int grep = count;
                            while(grep > 0)
                            {
                                nw.lines.addElement(ps.b + j);
                                grep--;
                            }
                            this.ps.hm.put(keyWords.elementAt(i), nw);
                        }
                    }
                }
            }
        }
        this.ps.info = null;
        return this.ps;
    }
}

因此,在 Map 中,我从片段中取出每一行,并搜索每个表达式的出现次数,并保存行数。处理完所有片段后,在同一个 PartialSolution 中,我将信息保存在 HashMap 中并返回新的 PartialSolution。下一步我将具有相同文件名的 PartialSolution 组合起来,并在 Callable 类 Reduce 中引入它们,它与 Map 相同,不同之处在于它进行其他操作,但也返回一个 PartialSolution。

这是运行 map task 的代码:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>();
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>();
for(PartialSolution ps : wp)
{
   tasks.add(new Map(ps, keyWords));
}    
list = executor.invokeAll(tasks);

在任务中,我创建 Map 类型的任务,并在列表中获取它们。我不知道如何读取 JVM 线程转储。我希望我给你的信息足够好。如果有帮助的话,我在 NetBeans 7.0.1 中工作。

谢谢你, 亚历克斯

最佳答案

What i want to know is if the method invokeAll, if i created the ExcutorService with 10 threads, will solve 10 tasks at the same time or will solve one at a time?

如果您向具有 10 个线程的 ExecutorService 提交 10 个任务,它将同时运行它们。它们是否能够完全并行且彼此独立地进行取决于它们在做什么。但他们每个人都有自己的线程。

And another question, if i say list.get(i).get() this will return the PartialSolution after it was solved?

是的,它将阻塞直到计算完成(如果尚未完成)并返回结果。

I really don't understand why doesn't the time improves if i use 2 threads instead of 1.

我们需要查看更多代码。他们是否同步某些共享数据?这些任务需要多长时间?如果它们很短,您可能不会注意到任何差异。如果它们花费更长的时间,请查看 JVM 线程转储以验证它们是否全部都在运行。

关于Java ExecutorService - 扩展,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8307423/

相关文章:

java - 如何修改数组中的后续点

java - 定义 ComboBoxTableCell 和 ComboBox 节点 FXML

java - Java Future 多线程如何返回结果?

java - 扩展线程然后对任务进行排队的 ExecutorService

java - 为什么我的实例被垃圾收集

java - 如何从其他应用程序获取editText中的数据?

java - 从多个文件读取数据并应用业务逻辑

java - 如何以线程安全的方式在 JAX-RS 资源中使用 JerseyClient

java - future 超时是否会终止线程执行

java - 依次执行一个线程