// Figure 3: PrioritySemaphore.cpp

#include "PrioritySemaphore.h"

////////////////////////////////////////////////////////////////
//
// Default constructor. Leaves the object in the "not created" state:
// both array pointers are NULL and the number of priority levels is 0.
// Only the critical section is set up here; the semaphore and events
// are created by Create().
CPrioritySemaphore::CPrioritySemaphore()
   : m_arrSemaphoreAndEvents(NULL)
   , m_arrNumWaitingThreads(NULL)
   , m_lNumPriorityLevels(0)
{
   // InitializeCriticalSection returns void, so there is no result
   // to check.
   InitializeCriticalSection(&m_critsecPriorityProtection);
}

////////////////////////////////////////////////////////////////
//
// Destructor. Releases everything Create() may have allocated and then
// tears down the critical section initialized by the constructor.
CPrioritySemaphore::~CPrioritySemaphore()
{
   // Close the handles and free the arrays first.
   this->Cleanup();

   // DeleteCriticalSection returns void, so there is no result to
   // check.
   DeleteCriticalSection(&m_critsecPriorityProtection);
}

////////////////////////////////////////////////////////////////
//
// Creates the underlying semaphore, one event per priority level above
// the lowest, and the per-level counters of waiting threads.
//
// Parameters:
//    lNumPriorityLevels - number of priority levels; must be greater
//                         than 0 and less than MAXIMUM_WAIT_OBJECTS,
//                         because Wait() may pass the whole handle
//                         array to one WaitForMultipleObjects call.
//    lInitialCount      - passed unchanged to CreateSemaphore.
//    lMaximalCount      - passed unchanged to CreateSemaphore.
//
// Returns TRUE on success. On failure returns FALSE, leaves the object
// in the "not created" state, and the last-error code describes the
// failure:
//    ERROR_ALREADY_EXISTS    - Create() has already succeeded before.
//    ERROR_INVALID_PARAMETER - lNumPriorityLevels is out of range.
//    ERROR_NOT_ENOUGH_MEMORY - operator new returned NULL.
//    otherwise               - the code set by the failing
//                              CreateSemaphore or CreateEvent call.
//
// Any exception other than the internal int marker (for example one
// thrown by operator new) is rethrown after cleanup.
BOOL CPrioritySemaphore::Create(
   LONG lNumPriorityLevels, // number of priority levels
   LONG lInitialCount, // initial count
   LONG lMaximalCount) // maximal count
{
   // Check if already created.
   if(NULL != m_arrSemaphoreAndEvents)
   {
      SetLastError(ERROR_ALREADY_EXISTS);
      return FALSE;
   }

   // Check parameters.
   if(0 >= lNumPriorityLevels ||
      MAXIMUM_WAIT_OBJECTS <= lNumPriorityLevels)
   {
      SetLastError(ERROR_INVALID_PARAMETER);
      return FALSE;
   }

   // Transfer number of priority levels to member variable. Cleanup()
   // relies on it to know how many handles to close.
   m_lNumPriorityLevels = lNumPriorityLevels;

   // Loop index shared by all loops below. It is declared once at
   // function scope so the code compiles both with standard for-scope
   // rules and with compilers that leak the for-init declaration.
   LONG i = 0;

   // Allocate all memory and create all synchronization objects.
   // "throw 0" is used internally to jump to the common failure path.
   try
   {
      // Create the array of handles (semaphore+events).
      // Allow failure of new to be handled with or without
      // exceptions.
      m_arrSemaphoreAndEvents = new HANDLE[lNumPriorityLevels];
      if (NULL == m_arrSemaphoreAndEvents)
      {
         SetLastError(ERROR_NOT_ENOUGH_MEMORY);
         throw 0;
      }

      // NULL the array for proper cleanup in case of failure.
      for(i=0; i<lNumPriorityLevels; ++i)
         m_arrSemaphoreAndEvents[i] = NULL;

      // Create the semaphore; it always occupies slot 0.
      m_arrSemaphoreAndEvents[0] =
         CreateSemaphore(NULL, lInitialCount, lMaximalCount,
            NULL);
      //
      if(NULL == m_arrSemaphoreAndEvents[0]) throw 0;

      // Create the events and place the handles in the array,
      // following the semaphore. The events are manual-reset and
      // start out signalled, so they block nobody until Wait()
      // resets them.
      for(i=1; i<lNumPriorityLevels; ++i)
      {
         m_arrSemaphoreAndEvents[i] =
            CreateEvent(NULL, TRUE, TRUE, NULL);
         if(NULL == m_arrSemaphoreAndEvents[i]) throw 0;
      }

      // Create the array of counters to hold numbers of waiting
      // threads. Allow failure of new to be handled with or
      // without exceptions.
      m_arrNumWaitingThreads = new LONG[lNumPriorityLevels];
      if(NULL == m_arrNumWaitingThreads)
      {
         SetLastError(ERROR_NOT_ENOUGH_MEMORY);
         throw 0;
      }

      // Initialize the number of waiting threads to 0 for
      // each level.
      for(i=0; i<lNumPriorityLevels; ++i)
         m_arrNumWaitingThreads[i] = 0L;
   }
   catch(int)
   {
      // Cleanup() calls CloseHandle and delete[], which may overwrite
      // the thread's last-error code. Save the code that describes
      // the actual failure and restore it for the caller.
      const DWORD dwFailureCode = GetLastError();
      Cleanup();
      SetLastError(dwFailureCode);
      return FALSE;
   }
   catch(...)
   {
      // Foreign exception: release what was acquired, then let the
      // caller deal with it.
      Cleanup();
      throw;
   }

   return TRUE;
}

////////////////////////////////////////////////////////////////
//
// Waits on the semaphore at the given priority level.
//
// Parameters:
//    lWaitPriority         - priority level of this wait, in the range
//                            0 to m_lNumPriorityLevels - 1; larger
//                            values wait on fewer events (see below).
//    dwMillisecondsTimeout - timeout passed unchanged to
//                            WaitForMultipleObjects.
//
// Returns WAIT_FAILED with last error ERROR_INVALID_DATA if the object
// has not been created, WAIT_FAILED with last error
// ERROR_INVALID_PARAMETER if lWaitPriority is out of range, and
// WAIT_FAILED if ResetEvent or SetEvent fails. Otherwise returns the
// result of WaitForMultipleObjects.
DWORD CPrioritySemaphore::Wait(
   LONG lWaitPriority,
   DWORD dwMillisecondsTimeout /* = INFINITE */
   )
{
   // Not created yet (or already cleaned up).
   if(NULL == m_arrSemaphoreAndEvents) 
   {
      SetLastError(ERROR_INVALID_DATA);
      return WAIT_FAILED;
   }
  
   // Reject priority levels outside 0 .. m_lNumPriorityLevels - 1.
   if(0 > lWaitPriority || m_lNumPriorityLevels <= lWaitPriority)
   {
      SetLastError(ERROR_INVALID_PARAMETER);
      return WAIT_FAILED;
   }
  
   // Priority levels range from 0 to m_lNumPriorityLevels - 1. 
   // If the priority level is the highest one, we want to reset and 
   // set the event with index 1 in the array. If it's the second 
   // highest priority level, the event index is 2, etc.
   // Priority level 0 yields an index one past the end of the handle
   // array: it has no event of its own, which is why the event is
   // only touched below when lWaitPriority is greater than 0.
   LONG lEventIndex = m_lNumPriorityLevels - lWaitPriority;
  
   // Reset (unsignal) this level's event so no wait operation with 
   // lower priority completes first, and increment the wait counter 
   // for this priority level. Lower-priority waits include this 
   // event in their wait set, so they stay blocked while it is 
   // unsignalled.
   EnterCriticalSection(&m_critsecPriorityProtection);
   //
   if(0 < lWaitPriority)
   {
      if (!ResetEvent(m_arrSemaphoreAndEvents[lEventIndex]))
      {
         // Nothing has been counted yet, so just unlock and fail.
         LeaveCriticalSection(&m_critsecPriorityProtection);
         return WAIT_FAILED;
      }
   }
   // 
   // Use InterlockedIncrement because function 
   // GetNumWaitingThreads reads the counter without entering the
   // critical section. The counter slot is lEventIndex - 1.
   InterlockedIncrement(
      (LONG*) (m_arrNumWaitingThreads + lEventIndex - 1)
      );
   //
   LeaveCriticalSection(&m_critsecPriorityProtection);
  
   // We must wait on the semaphore plus all events up to and 
   // excluding the one that is reset and set by the current 
   // priority level, i.e. the first lEventIndex handles in the array.
   DWORD dwWaitResult = WaitForMultipleObjects(
      lEventIndex,             // number of objects to wait on
      m_arrSemaphoreAndEvents, // address of array of objects
      TRUE,                    // wait for all objects to be 
                               // signalled
      dwMillisecondsTimeout);

   // Decrement the wait counter for this priority level, and
   // release the block on wait operations with lower priority
   // if we're the last one waiting on this level. This is done
   // regardless of the wait result.
   EnterCriticalSection(&m_critsecPriorityProtection);
   //
   // Use InterlockedDecrement because function 
   // GetNumWaitingThreads reads the counter without entering the
   // critical section.
   InterlockedDecrement(
      (LONG*) (m_arrNumWaitingThreads + lEventIndex - 1)
      );
   //
   if(0 < lWaitPriority)
   {
      // No thread is waiting on this level any more: signal the
      // event again so lower-priority waits can proceed.
      if(0 == m_arrNumWaitingThreads[lEventIndex-1])
      {
         // A SetEvent failure overrides the wait result.
         if (!SetEvent(m_arrSemaphoreAndEvents[lEventIndex]))
            dwWaitResult = WAIT_FAILED;
      }
   }
   //
   LeaveCriticalSection(&m_critsecPriorityProtection);
  
   // Return value is the WaitForMultipleObjects result (expected to
   // be WAIT_OBJECT_0 or WAIT_TIMEOUT) or WAIT_FAILED.
   return dwWaitResult;
}

////////////////////////////////////////////////////////////////
//
// Increases the count of the underlying semaphore by forwarding to
// ReleaseSemaphore.
//
// Returns the result of ReleaseSemaphore, or FALSE with last error
// ERROR_INVALID_DATA if the object has not been created.
BOOL CPrioritySemaphore::Release(
   LONG lReleaseCount,     // amount to add to the current count
   LPLONG lpPreviousCount) // memory location to receive previous
                           // count
{
   if(NULL != m_arrSemaphoreAndEvents)
   {
      // The semaphore handle is always the first array element.
      HANDLE hSemaphore = m_arrSemaphoreAndEvents[0];
      return ReleaseSemaphore(hSemaphore, lReleaseCount,
         lpPreviousCount);
   }

   // Not created: there is nothing to release.
   SetLastError(ERROR_INVALID_DATA);
   return FALSE;
}

////////////////////////////////////////////////////////////////
//
// Reports how many threads are currently counted as waiting on the
// given priority level.
//
// Returns the counter value. Returns 0 with last error
// ERROR_INVALID_DATA if the object has not been created, and 0 with
// last error ERROR_INVALID_PARAMETER if lWaitPriority is outside
// 0 .. m_lNumPriorityLevels - 1. The counter is read without entering
// the critical section.
int CPrioritySemaphore::GetNumWaitingThreads(
   LONG lWaitPriority // priority level to check
   )
{
   // Not created: no counters exist.
   if(NULL == m_arrSemaphoreAndEvents)
   {
      SetLastError(ERROR_INVALID_DATA);
      return 0;
   }

   // Reject priority levels outside the valid range.
   if(lWaitPriority < 0 || lWaitPriority >= m_lNumPriorityLevels)
   {
      SetLastError(ERROR_INVALID_PARAMETER);
      return 0;
   }

   // The counter array starts with the highest priority level, so the
   // slot index runs opposite to the priority level.
   const LONG lCounterSlot = m_lNumPriorityLevels - lWaitPriority - 1;
   return m_arrNumWaitingThreads[lCounterSlot];
}

////////////////////////////////////////////////////////////////
//
void CPrioritySemaphore::Cleanup()
{
   // Close semaphore handle and all event handles and 
   // delete array.
   if(NULL != m_arrSemaphoreAndEvents)
   {
      // Close semaphore and event handles.
      for(LONG i=0; i<m_lNumPriorityLevels; ++i)
     {
        if(NULL != m_arrSemaphoreAndEvents[i])
           CloseHandle(m_arrSemaphoreAndEvents[i]);
     }

     delete[] m_arrSemaphoreAndEvents;
     m_arrSemaphoreAndEvents = NULL;
   }

   delete[] ((int*) m_arrNumWaitingThreads);
   m_arrNumWaitingThreads = NULL;

   m_lNumPriorityLevels = 0;
}