c++ - 问题生产者消费者中的竞争条件

标签 c++ multithreading

我正在研究消费者生产者问题,这个例子有四个函数:生产者调用 produce 和消费者调用 consume。这里的问题是当我像 (valgrind --tool=helgrind ./a.out) 那样调用 helgrind 时给我一个竞争条件。我认为错误出在 for 循环中的生产者和消费者函数中。

#include <iostream>
#include <sstream>
#include <vector>
#include <stack>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <chrono>
using namespace std;

// print function for "thread safe" printing using a stringstream
void print(ostream& s) { cout << s.rdbuf(); cout.flush(); s.clear(); }

const int num_producers = 5;
const int num_consumers = 10;
const int producer_delay_to_produce = 10;   
const int consumer_delay_to_consume = 30;  
const int consumer_max_wait_time = 200;     
const int max_production = 10;              
const int max_products = 10;                

atomic<int> num_producers_working(0);       
stack<int> products;                        
mutex xmutex;
mutex nuevo_mutx;                         

condition_variable is_not_full;             
condition_variable is_not_empty;            

void produce(int producer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;

        is_not_full.wait(lock, [] { return products.size() != max_products; });
        product = products.size();
        products.push(product);

        print(stringstream() << "Producer " << producer_id << " produced " << product << "\n");
        is_not_empty.notify_all();
}

void consume(int consumer_id)
{
        unique_lock<mutex> lock(xmutex);
        int product;

        if(is_not_empty.wait_for(lock, chrono::milliseconds(consumer_max_wait_time),
                [] { return products.size() > 0; }))
        {
                product = products.top();
                products.pop();

                print(stringstream() << "Consumer " << consumer_id << " consumed " << product << "\n");
                is_not_full.notify_all();
        }
}
void producer(int id)
{
        ++num_producers_working;
        for(int i = 0; i < max_production; ++i)
        {

                produce(id);
                this_thread::sleep_for(chrono::milliseconds(producer_delay_to_produce));
        }

        print(stringstream() << "Producer " << id << " has exited\n");
        --num_producers_working;
}

void consumer(int id)
{
        // Wait until there is any producer working

        while(num_producers_working == 0) this_thread::yield();

        while(num_producers_working != 0 || products.size() > 0)
        {
                consume(id);
                this_thread::sleep_for(chrono::milliseconds(consumer_delay_to_consume));
        }

        print(stringstream() << "Consumer " << id << " has exited\n");
}
int main()
{
        vector<thread> producers_and_consumers;

        // Create producers
        for(int i = 0; i < num_producers; ++i)
                producers_and_consumers.push_back(thread(producer, i));

        // Create consumers
        for(int i = 0; i < num_consumers; ++i)
                producers_and_consumers.push_back(thread(consumer, i));

        // Wait for consumers and producers to finish
        for(auto& t : producers_and_consumers)
                t.join();
}

helgrind 日志:

==7004== Helgrind, a thread error detector
==7004== Copyright (C) 2007-2017, and GNU GPL'd, by OpenWorks LLP et al.
==7004== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==7004== Command: ./a.out
==7004== Producer 0 produced 0 Producer 1 produced 1 Producer 2 produced 2 Producer 3 produced 3 Producer 4 produced 4 Consumer 0 consumed 4 Producer 2 produced 4 Producer 1 produced 5 Producer 0 produced 6 Producer 3 produced 7 Producer 4 produced 8 Consumer 1 consumed 8 Consumer 2 consumed 7 Consumer 3 consumed 6 Producer 1 produced 6 Producer 2 produced 7 Producer 4 produced 8 Producer 3 produced 9 Consumer 4 consumed 9 Producer 0 produced 9 Consumer 5 consumed 9 Producer 1 produced 9 Consumer 6 consumed 9 Producer 2 produced 9 Consumer 7 consumed 9 Producer 4 produced 9 Consumer 8 consumed 9 Consumer 0 consumed 8 Producer 1 produced 8 Producer 3 produced 9 Consumer 1 consumed 9 Producer 0 produced 9 Consumer 2 consumed 9 Producer 2 produced 9 Consumer 9 consumed 9 Consumer 3 consumed 8 Producer 4 produced 8 Producer 1 produced 9 Consumer 4 consumed 9 Producer 3 produced 9 Consumer 5 consumed 9 Producer 4 produced 9 Consumer 6 consumed 9 Producer 2 produced 9 Consumer 7 consumed 9 Producer 0 produced 9 Consumer 0 consumed 9 Producer 4 produced 9 Consumer 8 consumed 9 Producer 3 produced 9 Consumer 1 consumed 9 Producer 1 produced 9 Consumer 2 consumed 9 Producer 2 produced 9 Consumer 3 consumed 9 Producer 0 produced 9 Consumer 9 consumed 9 Producer 4 produced 9 Consumer 4 consumed 9 Producer 3 produced 9 Consumer 5 consumed 9 Producer 0 produced 9 Consumer 6 consumed 9 Producer 1 produced 9 Consumer 7 consumed 9 Producer 3 produced 9 Consumer 0 consumed 9 Consumer 8 consumed 8 Producer 2 produced 8 Producer 4 produced 9 Consumer 1 consumed 9 Consumer 2 consumed 8 Producer 0 produced 8 Producer 1 produced 9 Consumer 3 consumed 9 Producer 3 produced 9 Consumer 9 consumed 9 Producer 2 produced 9 Consumer 4 consumed 9 Producer 1 produced 9 Consumer 5 consumed 9 Producer 0 produced 9 Consumer 6 consumed 9 Producer 4 produced 9 Consumer 7 consumed 9 Producer 2 produced 9
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #3 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109EF1: main (nuevo.cpp:93)
==7004==
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #4 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109EF1: main (nuevo.cpp:93)
==7004==
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #2 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109EF1: main (nuevo.cpp:93)
==7004==
==7004== ----------------------------------------------------------------
==7004==
==7004==  Lock at 0x3121C0 was first observed
==7004==    at 0x4C3403C: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x1097F9: __gthread_mutex_lock(pthread_mutex_t*) (gthr-default.h:748)
==7004==    by 0x10A613: std::mutex::lock() (std_mutex.h:103)
==7004==    by 0x10B138: std::unique_lock<std::mutex>::lock() (std_mutex.h:267)
==7004==    by 0x10A751: std::unique_lock<std::mutex>::unique_lock(std::mutex&) (std_mutex.h:197)
==7004==    by 0x109925: produce(int) (nuevo.cpp:33)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==    by 0x10DC33: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(int), int> > >::_M_run() (thread:186)
==7004==  Address 0x3121c0 is 0 bytes inside data symbol "xmutex"
==7004==
==7004== Possible data race during write of size 1 at 0x5DA3350 by thread #3
==7004== Locks held: none
==7004==    at 0x4C3C56C: mempcpy (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x568D993: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1258)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==    by 0x109CFD: producer(int) (nuevo.cpp:69)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==    by 0x10DC33: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(int), int> > >::_M_run() (thread:186)
==7004==
==7004== This conflicts with a previous write of size 1 by thread #4
==7004== Locks held: 1, at address 0x3121C0
==7004==    at 0x4C3C546: mempcpy (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x568D993: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1258)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==    by 0x1099F0: produce(int) (nuevo.cpp:40)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==  Address 0x5da3350 is 0 bytes inside a block of size 1,024 alloc'd
==7004==    at 0x4C30F2F: malloc (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x568018B: _IO_file_doallocate (filedoalloc.c:101)
==7004==    by 0x5690378: _IO_doallocbuf (genops.c:365)
==7004==    by 0x568F497: _IO_file_overflow@@GLIBC_2.2.5 (fileops.c:759)
==7004==    by 0x568D9EC: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1266)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==    by 0x1099F0: produce(int) (nuevo.cpp:40)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==  Block was alloc'd by thread #2
==7004== Producer 1 has eConsumer 2 consumed 9
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #15 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109F41: main (nuevo.cpp:97)
==7004==
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #9 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109F41: main (nuevo.cpp:97)
==7004==
==7004== ----------------------------------------------------------------
==7004==
==7004==  Lock at 0x3121C0 was first observed
==7004==    at 0x4C3403C: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x1097F9: __gthread_mutex_lock(pthread_mutex_t*) (gthr-default.h:748)
==7004==    by 0x10A613: std::mutex::lock() (std_mutex.h:103)
==7004==    by 0x10B138: std::unique_lock<std::mutex>::lock() (std_mutex.h:267)
==7004==    by 0x10A751: std::unique_lock<std::mutex>::unique_lock(std::mutex&) (std_mutex.h:197)
==7004==    by 0x109925: produce(int) (nuevo.cpp:33)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==    by 0x10DC33: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(int), int> > >::_M_run() (thread:186)
==7004==  Address 0x3121c0 is 0 bytes inside data symbol "xmutex"
==7004==
==7004== Possible data race during write of size 1 at 0x5DA335F by thread #15
==7004== Locks held: 1, at address 0x3121C0
==7004==    at 0x4C3C546: mempcpy (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x568D993: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1258)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==    by 0x109BA8: consume(int) (nuevo.cpp:55)
==7004==    by 0x109DD1: consumer(int) (nuevo.cpp:81)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==
==7004== This conflicts with a previous read of size 1 by thread #9
==7004== Locks held: 1, at address 0x3121C0
==7004==    at 0x5712187: write (write.c:27)
==7004==    by 0x568D1BC: _IO_file_write@@GLIBC_2.2.5 (fileops.c:1203)
==7004==    by 0x568EF50: new_do_write (fileops.c:457)
==7004==    by 0x568EF50: _IO_do_write@@GLIBC_2.2.5 (fileops.c:433)
==7004==    by 0x568D9EC: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1266)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==  Address 0x5da335f is 15 bytes inside a block of size 1,024 alloc'd
==7004==    at 0x4C30F2F: malloc (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x568018B: _IO_file_doallocate (filedoalloc.c:101)
==7004==    by 0x5690378: _IO_doallocbuf (genops.c:365)
==7004==    by 0x568F497: _IO_file_overflow@@GLIBC_2.2.5 (fileops.c:759)
==7004==    by 0x568D9EC: _IO_file_xsputn@@GLIBC_2.2.5 (fileops.c:1266)
==7004==    by 0x5681976: fwrite (iofwrite.c:39)
==7004==    by 0x4EEE3B4: long std::__copy_streambufs_eof<char, std::char_traits<char> >(std::basic_streambuf<char, std::char_traits<char> >*, std::basic_streambuf<char, std::char_traits<char> >*, bool&) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4F55827: std::ostream::operator<<(std::basic_streambuf<char, std::char_traits<char> >*) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x109899: print(std::ostream&) (nuevo.cpp:13)
==7004==    by 0x1099F0: produce(int) (nuevo.cpp:40)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==  Block was alloc'd by thread #2
==7004== Consumer 8 consumed 8 Consumer 1 consumed 7 Consumer 0 consumed 6 Producer 0 produced 6 Consumer 3 consumed 6 Producer 2 has exited Producer 4 has exited Consumer 9 consumed 5 xited Producer 3 produced 5 Consumer 4 consumed 5 Producer 0 has exited Consumer 5 consumed 4 Producer 3 has exited
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #13 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109F41: main (nuevo.cpp:97)
==7004==
==7004== ---Thread-Announcement------------------------------------------
==7004==
==7004== Thread #12 was created
==7004==    at 0x572387E: clone (clone.S:71)
==7004==    by 0x53EAEC4: create_thread (createthread.c:100)
==7004==    by 0x53EAEC4: pthread_create@@GLIBC_2.2.5 (pthread_create.c:797)
==7004==    by 0x4C36A27: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x4EFF834: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x10AEB0: std::thread::thread<void (&)(int), int&>(void (&)(int), int&) (thread:126)
==7004==    by 0x109F41: main (nuevo.cpp:97)
==7004==
==7004== ----------------------------------------------------------------
==7004==
==7004==  Lock at 0x3121C0 was first observed
==7004==    at 0x4C3403C: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x1097F9: __gthread_mutex_lock(pthread_mutex_t*) (gthr-default.h:748)
==7004==    by 0x10A613: std::mutex::lock() (std_mutex.h:103)
==7004==    by 0x10B138: std::unique_lock<std::mutex>::lock() (std_mutex.h:267)
==7004==    by 0x10A751: std::unique_lock<std::mutex>::unique_lock(std::mutex&) (std_mutex.h:197)
==7004==    by 0x109925: produce(int) (nuevo.cpp:33)
==7004==    by 0x109C6D: producer(int) (nuevo.cpp:65)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==    by 0x10DC33: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(int), int> > >::_M_run() (thread:186)
==7004==  Address 0x3121c0 is 0 bytes inside data symbol "xmutex"
==7004==
==7004== Possible data race during read of size 8 at 0x312190 by thread #13
==7004== Locks held: none
==7004==    at 0x10BAE1: std::_Deque_iterator<int, int&, int*>::difference_type std::operator-<int, int&, int*>(std::_Deque_iterator<int, int&, int*> const&, std::_Deque_iterator<int, int&, int*> const&) (stl_deque.h:356)
==7004==    by 0x10B1B6: std::deque<int, std::allocator<int> >::size() const (stl_deque.h:1272)
==7004==    by 0x10A79D: std::stack<int, std::deque<int, std::allocator<int> > >::size() const (stl_stack.h:191)
==7004==    by 0x109DAF: consumer(int) (nuevo.cpp:79)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==    by 0x10DC33: std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(int), int> > >::_M_run() (thread:186)
==7004==    by 0x4EFF57E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.25)
==7004==    by 0x4C36C26: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==7004==    by 0x53EA6DA: start_thread (pthread_create.c:463)
==7004==
==7004== This conflicts with a previous write of size 8 by thread #12
==7004== Locks held: 1, at address 0x3121C0
==7004==    at 0x10B419: std::deque<int, std::allocator<int> >::pop_back() (stl_deque.h:1607)
==7004==    by 0x10A82D: std::stack<int, std::deque<int, std::allocator<int> > >::pop() (stl_stack.h:261)
==7004==    by 0x109B24: consume(int) (nuevo.cpp:53)
==7004==    by 0x109DD1: consumer(int) (nuevo.cpp:81)
==7004==    by 0x10B73E: void std::__invoke_impl<void, void (*)(int), int>(std::__invoke_other, void (*&&)(int), int&&) (invoke.h:60)
==7004==    by 0x10ADF9: std::__invoke_result<void (*)(int), int>::type std::__invoke<void (*)(int), int>(void (*&&)(int), int&&) (invoke.h:95)
==7004==    by 0x10DCC2: decltype (__invoke((_S_declval<0ul>)(), (_S_declval<1ul>)())) std::thread::_Invoker<std::tuple<void (*)(int), int> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) (thread:234)
==7004==    by 0x10DC63: std::thread::_Invoker<std::tuple<void (*)(int), int> >::operator()() (thread:243)
==7004==  Address 0x312190 is 48 bytes inside data symbol "products"
==7004== Consumer 6 consumed 3 Consumer 7 consumed 2 Consumer 2 consumed 1 Consumer 1 consumed 0 Consumer 0 has exited Consumer 3 has exited Consumer 8 has exited Consumer 9 has exited Consumer 4 has exited Consumer 5 has exited Consumer 7 has exited Consumer 6 has exited Consumer 2 has exited Consumer 1 has exited
==7004==
==7004== For counts of detected and suppressed errors, rerun with: -v
==7004== Use --history-level=approx or =none to gain increased speed, at
==7004== the cost of reduced accuracy of conflicting-access information
==7004== ERROR SUMMARY: 599 errors from 3 contexts (suppressed: 1951 from 58)

最佳答案

我解决了竞争条件的问题,helgrind 不再提示,错误是 while(num_producers_working != 0 || products.size() > 0) 中的方法大小 最简单的解决方案是将大小放入关键部分:

template<typename T>
size_t stack_size(const stack<T> &obj ){
    lock_guard<mutex> lock(xmutex);
    return obj.size();

}

导致:while(num_producers_working != 0 || stack_size(products) > 0)

关于c++ - 问题生产者消费者中的竞争条件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55461063/

相关文章:

c++ - 如何创建图形并使用此代码调用算法

C++ - MFC - 更改 CMenu 的文本

c++ - 从原子内容的意义上说,从匿名管道中读取是原子的吗?

java - JAVA如何同时运行两个进程

java - 如何使用 wait() 和 notify() 正确暂停线程

c++ - 窗口 MFC : adjust child dialog to a tab control display area

c++ - 将指针用于条件 while/for 循环会在编译时出错

c++ - 无法从一种迭代器类型转换为另一种,但两者完全相同

java - 带有 RequestQue 的 while 循环中的 Thread.sleep ()

c# - 在不同 UI 线程上创建的 WPF Access 窗口