Listing 3: A thread-safe queue implementation that uses semaphores

#ifndef __WIN32Q2_H__
#define __WIN32Q2_H__

#include <windows.h>
#include <process.h>
#include <assert.h>
#include <deque>
#include <iostream>

using namespace std;



class Data {
public:
    Data() : x(0) {}

    Data(int i) : x(i) {}

    int x;
};

// BOUNDED INTERTHREAD QUEUE
// NOTE : for simplicity, the class declaration includes
// all the code in-line. A real implementation would probably
// want to move the executable code to a separate file and place
// the declaration into an include file
class BoundedInterthreadQueue {
private:
    deque<Data *>    m_qData;           // underlying data store
    CRITICAL_SECTION m_csec;            // data store lock
    HANDLE       m_semSpaceAvailable;   // semaphore
    HANDLE       m_semElementAvailable; // semaphore

    void init(long bound) {
        // NOTE : error handling is omitted
        // a strategy should be implemented for what to do 
        // if a CreateSemaphore doesn't work.

        // initialize the critical section protecting the deque
        InitializeCriticalSection(&m_csec);
    
        // create the 'put' counter semaphore
        // the initial count is equal to the bound
        m_semSpaceAvailable = CreateSemaphore(0,bound,bound,0);
        assert(m_semSpaceAvailable != INVALID_HANDLE_VALUE);
    
        // create the 'get' counter semaphore
        // the initial count is equal to 0
        m_semElementAvailable = CreateSemaphore(0,0,bound,0);
        assert(m_semSpaceAvailable != INVALID_HANDLE_VALUE);
    }

public:
    BoundedInterthreadQueue() {
        // whatever default queue bound
        init(10);
    }

    BoundedInterthreadQueue(long bound) {
        // specified bound
        init(bound);
    }

    ~BoundedInterthreadQueue() {
        if (m_semSpaceAvailable != INVALID_HANDLE_VALUE) {
            CloseHandle(m_semSpaceAvailable);
        }
        if (m_semElementAvailable != INVALID_HANDLE_VALUE) {
            CloseHandle(m_semElementAvailable);
        }
    }


    void put(Data *p) {
        DWORD w;
        BOOL  b;

        w = WaitForSingleObject(m_semSpaceAvailable,INFINITE);
        assert(w == WAIT_OBJECT_0);

        // several threads can get this far, but only one at a time
        // will get by the critical section
        EnterCriticalSection(&m_csec);
        if (m_qData.size() < m_qData.max_size()) {
            m_qData.push_back(p);
        }
        LeaveCriticalSection(&m_csec);

        // signal element is available
        b = ReleaseSemaphore(m_semElementAvailable,1,0);
        assert(b != 0);
    }

    Data *get() {
        Data *p;
        DWORD w;
        BOOL  b;

        // wait for available element
        w = WaitForSingleObject(m_semElementAvailable,INFINITE);
        assert(w == WAIT_OBJECT_0);

        // several threads can get this far, but only one 
        // at a time will get by the critical section
        EnterCriticalSection(&m_csec);
        if (!m_qData.empty()) {
            p = m_qData.front();
            m_qData.pop_front();
        }
        else {
            p = 0;
        }
        LeaveCriticalSection(&m_csec);

        // signal element is available
        b = ReleaseSemaphore(m_semSpaceAvailable,1,0);
        assert(b != 0);

        return p;
    }
};