java - Java 中 ConcurrentHashMap 的多线程

标签 java multithreading

我做了两个线程,一个是获取数据,另一个是保存数据。 我的问题是在存储从Thread1读取的数据的过程中没有处理它。

我想提取 1,000,000 个元素并将它们创建为文件。元素大小太大,所以我将元素大小除以 100,000。然后,循环将运行 10 次。一个线程从其他服务器读取100,000个数据。另一个线程从第一个线程获取数据并将其写入文件。

我的原始场景如下:

第一个线程读取总的键、值大小。 它将是100,000 ~ 1,000,000。我假设我会处理 1,000,000 个数据。然后 Count 设置为 1,000,000。第一个线程除以 100,000 并从服务器读取数据除以 100,000。然后,第一个线程调用 setData(Key,Value map)。 它将循环 10 次。

第二个线程将循环 10 次。首先,通过调用 getMap() 方法获取数据。它调用 writeSeq(hashmap) 方法。它将数据写入写入器流。它还没有冲洗。这里有一个问题。通过调用getMap()成功获取数据大小。但是,writeSeq 方法无法处理所有大小的值。当我得到 100,000 的大小时,它会作为随机处理。它将是 100, 1500, 0, 8203 ...

第一个主题如下:

public void run() {
        getValueCount(); //initialize value.

        while (this.jobFlag) {
            getSortedMap(this.count); //count starts the number of all elements size.
//For example, Total size is 1,000,000. Then count will sets a 1,000,000 and it is decreased as 100,000.
// Also setMap() is called in this method.
            if (!jobFlag) //If all processing is done, jobFlag is set as false.
                break;
        }

        resetValue();
    }

第二个线程如下:

public void run() {
        setWriter(); //Writer Stream creates;

        double count  = 10; //the number of loop. 

        ConcurrentHashMap<String, String> hash = new ConcurrentHashMap<String,String>();

        for (int i = 0; i <= count - 1; i++) {
            hash = share.getMap();
            writeSeq(hash);
        }

        closeWriter(); //close Writer stream
    }

这是共享源:

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

public class ShareData {

    ConcurrentHashMap<String, String> map;

    public synchronized ConcurrentHashMap<String, String> getMap(){
        if (this.map == null) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        ConcurrentHashMap<String, String> hashmap = map;

        this.map = null;

        return hashmap;
    }

    public synchronized void setMap(ConcurrentHashMap<String, String> KV) {
        if (this.map != null) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        this.map = KV;
        notify();
    }

}

之后,保存数据的第二个线程启动。 KV 的大小很好,但处理 foreach 时不会处理所有值。此外,每次创建文件时,大小都不同。是同步的问题吗?

public synchronized void writeSeq(ConcurrentHashMap<String, String> KV) {

        AtomicInteger a = new AtomicInteger(0);
        System.out.println(KV.size()); //ex) 65300
        redisKV.entrySet().parallelStream().forEach(
                entry -> { 
                    try {
                        a.incrementAndGet();
                        writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                });
        System.out.println(a.get()); //ex) 1300
        i = 0;
        notify();

    }

最佳答案

The size of KV is fine, but all values are not processed when foreach is processed. Also, each time i create a file, the size is different. Is it problem of synchronized?

不清楚。我可以看到一个小问题,但它不太可能导致您描述的问题。

  • if (map == null) wait(); 代码应该是 while 循环。

  • if (map != null) wait(); 代码应该是 while 循环。

问题是,如果一个线程收到虚假通知,它可能会以错误的状态继续执行map。您需要重试测试。 (如果您阅读 Object 的 javadoc,您将看到一个正确实现条件变量的示例。)

除此之外,您的问题的根本原因似乎并不在您向我们展示的代码中。

<小时/>

但是,如果我猜测的话,我的猜测是一个线程正在添加或删除 ConcurrentHashMap 中的条目,而第二个线程正在处理它1 。您向我们展示的 getMap/setMap 方法必须正确使用(即在适当的点使用适当的参数调用)以避免两个线程互相干扰。您还没有向我们展示该代码。

所以,如果我的猜测是正确的,那么你的问题是逻辑错误而不是低级同步问题。但如果您需要更好的答案,您将需要编写并发布正确的 MCVE。

<小时/>

1 - ConcurrentHashMap 的迭代器是弱一致。这意味着,如果您在迭代时更新 map ,您可能会错过迭代中的条目,或者可能多次看到它们。

关于java - Java 中 ConcurrentHashMap 的多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52599478/

相关文章:

Java 服务器 - 如何从 InputStreamReader 获取输入并将其转换为字符串?

java - java中通过传递参数实现多线程

java - 第二个 AsyncTask 没有执行

java - 方法不存在 (MainActivity.java)

c# - 异步请求上的共享 HttpClient block

Java 线程和 POSIX 线程,用户级还是内核级?

java - 线程 "Thread 2"在 JavaFX 应用程序中做什么?

java - 如何在 netbeans 项目的文件夹中添加库

java - 如何通过 RestEASY 将带有元音符号 (ü, ö, ä) 的 JSON 发送到我的服务器?

java - 使用 SOAP 使用 Sharepoint Web 服务