#ifndef __WIN32Q3_H__
#define __WIN32Q3_H__
#include <windows.h>
#include <process.h>
#include <assert.h>
#include <deque>
#include <iostream>
using namespace std;
// BOUNDED INTERTHREAD QUEUE
// NOTE : for simplicity, the class declaration includes
// all the code in-line. A real implementation would probably
// want to move the executable code to a separate file and place
// the declaration into an include file
template <class T> class BoundedInterthreadQueue {
private:
deque<T> m_qData; // underlying data store
// note: SignalObjectAndWait doesn't support a
// critical section, so use this mutex instead to
// lock the data
HANDLE m_mtxObject; // monitor mutex
HANDLE m_evtSpaceAvailable; // event
HANDLE m_evtElementAvailable; // event
long m_nBound; // max elements in queue
void init(long bound) {
// NOTE : error handling is omitted
// a strategy should be implemented for what to do
// if a CreateMutex doesn't work.
m_nBound = bound;
// create the object mutex
m_mtxObject = CreateMutex(0,0,0);
assert(m_mtxObject != INVALID_HANDLE_VALUE);
// create the 'put' event, with auto reset
// use autoreset for Notify
// use manualreset for NotifyAll
m_evtSpaceAvailable = CreateEvent(0,FALSE,0,0);
assert(m_evtSpaceAvailable != INVALID_HANDLE_VALUE);
// create the 'get' event, with auto reset
// use autoreset for Notify
// use manualreset for NotifyAll
m_evtElementAvailable = CreateEvent(0,FALSE,0,0);
assert(m_evtElementAvailable != INVALID_HANDLE_VALUE);
}
public:
BoundedInterthreadQueue() {
// whatever default queue bound
// or throw an exception to indicate
// that an upper bound is required
init(10);
}
BoundedInterthreadQueue(long bound) {
// specified bound
init(bound);
}
~BoundedInterthreadQueue() {
// close out all the resources
if (m_evtSpaceAvailable != INVALID_HANDLE_VALUE) {
CloseHandle(m_evtSpaceAvailable);
}
if (m_evtElementAvailable != INVALID_HANDLE_VALUE) {
CloseHandle(m_evtElementAvailable);
}
if (m_mtxObject != INVALID_HANDLE_VALUE) {
CloseHandle(m_mtxObject);
}
}
// insert data into the queue
void put(T p) {
DWORD w;
BOOL b;
// lock access to the underlying objects
w = WaitForSingleObject(m_mtxObject,INFINITE);
assert(w == WAIT_OBJECT_0);
// while no space is available
// size > bound is the PREDICATE
// with this pattern, the PREDICATE can be any logical
// expression!
while(m_qData.size() >= m_nBound) {
// release monitor and wait for space to become
// available
b = SignalObjectAndWait(m_mtxObject,
m_evtSpaceAvailable, INFINITE, FALSE);
assert(b != 0xffffffff);
// reenter the monitor
w = WaitForSingleObject(m_mtxObject,INFINITE);
assert(w == WAIT_OBJECT_0);
}
// at this point, there is space available and we have
// the m_mtxObject lock
m_qData.push_back(p);
// notify next waiter that an object is available
// use 'pulse' because if no one is waiting,
// we don't need to leave the event signaled
b = PulseEvent(m_evtElementAvailable);
assert(b != 0);
// if manual reset is enabled, add it here
// b = ResetEvent(m_evtElementAvailable);
// assert(b != 0);
// signal object is available
b = ReleaseMutex(m_mtxObject);
assert(b != 0);
}
// get data from the queue
T get() {
T p;
DWORD w;
BOOL b;
// lock access to the underlying objects
w = WaitForSingleObject(m_mtxObject,INFINITE);
assert(w == WAIT_OBJECT_0);
// several threads can get this far, but only
// one at a time will get by the critical section
// empty is the PREDICATE
// with this pattern, the PREDICATE can be any logical
// expression!
while(m_qData.empty()) {
// release monitor and wait for elements to
// become available
b = SignalObjectAndWait(m_mtxObject,
m_evtElementAvailable, INFINITE, FALSE);
assert(b != 0xffffffff);
// reenter monitor
w = WaitForSingleObject(m_mtxObject,INFINITE);
assert(w == WAIT_OBJECT_0);
}
// get the element
p = m_qData.front();
m_qData.pop_front();
// notify next waiter that space is available
// use 'pulse' because if no one is waiting,
// we don't need to leave the event signaled
b = PulseEvent(m_evtSpaceAvailable);
assert(b != 0);
// if manual reset is enabled, add it here
// b = ResetEvent(m_evtSpaceAvailable);
// assert(b != 0);
// signal object is available
b = ReleaseMutex(m_mtxObject);
assert(b != 0);
return p;
}
};