c++ - 将数据用于互斥锁和等待时出现死锁

标签 c++ concurrency mutex condition-variable

我需要一些有关并发 C++ 编程的帮助。 我有一个名称文件,名为 "names.txt",格式如下:

0 James
1 Sara
2 Isaac

我还有另一个名为 "op.txt" 的文件,其中包含对名称文件的一些操作,格式如下:

0 1 + // this means add Sara to James and store it in 0 position
1 2 $ // this means swap values in position 1 and position 2

和一个包含操作输出的文件 "output.txt",格式如下:

0 JamesSara
1 Isaac
2 Sara

问题是创建一个线程来读取 names.txtop.txt 并存储它们。接下来创建一些可变线程来并发操作,最后在一个线程中执行output.txt

这是我针对这个问题的代码,当并发线程数为 大于 2。但是 1 和 2 线程的输出不正确。 我在这段代码中遗漏了什么?

#include <fstream>
#include <iostream>
#include <vector>
#include <sstream>
#include <cstdlib>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <deque>

using namespace std;

std::mutex _opMutex;
std::condition_variable _initCondition;
std::condition_variable _operationCondition;

int _counter = 0;
int _initCounter = 0;
int _doOperationCounter = 0;

struct OperationStruct
{
    int firstOperand;
    int secondOperand;
    char cOperator;
};

const int THREADS = 5;

std::deque<std::pair<int, string> > _nameVector;
std::deque<OperationStruct> _opStructVec;

void initNamesAndOperations()
{
    ifstream infile;

    std::pair<int, string> namePair;

    infile.open("names.txt");
    if (!infile)
    {
        cout << "Unable to open file";
        exit(-1);
    }

    int id;
    string value;

    while (infile >> id >> value)
    {
        namePair.first = id;
        namePair.second = value;
        _nameVector.push_back(namePair);
    }
    infile.close();

    infile.open("op.txt");

    if (!infile)
    {
        cout << "Unable to open file";
        exit(-1);
    }

    int firstOperand;
    int secondOperand;
    char cOperator;

    while (infile >> firstOperand >> secondOperand >> cOperator)
    {
        OperationStruct opSt;
        opSt.firstOperand = firstOperand;
        opSt.secondOperand = secondOperand;
        opSt.cOperator = cOperator;
        _opStructVec.push_back(opSt);
        ++_initCounter;
    }
    infile.close();

    return;
}

void doOperationMath(int firstIndex, string firstValue, string secondValue, char cOp)
{
    //basic mathematics
    switch (cOp)
    {
    case '+':
    {
                for (int i = 0; i < _nameVector.size(); ++i)
                {
                    std::pair<int, string> acc = _nameVector[i];
                    if (acc.first == firstIndex)
                    {
                        acc.second = firstValue + secondValue;
                        _nameVector[i].second = acc.second;
                    }
                }
    }
    break;

    default:
        break;
    }

    ++_doOperationCounter;
}

void doOperationSwap(int firstIndex, int secondIndex, string firstValue, string secondValue)
{
    //swap
    for (int i = 0; i < _nameVector.size(); ++i)
    {
        if (_nameVector[i].first == firstIndex)
            _nameVector[i].second = secondValue;

        if (_nameVector[i].first == secondIndex)
            _nameVector[i].second = firstValue;
    }
    ++_doOperationCounter;
}

void doOperations()
{
    while (_doOperationCounter < _initCounter)
    {
        std::unique_lock<mutex> locker(_opMutex);
        _initCondition.wait(locker, [](){return !_opStructVec.empty(); });
        OperationStruct opSt = _opStructVec.front();
        _opStructVec.pop_front();
        locker.unlock();
        _operationCondition.notify_one();
        int firstId = opSt.firstOperand;
        int secondId = opSt.secondOperand;
        char cOp = opSt.cOperator;

        string firstValue = "";
        string secondValue = "";

        for (int j = 0; j < _nameVector.size(); ++j)
        {
            std::pair<int, string> acc = _nameVector[j];
            if (firstId == acc.first)
                firstValue = acc.second;

            if (secondId == acc.first)
                secondValue = acc.second;
        }

        if (cOp == '$')
        {
            doOperationSwap(firstId, secondId, firstValue, secondValue);
        }
        else
        {
            doOperationMath(firstId, firstValue, secondValue, cOp);
        }

    }

    return;
}

void doOutputFile()
{
    ofstream outfile;

    outfile.open("sampleOutput.txt", std::ios::out | std::ios::app);
    if (!outfile)
    {
        cout << "Unable to open the file";
        exit(-1);
    }

    while (_counter < _initCounter)
    {
        std::unique_lock<mutex> locker(_opMutex);
        _operationCondition.wait(locker, [](){return !_nameVector.empty(); });
        auto accPair = _nameVector.front();
        _nameVector.pop_front();
        locker.unlock();

        outfile << accPair.first << " " << accPair.second << endl;
        ++_counter;
    }

    return;
}

int main()
{
    thread th1(initNamesAndOperations);

    std::vector<thread> operationalThreads;
    for (int i = 0; i < THREADS; ++i)
    {
        operationalThreads.push_back(thread(doOperations));
    }

    thread th3(doOutputFile);

    th1.join();

    for (auto& opthread : operationalThreads)
        opthread.join();

    th3.join();

    return 0;
}

最佳答案

如果从多个线程修改变量,您可能必须使用一些同步来确保读取正确的值。最简单的方法可能是对变量使用 std::atomic 以确保操作顺序正确。

此外,您的代码中没有任何内容可以确保您的 doOperations 线程在您读取整个文件之前不会完成。

显然,您需要先读取整个数据,或者有办法等待某些数据可用(或到达数据末尾)。如果读取初始数据很快但处理速度很慢,那么更简单的解决方案是在开始处理线程之前读取数据。

可能发生的情况是,如果您创建了很多线程,那么在您创建最后一个线程时,initNamesAndOperations 将读取整个文件。

我强烈建议您购买并阅读 Anthony WilliamsC++ Concurrency in Action。通过阅读这本书,您将对现代 C++ 多线程有一个很好的理解,它将对您编写正确的代码有很大帮助。

关于c++ - 将数据用于互斥锁和等待时出现死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47237431/

相关文章:

multithreading - 线程和进程同步的信号量和互斥量

c++ - 可能的死锁情况 : interrupt and mutex

c++ - 设置 3d 空间中对象的方向

c++ - 删除从析构函数调用的抛出析构函数

c++ - 类型未推断为右值引用 : why not?

c# - 具有基于 XML 的存储的 WCF 服务。并发问题?

c++ - 网络打印机的 DeviceCapabilities

java - 同时运行两个线程

java - 并发还是顺序?

rest - 有没有什么方法可以在 Chef 节点中获得互斥?