java并发写入集合,然后读取 - 结果不一致

标签 java concurrency

我读自 here Set 有几种不同的线程安全选项。在我的应用程序中,我有 10 个线程同时将内容添加到一个集合中(不必设置,但更好)。所有线程完成后,我需要遍历集合。

我读到 ConcurrentSkipListSet 和 Collections.newSetFromMap(new ConcurrentHashMap()) 都具有不一致的批处理操作(addAll、removeAll 等)和迭代器。我的实验也证实了这一点。当我使用ConcurrentSkipListSet时,在所有线程添加后,读取有点随机。我随机得到不同大小的集合。

然后我尝试了 Collections.synchronizedSet(new HashSet<>()),我认为它应该是线程安全的,因为它同时阻止多个写入访问。 然而,它似乎也存在阅读不一致的问题。我仍然在结果集中随机得到不同的大小。

我应该怎样做才能确保读数一致?正如我所说,我不必使用 Set。我可以使用List,或者其他,只要有办法避免重复添加

显示代码有点困难,因为它是一个非常大的包的一部分。但总的来说,它看起来像这样

public class MyRecursiveTask extends RecursiveTask<Integer> {
    private List<String> tasks; 
    protected ConcurrentSkipListSet<String> dictionary;
    public MyRecursiveTask(ConcurrentSkipListSet<String> dictionary,
                           List<String> tasks){
        this.dictionary=dictionary;
        this.tasks=tasks;
    }

    protected Integer compute() {
        if (this.tasks.size() > 100) {
            List<RecursiveFeatureExtractor> subtasks =
                new ArrayList<>();
            subtasks.addAll(createSubtasks());
            int count=0;
            for (MyRecursiveTask subtask : subtasks)
                subtask.fork();
            for (MyRecursiveTask subtask : subtasks)
                count+=subtask.join();
            return count;
        } else {
            int count=0;
            for (File task: tasks) {
                    // code to process task
                 String outcome = [method to do some task]
                 dictionary.add(outcome);
                 count++;
            }
            return count;
        }
    }

    private List<MyRecursiveTask> createSubtasks() {
        List<MyRecursiveTask> subtasks =
            new ArrayList<>();

        int total = tasks.size() / 2;
        List<File> tasks1= new ArrayList<>();
        for (int i = 0; i < total; i++)
            tasks1.add(tasks.get(i));
        MyRecursiveTask subtask1 = new MyRecursiveTask(
            dictionary, tasks1);

        List<File> tasks2= new ArrayList<>();
        for (int i = total; i < tasks.size(); i++)
            tasks2.add(tasks.get(i));
        MyRecursiveTask subtask2 = new MyRecursiveTask(
            dictionary, tasks2);

        subtasks.add(subtask1);
        subtasks.add(subtask2);

        return subtasks;
    }
}

然后是创建此类线程工作人员列表的代码:

....
List<String> allTasks = new ArrayList<String>(100000);
....
//code to fill in "allTasks"
....

ConcurrentSkipListSet<String> dictionary = new ConcurrentSkipListSet<>();
//I also tried "dictionary = Collections.Collections.synchronizedSet(new 
//HashSet<>())" and changed other bits of code accordingly. 
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
MyRecursiveTask mrt = new MyRecursiveTask (dictionary,
            );
int total= forkJoinPool.invoke(mrt);
System.out.println(dictionary.size()); //this value is a bit random. If real     
//size should be 999, when I run the code once i may get 989; second i may 
//get 999; third I may get 990 etc....

谢谢

最佳答案

不看代码,很难判断哪里出了问题。我猜想读取结果的线程运行得太早,而某些线程仍在写入。使用 Thread.join 等待写入者。 Collections.synchronizedSet 当然是线程安全的。

Javadoc 考虑这一点:

It is imperative that the user manually synchronize on the returned set when iterating over it:

   Set s = Collections.synchronizedSet(new HashSet());
       ...   synchronized (s) {
       Iterator i = s.iterator(); // Must be in the synchronized block
       while (i.hasNext())
           foo(i.next());   }   

Failure to follow this advice may result in non-deterministic behavior. The returned set will be serializable if the specified set is serializable.

关于java并发写入集合,然后读取 - 结果不一致,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31928149/

相关文章:

Haskell 作为高并发服务器

c# - 在多个任务中(一次?)编辑哈希集是线程安全的吗?

java - gradle 在集成测试中找不到 lombok 生成的构造函数

java - 测量长时间间隔

java - 如何让 JFrame 关闭外部窗口?

java - 如何从没有 api 的网站提取数据到 Android 应用程序中?

java - 二维数组,如何以编程方式添加新的数组元素? java

c++ - Google 的 WorkStealingDequeue 使用 memory_order_seq_cst 作为完整的内存屏障。有效吗?

java - ehcache diskstore可以安全地并发访问吗?

Java循环任务,日期问题