c++ - 线程安全管道端接

标签 c++ multithreading visual-c++ mfc win32-process

(开始前请注意:虽然我的问题很笼统,但我的代码需要使用遗留的 Visual Studio 2008 MFC 应用程序进行编译,并且必须使用 MFC 或 win32 同步,请避免使用 ie boost 或 c++ 11 回答)

我正在尝试实现一个线程安全管道(具有单个读取器和单个写入器的队列),我执行了以下操作:

template<class T>
class CMultiThreadPipe { 

private:
    HANDLE hSemaphore, hTerminateEvent1, hTerminateEvent2;
    CRITICAL_SECTION listMutex; 
    CList<T*, T*> list;

public:
    CMultiThreadPipe() { 
        InitializeCriticalSection(&listMutex);
        hSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
        hTerminateEvent1 = ::CreateEvent(NULL, TRUE, FALSE, NULL); 
        hTerminateEvent2 = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    // pdata must be allocated with new. The dequeueing thread will delete it
    void Enqueue(T* pdata) { 
        EnterCriticalSection(&listMutex);
        list.AddHead(pdata);
        LeaveCriticalSection(&listMutex);
        ReleaseSemaphore(hSemaphore, 1, NULL);
    }

    // if Dequeue returns null it means the pipe was destroyed and no further queue method calls are legal
    // Dequeue caller is responsible to delete the returned instance
    T* Dequeue()
    {
        HANDLE handles[] = { hTerminateEvent1, hSemaphore };
        DWORD waitRes = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
        if (waitRes==WAIT_OBJECT_0) {
            SetEvent(hTerminateEvent2);
            return NULL; // terminated
        }
        EnterCriticalSection(&listMutex);
        T* elem = list.RemoveTail(); 
        LeaveCriticalSection(&listMutex);
        return elem; // handler must delete item
    }

    void Destroy() {
        SetEvent(hTerminateEvent1);
        WaitForSingleObject(hTerminateEvent2, INFINITE);
        EnterCriticalSection(&listMutex);
        POSITION pos = list.GetHeadPosition(); 
        for (int i = 0; i < list.GetCount(); i++) delete list.GetNext(pos); 
        LeaveCriticalSection(&listMutex);
        DeleteCriticalSection(&listMutex);
        CloseHandle(hSemaphore);
    }

    ~CMultiThreadPipe() { 
        Destroy();
    }
};

代码是这样使用的:

class QueueData {
    public:
        QueueData(int i) : m_data(i) {};
        int m_data;
};

UINT DequeueThreadProc(LPVOID dummy);

CMultiThreadedPipe<QueueData>* pPipe = NULL;

void main() {
    pPipe = new CMultiThreadedPipe<QueueData>();
    start new thread running DequeueThreadProc

    int counter=0;
    for (int counter=0; counter<10; counter++)
    {
        pPipe->Enqueue(new QueueData(counter));
        Sleep(300);
    }
    delete pPipe;
}

UINT DequeueThreadProc(LPVOID ignore)
{
    QueueData* queueData;
    while ((queueData = pPipe->Dequeue()) != NULL) {
        delete queueData;
        Sleep(1000);
    };
    return 0;
}

我遇到的问题是终止,在上面的实现中,当管道被销毁时(总是由入队线程),它正在等待出队线程知道它在删除队列之前已经终止。它必须这样做以防止出列线程在管道被销毁后尝试出列的情况。

如果出队线程不继续调用 dequeue,则第一个线程将卡在析构函数中,如果出队线程在调用出队之间等待很长时间,则第一个线程的析构函数将相应地卡在那里。

我阅读了各种关于它的帖子,没有提到安全销毁。任何帮助表示赞赏!

最佳答案

对于从多个线程访问的安全销毁对象,您需要对其使用引用计数。在将对象指针传递给新线程之前 - 您增加了对对象的引用。当线程不再使用对象时,或者如果创建线程失败,则减少引用计数。当对象的最后一个引用被释放时——你可以安全地调用对象的析构函数。你不需要在这里等待任何线程。

也用于实现这样的队列 - 在 Windows 中存在特殊对象 - 在用户空间中命名为 I/O Completion Ports(在内核空间中称为 KQUEUE)。使用此对象 - 实现将更加高效和简单 - 您无需管理 self 列表(代码中的 CList),同步对其的访问 - 所有这些都将在内核空间中为您完成( PostQueuedCompletionStatus -> KeInsertQueue , GetQueuedCompletionStatus -> KeRemoveQueue )。您只需要创建 iocp,(kqueue)对象。

class CMultiThreadPipe {

public:

    class __declspec(novtable) QueueData {
    public:

        virtual void ProcessItem() = 0;

        virtual ~QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }

        QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }
    };

private:
    HANDLE _hIOCP;
    LONG _dwRef;
    ULONG _nThreads;

    void DequeueThreadProc()
    {
        ULONG NumberOfBytesTransferred;
        QueueData* pData;
        OVERLAPPED* pOverlapped;

        while (GetQueuedCompletionStatus(_hIOCP, 
            &NumberOfBytesTransferred, 
            (ULONG_PTR*)&pData, 
            &pOverlapped, INFINITE))
        {
            if (pData)
            {
                pData->ProcessItem();
            }
            else
            {
                break;
            }
        }

        Release();
    }

    __declspec(noreturn) static DWORD CALLBACK _DequeueThreadProc(PVOID pThis)
    {
        reinterpret_cast<CMultiThreadPipe*>(pThis)->DequeueThreadProc();
        FreeLibraryAndExitThread((HMODULE)&__ImageBase, 0);
    }

    ~CMultiThreadPipe()
    {
        if (_hIOCP)
        {
            CloseHandle(_hIOCP);
        }
    }

public:

    CMultiThreadPipe() : _dwRef(1), _hIOCP(0)
    {
    }

    void AddRef()
    {
        InterlockedIncrement(&_dwRef);
    }

    void Release()
    {
        if (!InterlockedDecrement(&_dwRef))
        {
            delete this;
        }
    }

    ULONG Create(DWORD NumberOfDequeueThreads)
    {
        if (_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, NumberOfDequeueThreads))
        {
            ULONG n = 0;
            do 
            {
                HMODULE hModule;
                if (GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS, (PCWSTR)_DequeueThreadProc, &hModule))
                {
                    AddRef();

                    if (HANDLE hThread = CreateThread(0, 0, _DequeueThreadProc, this, 0, 0))
                    {
                        CloseHandle(hThread);
                        n++;
                    }
                    else
                    {
                        Release();
                        FreeLibrary(hModule);
                    }
                }

            } while (--NumberOfDequeueThreads);

            _nThreads = n;

            return n ? NOERROR : ERROR_GEN_FAILURE;
        }

        return GetLastError();
    }

    ULONG Enqueue(QueueData* pData)
    {
        return PostQueuedCompletionStatus(_hIOCP, 0, (ULONG_PTR)pData, 0) ? NOERROR : GetLastError();
    }

    void Destroy()
    {
        if (ULONG n = _nThreads)
        {
            do 
            {
                PostQueuedCompletionStatus(_hIOCP, 0, 0, 0);
            } while (--n);
        }
    }
};

和用法:

class QueueData : public CMultiThreadPipe::QueueData
{
    int m_data; 

    virtual void ProcessItem()
    {
        DbgPrint("%x: %s<%p>(%u)\n", GetCurrentThreadId(), __FUNCTION__, this, m_data);
        delete this;
    }
public:
    QueueData(int i) : m_data(i) {};
};

void testQueue()
{
    if (CMultiThreadPipe* pPipe = new CMultiThreadPipe)
    {
        if (pPipe->Create(8) == NOERROR)
        {
            int n = 64;

            do 
            {
                if (QueueData* pData = new QueueData(n))
                {
                    if (pPipe->Enqueue(pData))
                    {
                        delete pData;
                    }
                }
            } while (--n);

            pPipe->Destroy();
        }
        pPipe->Release();
    }
}

注意这样的 CMultiThreadPipe 实现 - 当工作线程退出时您不需要等待。即使您的代码在 dll 中并且您卸载了 dll - 您也不需要等待。每个线程都有自己的对象和模块引用。并在退出时释放它

关于c++ - 线程安全管道端接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52015122/

相关文章:

c++ - 如何修复引用特定 NuGet 包交付的 dll 版本的托管 C++ 程序集 (VS2017)

java - 检查线程关闭并在关闭后执行任务

java - wso2 mb 2.2.0 启动错误

c# - 不为空的空对象

c# - 如何使用 `#import` 和 `tlb` 文件从 C# 程序集在 C++ 中实现 COM 回调接口(interface)?

c++ - 代码分析说 Inconsistent annotation for 'wWinMain' : this instance has no annotations

c++ - C++20 中模板匹配函数重载的问题

c++ - 为什么 RemoveSplash 不再是 CMainFrame 的成员?

c++ - 如果为鼠标事件设置为透明,您如何仍然使用小部件的功能?

c++ - Visual Studio 2012 c++ 对 std 的智能感知评论可能吗?