Most Win32 synchronization primitives are just that primitive. But you can use them to build queues that are safe and easy to use.
In his article, "Communicating Sequential Processes", C.A.R. Hoare introduced one of the first formalizations of thread synchronization concepts. Hoare described a design pattern for a Monitor/Condition Variable construct, which, when implemented properly, provides for thread-safe access to an object. In addition, this construct gives threads the capability to wait for changes in an arbitrary condition associated with the object and then respond to those changes. This type of waiting is called a "predicate wait," where the predicate is arbitrary. One predicate wait application is a bounded interthread queue implementation. This article discusses the mechanisms provided by Win32 to support these constructs. It turns out that the Java thread synchronization mechanisms are modeled explicitly around the monitor/condition variable scheme. It's a nice model, and Win32 also supports it, though not quite so explicitly.
The simple lock/unlock mechanism, using critical sections, mutexes, or semaphores, provides protection for shared resources, but it provides no way to wait for conditions to occur. For example, consider an application that has threads operating in a producer/consumer relationship. One or more threads may be producers, generating data that is placed in a container. Other threads may be consumers and need to retrieve data from the container as it becomes available. What happens when a consumer looks at the container and finds it empty? The consumer needs to wait until something shows up. A consumer can wait by doing other work and then trying again later (polling), but this strategy is inefficient and hard to structure in a program. It would be better for the consumer to simply go to sleep and wake up when data becomes available. Producer threads present a similar problem. If the container has a bounded size, then at some point the producer may find the container full and thus need to wait for space to become available before continuing.
A bounded interthread queue is an application of the monitor/condition pattern. Similar to an STL deque or queue, this sort of queue lets you add elements at the back and remove them from the front. However, the interthread queue also provides a way to wait for elements or space to become available. This makes it easy for one thread to be a producer, generating elements that are added to the queue, and another thread to be a consumer, getting elements from the queue as they become available.
Why bound the queue? Although you could implement an interthread queue that is not bounded, in general a robust application should place bounds of some kind on all dynamic resource allocations. Otherwise it will be indeterminate when a particular application will die from resource starvation.
The declaration for a bounded queue object might look like this:
Class Data { ... }; class BoundedInterthreadQueue { public: void push_back(const Data& x); void pop_front(); const Data &front(); bool empty(); size_type size() const; size_type max_size() const; };The queue's actual data-store object (not shown) would most likely be an STL container, such as a vector or deque. You couldn't use one directly, though, even if the STL implementation was thread safe. Being thread safe is not enough. An STL container does not implement the additional semantics required. It does not provide a thread the ability to detect an empty queue and then wait for an element to show up. It does not provide the ability to detect a full container and then wait for available space. With basic STL containers, an application must poll.
To solve this problem, I add methods that will model the proper semantics. For simplicity, I continue to use an STL container as the underlying implementation, but I expose a different interface tailored to producer/consumer semantics. The STL container interface won't work because "getting" an object is non-atomic. You must first look at the object with front or top and then separately remove it with pop_front. This implies a requirement for some sort of wrapper. Note that the wrappers could (and probably should) be made generic with a template. (For simplicity that is left off here.)
Also, for simplicity, assume that the producer allocates a Data object, passes it in to BoundedInterthreadQueue, which returns pointers to data objects to the consumer. The consumer deallocates the object. Other allocation policies, such as copy on put and get, could be implemented.
A first cut implementation in pseudocode might look like this:
class BoundedInterthreadQueue { public: void put(Data *x) { wait for space in the container store the object into the container signal that an object is available } Data *get() { Wait for an object to be available Remove the object from the container Signal that space is available Return the object } };Producer/Consumer Dynamics
Before going further with the algorithms, I present a simple test fixture to illustrate the dynamics of producer and consumer threads. This text fixture is shown in main.cpp (Listing 1), which creates a number of producer and consumer threads and performs some multithreaded gets and puts on the queue.
At the top of main.cpp there are two defines, SLOW_PRODUCER and SLOW_CONSUMER, and two constants, NUMBER_OF_PRODUCERS and NUMBER_OF_CONSUMERS. These values force the test threads to model the possible application scenarios.
A real application will involve one of three situations:
1. The producer is slower than the consumer (simulated by defining SLOW_PRODUCER).
2. The consumer is slower than the producer (simulated by defining SLOW_CONSUMER).
3. The producer and consumer run at about the same rate (simulated by defining neither SLOW_PRODUCER nor SLOW_CONSUMER).
In situation 1, the consumer will always be ahead of the producer. How fast the producer can run governs throughput, and the system will never overflow. The consumer will spend most of its time waiting. The order of execution will usually alternate as put-get-put-get. This situation models a steady-state input rate, and typically the queue size doesn't need to be very large.
In situation 2, the consumer is the bottleneck. The bounding of the queue automatically forces the producer to throttle back when the consumer can't keep up. The order of execution will usually be a series of puts and then a series of gets. This situation models a bursty input model. The queue bound must be large enough to handle the maximum expected input burst. Of course, if the mismatch in rates is too great, data will eventually be lost somehow.
In situation 3, the overall system is the bottleneck. The execution order will be random and will be influenced by the kernel scheduling algorithm.
The test fixture supports any number of producers and consumers, and you can set the number created using the constants NUMBER_OF_PRODUCERS and NUMBER_OF_CONSUMERS.
A Thread-Safe, Polled Implementation
This implementation doesn't provide the required semantics, but I present it as an illustration. It is a polled scheme. This implementation also is not thread safe to start with, since it relies on non-atomic accesses to the underlying container. So first, I add thread safety using a Win32 critical section, but leave out the waiting parts for now. The complete code is in win32q1.h (Listing 2). Notice that it's up to this class's clients to handle the overflow and underflow situations. However they do it, some sort of periodic polling will be required, which may be undesirable in a multithreaded case.
class BoundedInterthreadQueue { public: void put(Data *x) { Enter Critical Section if there is space in the container Store the object in the container else Handle it as an error (throw an exception, return code etc) endif Leave Critical Section } Data *get() { Enter Critical Section if the container is not empty Get an object from the container else Handle it as an error (throw an exception, return code etc) endif Leave Critical Section Return the object } };A Thread-Safe Semaphore Implementation
The polled implementation has been made thread safe, but it is still missing the wait semantics. For a situation where the condition to be tested is strictly an integral count (such as "number of elements in the container"), a limited monitor/condition variable pattern can be implemented using standard Win32 counting semaphores. Using the semaphores in this manner is a special case, since it doesn't allow arbitrary predicates. On the other hand, this implementation is nice because it can be used on Windows 95/98 systems, while the full monitor/condition variable implementation is available only on Windows NT 4.0 or later, as we'll see later in this article. The full monitor/condition variable implementation allows the program to wait on an arbitrary predicate rather than just a count value.
Since both get and put must be able to do a wait, I will use two counting semaphores. The get semaphore will be initialized with a count of zero and will be incremented when elements are added to the queue and decremented when elements are removed. The put semaphore will be initialized to the maximum number of elements the queue can hold and will be incremented when elements are removed and decremented when elements are added.
The get method works as follows:
1. Wait on ElementAvailable.
2. Lock access to the underlying STL deque.
3. Get an item.
4. Unlock the deque
5. Signal SpaceAvailable.The put method is similar:
1. Wait on SpaceAvailable.
2. Lock access to the underlying STL deque.
3. Push an item.
4. Unlock the deque.
5. Signal ElementAvailable.The complete code for this implementation, with test threads, is shown in win32q2.h (Listing 3).
Thread-Safe Implementation with Predicate Wait
So far, I haven't shown a real monitor/condition variable construct. For BoundedInterthreadQueue, a strict implementation isn't required because I could use counting semaphores to provide the bounding. But what if the condition I need to wait on is a more complex predicate? Perhaps the producer will produce one element at a time, but the consumer wants to get three at a time. Maybe some other system condition needs to be part of the predicate. A complete monitor/condition variable implementation allows arbitrarily complex predicates.
I now reimplement the bounded queue example using a monitor/condition variable scheme. To implement the bounded queue, a Win32 system call, SignalObjectAndWait, is required. This call is implemented only in Windows NT 4.0 and Windows 2000. It isn't supported in Windows 95/98. If you're using Visual C++, you need to be sure the symbol _WIN32_WINNT=0x0400 is defined. The complete implementation is shown in win32q4.h (Listing 4). It still uses the main.cpp test harness.
SignalObjectAndWait is the basis for implementing the monitor/condition variable pattern in Win32 (under NT). A tricky call, SignalObjectAndWait performs two operations in an atomic manner. It signals one synchronization object and then waits on another. The signal and wait are atomic, meaning no thread preemption will occur between them. Pseudocode for SignalObjectAndWait would look like the following. (Ignore alertable for now, assume it is always set to FALSE.)
DWORD SignalObjectAndWait(MUTEX mtx, Event wt, DWORD timeout, BOOL alertable) { disable preemption Signal the object specified by argument 'mtx' WaitForSingleObject on argument 'wt' with 'timeout' and at the same time implicitly reenable preemption, allowing some other thread to run }A "monitor" is a critical section, in this case implemented with a Win32 mutex. The previous implementations used a critical section, but SignalObjectAndWait doesn't support those, so I use a mutex. Each BoundedInterthreadQueue method that accesses the underlying container state must implement the monitor by calling WaitForSingleObject(mutex) at the beginning of the method and releasing it at the end with ReleaseMutex.
In this implementation, the space-available and element-available signals use Win32 events rather than semaphores because these events are meaningful only to a thread that is actually waiting at the time the event occurs. If no one is waiting, the event is a non-event.
I now add the condition variable functionality to make it a non-polled implementation. The pseudocode for the put routine is shown in Figure 1. The pseudocode for the get routine is shown in Figure 2.
It is not completely obvious why the two SignalObjectAndWait actions need to be atomic. The reason is that this call is entered only when the predicate is not true. The thread entering this call has released the monitor and is waiting for something to happen to allow it to reevaluate the predicate. In this case, a thread doing a put will be waiting for another thread to do a get, which will cause the SpaceAvailable event to be pulsed, which will wake up the thread doing the put and let it retry the condition.
If SignalObjectAndWait was not atomic, then the put thread might be preempted between releasing the monitor (the signal) and performing the wait (on the event). The get thread could run in that gap, pulse the event, and later the put thread would run and wait on the event. But the event has already been lost, so the put thread will never wake up, reevaluate the predicate, and succeed. Or, if a subsequent get is performed, then two spaces will exist in the container, but the waiting put thread will use only one of them, so in effect a space in the container will be lost.
This monitor/condition pattern is modeled in Java with the calls wait, notify, and notifyAll. The Java wait is analogous to the Win32 SignalObjectAndWait and the notify is analogous to PulseEvent with autoreset. Instead of just waking up one waiting thread, the notifyAll call in Java wakes up all waiting threads and lets them all reevaluate the predicate. Reevaluating the predicate may be required if the predicate doesn't force a strict one-to-one correspondence between get and put type operations. In Win32, notifyAll is analogous to PulseEvent with a manual reset, which will wake up all waiting threads.
Generic Thread-Safe Monitor/Condition Variable Queue
win32q4.h and main4.cpp (not shown) provide a template version of BoundedInterthreadQueue. Adding templates is very simple in this case; the container will then support any STL deque-compatible type. (All source files are available from the CUJ ftp site. See p. 3 for downloading instructions.)
Summary
Some STL container implementations are thread safe, but these containers are not bounded and provide no direct support for threads waiting on a condition (such as container full). However, STL containers can be used as the underlying data store for thread-safe, bounded containers that support condition wait. In this article I showed an implementation based on semaphores suitable for Windows 95/98; I then showed an implementation for Windows NT and Windows 2000 that supported waiting on an arbitrary predicate. The latter implementation depends upon the SignalObjectAndWait function available within Windows NT and Windows 2000. This function is not available on Windows 95/98.
David M. Howard works at Sierra Nevada Corporation, a defense electronics firm located in Sparks, Nevada. He has 15 years programming experience in multithreaded applications. He has an MS in Computer Science from the University of North Texas and has MCSD and Sun Java Programmer/Developer certification.