// Listing 4: A thread-safe bounded queue implemented as a monitor with condition variables

#ifndef __WIN32Q3_H__
#define __WIN32Q3_H__

#include <windows.h>
#include <process.h>
#include <assert.h>
#include <deque>
#include <iostream>

using namespace std;

// BOUNDED INTERTHREAD QUEUE
// NOTE : for simplicity, the class declaration includes
// all the code in-line. A real implementation would probably
// want to move the executable code to a separate file and place
// the declaration into an include file

template <class T> class BoundedInterthreadQueue {
    // Thread-safe bounded FIFO queue built as a monitor:
    //   - one kernel mutex guards all access to the underlying deque
    //   - two auto-reset events act as the monitor's condition variables
    //     ("space available" for producers, "element available" for
    //     consumers)
    // put() blocks while the queue is full; get() blocks while it is empty.
    // Every waiter re-checks its predicate in a loop after waking, so a
    // wake-up that turns out to be stale is harmless.
    //
    // NOTE : failures of the Win32 calls are only checked with assert(),
    // so they go unnoticed when NDEBUG is defined.
private:
    std::deque<T> m_qData;              // underlying data store
    // note: SignalObjectAndWait doesn't support a
    // critical section, so use this mutex instead to
    // lock the data
    HANDLE       m_mtxObject;           // monitor mutex
    HANDLE       m_evtSpaceAvailable;   // auto-reset event: a slot was freed
    HANDLE       m_evtElementAvailable; // auto-reset event: an element was added
    long         m_nBound;              // max elements in queue (always >= 1)

    // The queue owns kernel handles, so a copy would close them twice.
    // Copying is therefore disabled (declared private, never defined).
    BoundedInterthreadQueue(const BoundedInterthreadQueue&);
    BoundedInterthreadQueue& operator=(const BoundedInterthreadQueue&);

    // Scoped ownership of the monitor mutex: acquired on construction and
    // released on destruction, so the monitor is left on every exit path,
    // including an exception thrown while copying a T.
    class MonitorLock {
    private:
        HANDLE m_mtx;
        MonitorLock(const MonitorLock&);
        MonitorLock& operator=(const MonitorLock&);
    public:
        explicit MonitorLock(HANDLE mtx) : m_mtx(mtx) {
            DWORD w = WaitForSingleObject(m_mtx, INFINITE);
            assert(w == WAIT_OBJECT_0);
            (void)w;
        }
        ~MonitorLock() {
            BOOL b = ReleaseMutex(m_mtx);
            assert(b != 0);
            (void)b;
        }
    };

    // Creates the mutex and both events and records the queue bound.
    void init(long bound) {
        // NOTE : error handling is omitted
        // a strategy should be implemented for what to do
        // if a CreateMutex doesn't work.

        // a bound below 1 could never admit an element (put() would block
        // forever), so fall back to the smallest usable bound
        m_nBound = (bound > 0) ? bound : 1;

        // CreateMutex/CreateEvent report failure by returning NULL,
        // not INVALID_HANDLE_VALUE

        // create the object mutex (unnamed, initially unowned)
        m_mtxObject = CreateMutex(NULL, FALSE, NULL);
        assert(m_mtxObject != NULL);

        // create the 'put' event, with auto reset
        // use autoreset for Notify
        // use manualreset for NotifyAll
        m_evtSpaceAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);
        assert(m_evtSpaceAvailable != NULL);

        // create the 'get' event, with auto reset
        // use autoreset for Notify
        // use manualreset for NotifyAll
        m_evtElementAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);
        assert(m_evtElementAvailable != NULL);
    }

    // true when the queue holds m_nBound (or more) elements.
    // Must be called with the monitor mutex held.
    bool isFull() const {
        typedef typename std::deque<T>::size_type size_type;
        // m_nBound is >= 1 (see init), so the conversion cannot wrap
        return m_qData.size() >= static_cast<size_type>(m_nBound);
    }

    // Condition wait: releases the monitor mutex, blocks until 'evt' is
    // signaled, then re-enters the monitor before returning.
    // Must be called with the monitor mutex held (exactly once).
    void waitFor(HANDLE evt) {
        // release monitor and wait for the event
        DWORD w = SignalObjectAndWait(m_mtxObject, evt, INFINITE, FALSE);
        assert(w == WAIT_OBJECT_0);

        // reenter the monitor
        w = WaitForSingleObject(m_mtxObject, INFINITE);
        assert(w == WAIT_OBJECT_0);
        (void)w;
    }

    // Condition notify: wakes one thread waiting on 'evt'.
    // SetEvent is used rather than PulseEvent because a pulse is lost when
    // the waiter is momentarily not in the wait state, which could leave a
    // thread blocked forever. With SetEvent the auto-reset event simply
    // stays signaled until a waiter consumes it; if that waiter finds its
    // predicate false it just waits again.
    void notify(HANDLE evt) {
        BOOL b = SetEvent(evt);
        assert(b != 0);
        (void)b;
    }

public:
    // Creates a queue with a default bound of 10 elements.
    BoundedInterthreadQueue() {
        // whatever default queue bound
        // or throw an exception to indicate
        // that an upper bound is required
        init(10);
    }

    // Creates a queue holding at most 'bound' elements
    // (values below 1 are treated as 1).
    BoundedInterthreadQueue(long bound) {
        // specified bound
        init(bound);
    }

    // Closes the event and mutex handles that were successfully created.
    ~BoundedInterthreadQueue() {
        // close out all the resources
        if (m_evtSpaceAvailable != NULL) {
            CloseHandle(m_evtSpaceAvailable);
        }
        if (m_evtElementAvailable != NULL) {
            CloseHandle(m_evtElementAvailable);
        }
        if (m_mtxObject != NULL) {
            CloseHandle(m_mtxObject);
        }
    }


    // insert data into the queue
    // Appends a copy of 'p' at the back; blocks while the queue is full.
    void put(T p) {
        // lock access to the underlying objects
        MonitorLock lock(m_mtxObject);

        // while no space is available
        // size >= bound is the PREDICATE
        // with this pattern, the PREDICATE can be any logical
        // expression!
        while (isFull()) {
            waitFor(m_evtSpaceAvailable);
        }

        // at this point, there is space available and we have
        // the m_mtxObject lock
        m_qData.push_back(p);

        // notify next waiter that an object is available
        notify(m_evtElementAvailable);

        // the monitor mutex is released when 'lock' goes out of scope
    }

    // get data from the queue
    // Removes and returns the front element; blocks while the queue is
    // empty.
    T get() {
        // lock access to the underlying objects
        MonitorLock lock(m_mtxObject);

        // several threads can get this far, but only
        // one at a time will get by the monitor mutex
        // empty is the PREDICATE
        // with this pattern, the PREDICATE can be any logical
        // expression!
        while (m_qData.empty()) {
            waitFor(m_evtElementAvailable);
        }

        // get the element (copy-constructed, so T need not be
        // default-constructible)
        T p = m_qData.front();
        m_qData.pop_front();

        // notify next waiter that space is available
        notify(m_evtSpaceAvailable);

        // the monitor mutex is released when 'lock' goes out of scope
        return p;
    }
};