Listing 1: The Channel class.

template < typename _Tp, typename _queueTp = std::deque<_Tp> >

class Channel : private _queueTp
{
private:
  Mutex monitor_; 
  Condition bufferNotFull_, bufferNotEmpty_;
  volatile bool bMayStop_;
  ...
public:
  // for consumer thread...
  _Tp poll(long msecs = -1) // ignore msecs for now, 
  {
    Lock lk(monitor_);
    while (!bMayStop_ && 0 == ((_queueTp *)this)->size())
    {
      bufferNotEmpty_.wait(lk);
    }
    // check if caller intentionally calls for stop
    if(bMayStop_ && 0 == ((_queueTp *)this)->size()) 
      throw pc_exception("consumer to end"); 
    // pop back
    _Tp item = pop();
    bufferNotFull_.notify_one();
    return item;
  }
  // for producer thread...
  bool offer(_Tp item, 
    long msecs = -1) // ignore msecs for now
  {
    Lock lk(monitor_);
    while (maxSize_ == ((_queueTp *)this)->size())
    {
      bufferNotFull_.wait(lk);
    }
    // push front
    push(item);
    bufferNotEmpty_.notify_one();
    return true;
  }
  virtual void mayStop(bool bMayStop = true) 
  {
    Lock lk(monitor_);
    bMayStop_ = bMayStop;
    if(bMayStop) 
    {
      // if producers exit, wake up the 
      // waiting consumer threads...
      bufferNotEmpty_.notify_all();
    }
  }
  ...
}; // Channel