Thread Communication in Parallel Algorithms

Enabling efficient interaction between threads

By Lalit Pant

Lalit is the chief technical officer for Atlas Software, a consulting firm based in Pittsburgh, Pennsylvania. He can be contacted at lalitp@earthlink.net.

With the increasing availability of symmetric multiprocessing (SMP) hardware, parallel algorithms are playing a vital role in extracting faster performance from computer systems. But while the idea of parallelizing algorithms to achieve greater speeds is appealing, there are practical difficulties that need to be overcome before this can be done effectively. Chief among them is the issue of efficient interthread communication. Most parallel algorithms require some means by which threads can talk to each other. An inefficient communication mechanism can easily cancel out the speed gains that parallelization is supposed to deliver.

In this article, I'll examine thread communication mechanisms that may be used within parallel algorithms. To facilitate comparisons between these mechanisms, I present an abstract problem and discuss the different mechanisms in the context of this problem. The discussion is based on primitives provided by Windows NT.

The Problem: Parallel Graph Traversal

Performing an operation on the nodes of a graph is a problem that arises in a variety of contexts, including downloading pages rooted at a web URL up to a specified depth, performing an operation on a set of files in a filesystem, and the like.

Examples such as these involve traversing a graph, and running an algorithm on each node as it is being traversed. The trick is to do the traversal in parallel. Here, I will take up the traversal of a special case of a graph -- the multiway tree. Differences between the two are not important for discussing usage of thread communication mechanisms within a parallel algorithm.

The first step is to look at the sequential traversal of a multiway tree. Listing One is a tree definition where each node contains a list of zero or more child nodes. The root node defines the tree.

Listing Two implements a sequential algorithm for breadth-first (or level order) traversal of a tree using a queue. The algorithm starts by putting the root of the tree into the queue. Within each iteration, a node is taken off the queue and processed, and its children are enqueued. This continues until the queue is empty, which happens once the leaf nodes have been processed. Use of a queue is what makes this traversal breadth first. Using a stack would have resulted in a depth-first traversal.
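The effect of the container choice can be seen in a small, self-contained sketch. The Node struct and the functions traverse() and demoOrder() are hypothetical names made up for this illustration; they stand in for the Tree<T> interface of Listing One, and a std::deque plays the role of both the queue and the stack.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical minimal node type for this sketch (not the article's Tree<T>).
struct Node {
    std::string name;
    std::vector<Node*> children;
};

// Visits every node reachable from root and returns the names in visit order.
// fifo == true : the oldest pending node is taken next (breadth first).
// fifo == false: the newest pending node is taken next (depth first).
std::vector<std::string> traverse(Node& root, bool fifo) {
    std::vector<std::string> order;
    std::deque<Node*> pending;
    pending.push_back(&root);
    while (!pending.empty()) {
        Node* n = fifo ? pending.front() : pending.back();
        if (fifo) pending.pop_front(); else pending.pop_back();
        assert(n != nullptr);
        order.push_back(n->name);                    // "process" the node
        for (Node* child : n->children)              // then schedule its children
            pending.push_back(child);
    }
    return order;
}

// Builds the tree a -> (b -> (d), c) and returns the visit order as one string.
std::string demoOrder(bool fifo) {
    Node d{"d", {}};
    Node c{"c", {}};
    Node b{"b", {&d}};
    Node a{"a", {&b, &c}};
    std::string out;
    for (const std::string& name : traverse(a, fifo)) out += name;
    return out;
}
```

For this tree, demoOrder(true) yields "abcd" (level order), while demoOrder(false) yields "acbd" (depth first, with the last child visited first).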

Now I'll do the same thing in parallel. For this, I introduce the abstraction of a "concurrency engine." Instead of directly putting nodes that have to be processed into a queue, I enqueue them into a concurrency engine. The concurrency engine is an "active" object that has multiple worker threads running inside it to handle requests. Once a node is enqueued into the engine, a worker thread processes it in parallel. The concurrency engine is based on the class template ConcurrencyEngine, defined in Listing Three. The template has two template parameters -- Request and Work. These parameters let you specify a background task and contextual data required by the task to do its job. Work is a functor whose function call operator is called in a background thread within the engine. A pointer to the instance of the Request template parameter enqueued into the engine with enque() is passed to the function call operator. The init method of the engine sets up the underlying thread communication channel between threads calling enque() on the engine and threads running the Work functor.

So what exactly is a concurrency engine? It is an abstraction that provides a mechanism for asynchronous, concurrent execution of tasks. The template parameters used to instantiate an engine let you specify a policy that defines what these tasks will do and the contextual data required by these tasks. You put requests into the engine by calling enque(). A worker thread inside the engine calls deque() to receive the request. It then calls the Work functor to run a concurrent task based on this request. How the request is dispatched from a thread calling enque() to a thread calling deque() is up to the engine. Listing Four implements a parallel traversal algorithm that uses the concurrency engine. Compare this with Listing Two. Enqueuing the root looks the same, but what happened to the rest of the stuff? Well, it's gone into a Work functor, LotAlgo. The thread that initiates the traversal just needs to enqueue the root. The worker threads running the functor handle everything after that. Listing Five shows what they do.

The LotAlgo functor is called by a worker thread with a node of the tree as a parameter (the root, on the first call). It, in turn, enqueues the children of this node back into the engine so that fellow workers can process them in parallel. The functor then proceeds to run a predefined unit of work on the current node.
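To make the shape of the engine concrete, here is a minimal portable sketch that uses standard C++ threads rather than NT primitives. It is not the ConcurrencyEngine of Listing Three: MiniEngine, waitIdle(), Node, and demoSum() are hypothetical names, a std::function replaces the Work template parameter, and the pending-request counter used to detect completion is an assumption of this sketch that the article does not describe.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable stand-in for the article's ConcurrencyEngine.
template <class Request>
class MiniEngine {
public:
    typedef std::function<void(Request*)> Work;

    MiniEngine(std::size_t workers, Work work) : m_work(std::move(work)) {
        assert(workers > 0);
        for (std::size_t i = 0; i < workers; ++i)
            m_threads.emplace_back([this] { run(); });
    }
    ~MiniEngine() {                       // stop and join the workers
        {
            std::lock_guard<std::mutex> guard(m_lock);
            m_stop = true;
        }
        m_ready.notify_all();
        for (std::thread& t : m_threads) t.join();
    }
    void enque(Request* r) {              // hand a request to some worker
        {
            std::lock_guard<std::mutex> guard(m_lock);
            m_queue.push(r);
            ++m_pending;
        }
        m_ready.notify_one();
    }
    void waitIdle() {                     // block until nothing is queued or running
        std::unique_lock<std::mutex> guard(m_lock);
        m_idle.wait(guard, [this] { return m_pending == 0; });
    }
private:
    void run() {                          // body of each worker thread
        for (;;) {
            Request* r = nullptr;
            {
                std::unique_lock<std::mutex> guard(m_lock);
                m_ready.wait(guard, [this] { return m_stop || !m_queue.empty(); });
                if (m_queue.empty()) return;          // stopping and drained
                r = m_queue.front();
                m_queue.pop();
            }
            m_work(r);                                // run the functor unlocked
            std::lock_guard<std::mutex> guard(m_lock);
            if (--m_pending == 0) m_idle.notify_all();
        }
    }
    Work m_work;
    std::mutex m_lock;
    std::condition_variable m_ready, m_idle;
    std::queue<Request*> m_queue;
    std::size_t m_pending = 0;            // requests queued or in progress
    bool m_stop = false;
    std::vector<std::thread> m_threads;
};

struct Node { int value; std::vector<Node*> children; };

// Sums the values of a five-node tree, one engine request per node.
int demoSum() {
    Node leaf1{1, {}}, leaf2{2, {}}, leaf3{3, {}};
    Node mid{10, {&leaf1, &leaf2}};
    Node root{100, {&mid, &leaf3}};
    std::atomic<int> sum(0);
    MiniEngine<Node>* self = nullptr;
    MiniEngine<Node> engine(4, [&](Node* n) {
        for (Node* child : n->children) self->enque(child);  // fan out first
        sum += n->value;                                     // then do the node's work
    });
    self = &engine;
    engine.enque(&root);
    engine.waitIdle();
    return sum;
}
```

As in the LotAlgo functor, each request re-enqueues its children before doing its own work. Because a child is counted as pending before its parent finishes, the counter reaches zero only when the whole tree has been processed.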

With that out of the way, I'll now turn to thread-communication mechanisms that can be used to build a channel between a thread calling ConcurrencyEngine::enque() and one calling ConcurrencyEngine::deque().

Thread Communication Mechanisms

Executive objects within the NT kernel come in two flavors -- synchronizable and nonsynchronizable. Synchronizable objects can be used to synchronize threads. Such objects are either in a signaled or a nonsignaled state during the course of their lives. A thread can do a wait on these objects. If the object is in a signaled state, the thread's wait is satisfied. Otherwise, the thread blocks until some other thread (or an interrupt) comes along and sets the object's state to signaled, at which point the thread is awakened by the operating system. A few of these NT objects -- Mutexes, Events, and Semaphores -- exist for the sole purpose of synchronization. Others serve a different purpose, but can be used for synchronization as well. Of these, one that will be used later is the File object. Critical sections are lightweight objects provided by the Win32 subsystem for fast intraprocess synchronization. I refer to these synchronization objects as "data locks."

Another important class of synchronization primitives is "predicate locks." Operating systems such as Solaris provide a built-in predicate lock known as the "condition variable" (see Listing Six), but you have to roll your own on NT. A condition variable is linked to a predicate associated with some shared data in your program. Condition variables work in conjunction with external data locks. The predicate itself is not part of the condition variable. It is up to the threads using the condition variable to protect access to the shared data associated with the predicate by using the external lock. If a thread wants to examine the predicate to, say, check that the predicate is True before proceeding, it has to acquire the external lock. If it then finds that the predicate is False, it can do a wait on the condition variable. This releases the lock associated with the condition, allowing another thread to come along and change the shared data -- after acquiring the lock. If a thread sets the predicate to True, it can do a signal on the condition variable, waking up a thread sleeping inside the condition's wait. The signaler then has to release the external lock to allow the waiter to run. Condition variables, when used in conjunction with plain, old-fashioned queues, allow the construction of an elegant thread-communication mechanism -- the producer-consumer queue.
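The wait/signal protocol described above can be sketched with the C++ standard library's std::condition_variable, used here as a portable stand-in for the hand-rolled NT condition variable of Listing Six. The Mailbox type and the functions waitForMail(), postMail(), and demoHandoff() are hypothetical names introduced only for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-slot mailbox; the predicate is "full == true".
struct Mailbox {
    std::mutex lock;                  // external data lock guarding full and value
    std::condition_variable hasMail;  // condition variable tied to the predicate
    bool full = false;
    int value = 0;
};

// Waiter: acquire the lock, test the predicate, sleep while it is false.
int waitForMail(Mailbox& box) {
    std::unique_lock<std::mutex> guard(box.lock);
    while (!box.full)
        box.hasMail.wait(guard);  // releases the lock while asleep, reacquires on wake
    box.full = false;
    return box.value;
}

// Signaler: acquire the lock, make the predicate true, signal, release the lock.
void postMail(Mailbox& box, int v) {
    std::lock_guard<std::mutex> guard(box.lock);
    assert(!box.full);            // this sketch supports one message at a time
    box.value = v;
    box.full = true;
    box.hasMail.notify_one();
}

// Hands one value from the calling thread to a consumer thread.
int demoHandoff() {
    Mailbox box;
    int received = 0;
    std::thread consumer([&] { received = waitForMail(box); });
    postMail(box, 42);
    consumer.join();
    return received;
}
```

The waiter rechecks the predicate in a loop after every wakeup, which keeps the code correct whether the consumer starts waiting before or after the producer posts.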

Producer-Consumer Queues

Producer-consumer queues provide a powerful mechanism for transferring information between threads. Consumer threads try to get information from producer threads via a shared buffer. If that information is not immediately available, they go to sleep until the time that a producer signals availability of information. Similarly, producer threads try to send information to consumer threads. If the information cannot be buffered, they go to sleep until a consumer thread signals availability of space in the buffer.

Listing Seven implements a producer-consumer queue. The underlying buffer is protected by a mutex during updates in read() and write(). The same mutex is used to initialize two condition variables that signal the notEmpty and notFull conditions, which allow consumers to block if no information is available, and producers to block if the underlying buffer is full. Listing Eight is an implementation of ConcurrencyEngine::enque() and ConcurrencyEngine::deque() using producer-consumer queues.
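For readers without access to NT, the same design can be sketched with standard C++ primitives. This is a portable analog under stated assumptions, not the code of Listing Seven: BoundedQueue and demoTransfer() are hypothetical names, and std::mutex and std::condition_variable take the place of the NT mutex and the Cv class.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical bounded producer-consumer queue built on standard primitives.
template <class Message>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t maxSize) : m_maxSize(maxSize) {
        assert(maxSize > 0);
    }
    // Producer side: blocks while the buffer is full.
    void write(Message m) {
        std::unique_lock<std::mutex> guard(m_lock);
        m_notFull.wait(guard, [this] { return m_items.size() < m_maxSize; });
        m_items.push(std::move(m));
        m_notEmpty.notify_one();      // wake one sleeping consumer, if any
    }
    // Consumer side: blocks while the buffer is empty.
    Message read() {
        std::unique_lock<std::mutex> guard(m_lock);
        m_notEmpty.wait(guard, [this] { return !m_items.empty(); });
        Message m = std::move(m_items.front());
        m_items.pop();
        m_notFull.notify_one();       // wake one sleeping producer, if any
        return m;
    }
private:
    std::mutex m_lock;                              // guards m_items
    std::condition_variable m_notEmpty, m_notFull;  // the two predicate locks
    std::queue<Message> m_items;
    const std::size_t m_maxSize;
};

// Sends 1..count through a two-slot queue and returns the consumer's total.
long demoTransfer(int count) {
    BoundedQueue<int> q(2);           // tiny buffer so the producer has to block
    long total = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) total += q.read();
    });
    for (int i = 1; i <= count; ++i) q.write(i);
    consumer.join();
    return total;
}
```

Both condition variables share the single mutex, mirroring the way notEmpty and notFull are initialized from the same lock in the article's queue.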

Though producer-consumer queues provide a powerful abstraction for interthread communication, they have certain implementation disadvantages. At minimum, they require a predicate lock to signal the not-full condition, another predicate lock to signal the not-empty condition, and an external lock to protect updates to the underlying buffer. The order in which consumer threads wake up to service requests from a producer is also likely to be first-in/first-out when operating system-supplied synchronization primitives are being used. This is not very efficient in terms of the availability of a thread's working set within a system's memory hierarchy, a point that is explained in a subsequent section. In addition, a large number of consumer threads are likely to bog down a computation-intensive algorithm. What is required is a mechanism for thread communication that minimizes locking overhead, allows for intelligent scheduling of worker threads, and provides for control over the number of workers running concurrently on a system. Enter I/O completion ports.

I/O Completion Ports

I/O completion ports are synchronization mechanisms that first appeared in Windows NT 3.5. To use them, you create a file handle for overlapped or asynchronous I/O, associate the handle with a completion port, and have your worker threads wait on the port. As soon as information becomes available on a handle attached to the port, a worker thread is woken up to handle the request. Completion ports have significant advantages over other synchronization mechanisms. With the other NT synchronization objects, waiting threads are released in first-in/first-out order. With completion ports, worker threads are released in last-in/first-out order. How does that matter? The explanation goes like this. The thread that ran last is the one most likely to have its working set in memory. On the other hand, the thread that has been waiting the longest is the one most likely to have more of its working set swapped out to disk. Thus, releasing waiting threads in last-in/first-out order is likely to minimize page faults and cache misses.

Completion ports also allow control over the number of threads associated with a port that can execute concurrently. So, you can have any number of worker threads waiting for requests over a port, but only a subset of these, as specified by the concurrency value of the port, will be allowed to execute concurrently. For example, suppose you have a four-processor system and you assign a concurrency value of four to your completion port. You start eight worker threads to handle requests for this port. Now, five requests become available on the port. Four of the workers can kick in and start servicing these requests. The concurrency value of four will prevent a fifth thread from waking up and handling the last request. But that's okay, because if the fifth thread woke up, it would preempt one of the four running threads. Your overall throughput would probably go down because of the context switch overhead this introduces. Having the four additional worker threads on standby helps whenever a running thread does something that makes it block, like reading from a disk. The moment that happens, another worker wakes up and starts processing the next request packet pending in the port.
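The effect of a concurrency value can be modeled in user mode with a simple gate. This is only an illustration of the idea and not how NT implements it: ConcurrencyGate and demoPeak() are hypothetical names, and where the kernel automatically releases another worker when a running thread blocks, a worker in this model has to call leave() itself.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical user-mode model of a port's concurrency value.
class ConcurrencyGate {
public:
    explicit ConcurrencyGate(int limit) : m_limit(limit) { assert(limit > 0); }
    // Blocks until fewer than 'limit' workers are active, then claims a slot.
    void enter() {
        std::unique_lock<std::mutex> guard(m_lock);
        m_slotFree.wait(guard, [this] { return m_active < m_limit; });
        ++m_active;
        if (m_active > m_peak) m_peak = m_active;
    }
    // Gives the slot back so that a waiting worker can run.
    void leave() {
        {
            std::lock_guard<std::mutex> guard(m_lock);
            --m_active;
        }
        m_slotFree.notify_one();
    }
    // Highest number of workers that were ever active at the same time.
    int peak() {
        std::lock_guard<std::mutex> guard(m_lock);
        return m_peak;
    }
private:
    std::mutex m_lock;
    std::condition_variable m_slotFree;
    const int m_limit;
    int m_active = 0;
    int m_peak = 0;
};

// Starts 'workers' threads behind a gate and reports the peak concurrency.
int demoPeak(int workers, int limit) {
    ConcurrencyGate gate(limit);
    std::vector<std::thread> pool;
    for (int i = 0; i < workers; ++i)
        pool.emplace_back([&gate] {
            gate.enter();
            std::this_thread::sleep_for(std::chrono::milliseconds(10));  // simulated work
            gate.leave();
        });
    for (std::thread& t : pool) t.join();
    return gate.peak();
}
```

With eight workers and a limit of four, as in the example above, demoPeak(8, 4) never reports more than four workers active at once; the other four wait on standby until a slot is given back.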

I now need to hook up the worker threads within the concurrency engine to a completion port, and to make the enque() method put request packets into the port. Sockets offer a solution. On NT, socket handles are native file handles. Using them, it is fairly easy to hook things up appropriately. The first step is to associate a completion port with the engine. The engine constructor creates a bunch of worker threads to handle requests for this port. It also creates a manager thread. This thread in turn creates a TCP server socket, binds it to a TCP port, and starts waiting for connections on this socket. A client thread then calls init() on the engine. init() creates a client socket and connects it to the server socket. The manager thread within the engine accepts this connection and associates the corresponding socket with the completion port. Subsequently, a thread may call enque(). The enque() method writes to the client socket, at which point one of the workers sleeping on the completion port wakes up in the deque() method, reads the request packet, and services the request.

I now have a parallel tree traversal algorithm that uses sockets and I/O completion ports instead of a producer-consumer queue for thread communication inside the concurrency engine. An interesting feature of this algorithm is that it is possible to run the core of the concurrency engine on a fast machine on the network, thus speeding up execution of the algorithm.

But while this socket-based completion port solution is conceptually pleasing, a practical problem remains for single-machine implementations. Request packets reach worker threads via kernel memory. In the write to the socket inside enque(), the request packet is copied from user space to kernel space. Within deque(), it is copied back into user space. This is a fairly costly operation. Here, the flexibility of completion ports kicks in again. As it turns out, it is possible to bypass the usage of file objects when using completion ports. To do this, you have to post a packet to a completion port. This causes a worker thread waiting on that port to wake up, with access to the data that was posted. This is exactly what is required for the tree traversal algorithm. The enque() method posts a node to a completion port, and a worker thread dequeues this node and works on it as before; see Listing Nine.

Notice the advantages when you compare this with the usage of a producer-consumer queue. First, the producer-consumer queue solution has to employ a lock to guard updates to the queue. There is no explicit locking (and the attendant potential for bugs) involved in the completion port solution. Of course, there is no such thing as a free lunch. Inside the kernel, NT would be using a lock to manage access to the internal queue associated with the completion port. But the kernel handling of locking minimizes user-mode to kernel-mode switches in the case of completion ports. There is only one such mode switch within the enque()/deque() functions. In the producer-consumer queue case, there are three within each function! Second, the producer-consumer solution has to explicitly manage buffer memory, and enqueue-into/dequeue-from this buffer in a threadsafe manner. This is done implicitly by the NT kernel in the case of completion ports.

Conclusion

It is possible to adjust the underlying thread communication mechanism in a parallel algorithm without disturbing the overall structure of the algorithm. Of the mechanisms examined here, the one based on I/O completion ports turns out to be the most efficient. It is easy to code and is very efficient in terms of working set usage, context switches (between threads), and mode switches (from user to kernel). Here, I've looked at a case where the operation running concurrently on the nodes of a graph can work independently on each node. In cases where dependencies between nodes with respect to an operation do exist, additional abstractions are required to synchronize the threads taking part in the execution of the algorithm.

DDJ

Listing One

// Multiway tree definition
#include <vector>

template <class T> class Tree
{
public:
   // contains tree elements
   class Node
   {
   public:
      T& operator *();
      T* operator ->();
      ...
   private:
      T m_val;
      std::vector<Node*> m_children;
      ...
   };
   // enumerates child nodes of a node
   class Enumeration
   {
   public:
      Node& nextElement();
      bool hasMoreElements();
      // releases the enumeration; called in Listings Two and Five
      void free();
   };
   Tree();
   ~Tree();
   // returns tree root
   Node& root();
   // returns parent of given node
   Node& parent(Node& child);
   // returns child nodes of given node
   Enumeration& children(Node& parent);
private:
   Node *m_pRoot;
   ...
};


Listing Two

// Sequential level order traversal
#include <iostream>
#include <queue>

// Function arguments :
// t - tree being traversed
// oper - the operation to be performed on each node
template<class T, class NodeOp>
void levelOrderTraversal(Tree<T>& t, const NodeOp& oper)
{
   std::queue<typename Tree<T>::Node *> q;
   q.push(&t.root());
   while(!q.empty())
   {
      // take the next node off the queue and process it
      typename Tree<T>::Node* node = q.front();
      q.pop();
      oper(*node);
      // enqueue its children for a later iteration
      typename Tree<T>::Enumeration& e = t.children(*node);
      while(e.hasMoreElements())
      {
         q.push(&e.nextElement());
      }
      e.free();
   }
}
// Operation to be performed on each node
// Passed as second template parameter to the function template above
template <class T> struct NodeOper
{
   void operator() (typename Tree<T>::Node& node) const
   {
      std::cout << *node << std::endl;
   }
};


Listing Three

// Concurrency engine
template <class Request, class Work> class ConcurrencyEngine
{
public :
   ConcurrencyEngine(Work& w);
   int init();
   // put request into engine
   void enque(Request* r);
private :
   Request* deque(); // Called by internal threads
   ...
};

// While instantiating an engine, a functor needs to be supplied as the 2nd
// template parameter. Its function call operator is called in the background
// by the concurrency engine with a pointer to an instance of the first
// template parameter
template <class Request> struct SomeFunctor
{
   void operator() (Request *r);
   ...
};


Listing Four

// Parallel level order traversal
// Function arguments :
// t - tree being traversed
// d - the operation to be performed on each node
template<class T, class Do>
void levelOrderTraversalMt(Tree<T>& t, const Do& d)
{
   LotAlgo<T, Do> algo(t, d); // LotAlgo defined in Listing Five
   ConcurrencyEngine<typename Tree<T>::Node, LotAlgo<T, Do> >  q(algo);
   algo.setEngine(&q);
   q.init();
   q.enque(&t.root());
}


Listing Five

// Level order traversal functor
template <class T, class Do>
struct LotAlgo
{
   Tree<T>& m_t;
   const Do& m_work;
   ConcurrencyEngine<typename Tree<T>::Node, LotAlgo<T, Do> >*  m_q;
   ...
   void operator() (typename Tree<T>::Node* r)
   {
      typename Tree<T>::Enumeration& e = m_t.children(*r);
      // enque child nodes back into the engine
      while(e.hasMoreElements())
      {
         m_q->enque(&e.nextElement());
      }
      // work on the current node; the operation takes a Node&, as in Listing Two
      m_work(*r);
      e.free();
   }
};


Listing Six

// condition variable interface
class Cv
{
public :
    Cv(HANDLE lock);
    ~Cv();
    void wait();
    void signal();
};


Listing Seven

// Producer Consumer queue
template <class Message> class PCQueue
{
private:
   HANDLE m_lock;            // mutex guarding m_queue
   Cv notEmpty;              // signaled when a message becomes available
   Cv notFull;               // signaled when space becomes available
   queue<Message> m_queue;
   size_t maxSize;
public:
   PCQueue(size_t size = 1500)
      : m_lock(CreateMutex(NULL, FALSE, NULL)), notEmpty(m_lock),
        notFull(m_lock), maxSize(size)
   {
   }
   ~PCQueue()
   {
      CloseHandle(m_lock);
   }
   Message read(void)
   {
      WaitForSingleObject(m_lock, INFINITE);
      // sleep until a producer has put something into the queue
      while(m_queue.empty())
      {
         notEmpty.wait();
      }
      Message result = m_queue.front();
      m_queue.pop();

      notFull.signal();
      ReleaseMutex(m_lock);
      return result;
   }
   void write(Message m)
   {
      WaitForSingleObject(m_lock, INFINITE);
      // sleep until a consumer has made room in the queue
      while(m_queue.size() == maxSize)
      {
         notFull.wait();
      }
      m_queue.push(m);
      notEmpty.signal();
      ReleaseMutex(m_lock);
   }
};


Listing Eight

// Engine dispatch mechanism using producer consumer queue
template <class Request, class Work> class ConcurrencyEngine
{
 ...
private :
 ...
 PCQueue<Request *> m_pcQueue;
};
template <class Request, class Work>
void ConcurrencyEngine<Request, Work>::enque(Request* r)
{
   m_pcQueue.write(r);
}
template <class Request, class Work>
Request* ConcurrencyEngine<Request, Work>::deque()
{
   return m_pcQueue.read();
}


Listing Nine

// Engine dispatch mechanism using completion port
template <class Request, class Work> class ConcurrencyEngine
{
   ...
private :
   ...
   HANDLE m_ioPort; // completion port for enqued requests
};
template <class Request, class Work>
void ConcurrencyEngine<Request, Work>::enque(Request* r)
{
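   // the request pointer travels as the completion key; no file I/O is involved
   PostQueuedCompletionStatus(m_ioPort, sizeof(Request *), (ULONG_PTR)r, 0);
}
template <class Request, class Work>
Request* ConcurrencyEngine<Request, Work>::deque()
{
   DWORD read;
   ULONG_PTR key; // pointer-sized, so the request pointer is not truncated
   OVERLAPPED *pOv;
   ...
   // block until a request is posted to the port
   GetQueuedCompletionStatus(m_ioPort, &read, &key, &pOv, INFINITE);
   return (Request *)key;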
}





Copyright © 1999, Dr. Dobb's Journal