omni_condition (Was Re: [omniORB] nameservice as NT-service and NT alpha port)

Saroj Mahapatra saroj@Bear.COM
Wed, 08 Apr 1998 16:01:41 -0400


Hi,

I think the NT implementation of omni_condition should change. In particular,
it should not assume that the thread that calls
omni_condition::wait() is a omni_thread.  I have already seen several messages
in this mailing list which result from this
assumption (incorrect).    omni_condition::broadcast() is inefficient too,
because it makes n calls to the thread
scheduler , where n is the size of the waiting queue.
Here is a solution that I devised after some goofing up and several mail
exchanges with Douglas
Schmidt,  David Holmes, Bil Lewis. It is implemented using STL, but you can
easily change it.

Another thing, I think you should seriously consider using SGI STL  or
STLport.   You can really reduce a lot of code and
get a great implementation.  I can contribute the patch to compile SGI STL for
Solaris SUNWspro 4.2 compiler.  Or, you can
get it from STLport.  The effort you put in using STL initially will pay rich
dividends later.


Thank you,
Saroj Mahapatra


class Mutex { ... };
class MutexLock
{
public:
  MutexLock(Mutex* mutex) : mutex_(mutex) {}
  ~MutexLock() { if (mutex_) mutex_->unlock();}
  void reset() { if (mutex_) mutex_->unlock(); mutex_ = 0; }
  void reset(Mutex* m) { // m should not be 0.
    if (mutex_) mutex_->unlock();
    (mutex_ = m)->lock();
  }
private:
  Mutex* mutex_;
  ...
};

class ConditionVariable
{
public:
  ...
private:
  struct HandlePtr {
    HANDLE handle;
    HandlePtr(HANDLE h) : handle(h) {}
    ~HandlePtr() { CloseHandle(handle); }
  };
  typedef std::pair<HANDLE, int* > Broadcast;
  struct BroadcastPtr {
    Broadcast broadcast;
    BroadcastPtr(Broadcast b) : broadcast(b) {}
    ~BroadcastPtr() { CloseHandle(b.first); delete b.second; }
  };
  struct MutexUnlock { ... };
  typedef std::deque<std::pair<HANDLE, Broadcast> > WaitQueue;
  WaitQueue waiters_;
  std::vector<HANDLE> free_events_; // in non-signaled state
  std::vector<Broadcast> free_broads_; // in non-signaled state
  BroadcastPtr broadcast_ptr_; // should be after free_broads_.
  Mutex mutex_;

  HANDLE get_event() {  // called with mutex_ held
    // returns a handle in non-signaled state.
    HANDLE ret;
    if (!free_events_.empty()) {
      ret = free_events_.back();
      free_events_.pop_back();
    } else ret = CreateEvent(0, true /* manual-reset*/,
        false /* non-signaled*/, 0);
    return ret;
  }

  Broadcast get_broadcast() { // called with mutex_ held
    // returns a sema in non-signaled state
    Broadcast ret;
    if (!free_broads_.empty()) {
      ret = free_broads_.back();
      free_broads_.pop_back();
    } else ret = Broadcast(CreateSemaphore(0, 0/*non-signaled*/,
        0x7fffffff, 0),
      new int /* no need to initialize */);
    return ret;
  }

  void destroy_broadcast(Broadcast b) {
    CloseHandle(b.first);
    delete b.second;
  }
};

ConditionVariable::ConditionVariable()
  : broadcast_ptr_(get_broadcast())
{
}

ConditionVariable::~ConditionVariable()
{
  if (!waiters_.empty()) {
    typedef WaitQueue::const_iterator I;
    I last = waiters_.end(); --last;
    for (I i = waiters_.begin(); i != last; ++i) {
      CloseHandle((*i).first);
      I next = i; ++next;
      if ((*i).second.second != (*next).second.second)
        destroy_broadcast((*i).second);
    }
    CloseHandle((*last).first);
    destroy_broadcast((*last).second);
  }
  std::for_each(free_events_.begin(), free_events_.end(), CloseHandle);
  std::for_each(free_broads_.begin(), free_broads_.end(),
     destroy_broadcast);
}

void ConditionVariable::wait(Mutex* mutex)
{
  MutexLock lock(&mutex_);
  MutexUnlock unlock(mutex);
  HandlePtr w(get_event());

  // copy broadcast_  before releasing mutex_.
  BroadCast b(broadcast_ptr_.broadcast);
  waiters_.push_back(std::make_pair(w.handle, b));
  HANDLE wait_for[] = { w.handle, b.first };

  lock.reset();

  DWORD status = WaitForMultipleObjects(2, wait_for, false, INFINITE);
  if (status == WAIT_FAILED) throw_error(GetLastError());

  lock.reset(&mutex_);

  if (status == WAIT_OBJECT_0) ResetEvent(w.handle);
  else if (status == WAIT_OBJECT_0 + 1 && --(*b.second) == 0)
    free_broads_.push_back(b);
  free_events_.push_back(w.handle);
  w.release();   // free_events_ owns the handle.
  lock.reset();   // first unlock mutex_ and then lock mutex;
    // otherwise deadlock may occur between
    // this thread and a new thread that
    // already holds 'mutex'.
}

void ConditionVariable::signal()
{
  MutexLock lock(&mutex_);
  if (!waiters_.empty()) {
    SetEvent(waiters_.front());
    waiters_.pop_front();
  }
}

void ConditionVariable::broadcast()
{
  MutexLock lock(&mutex_);
  // no waiter can be released or no new waiter can join
  // until mutex_ is released.
  if (!waiters_.empty()) {
    Braodcast b(broadcast_ptr_.broadcast);
    ReleaseSemaphore(b.first, *(b.second) = waiters_.size(), 0);
    waiters_.clear();
    // now change broadcast_ptr_ so that no waiter can get it twice and
    // no new waiter can steal it.
    broadcast_ptr_.broadcast = get_broadcast();
  }
}






--
*******************************************************************************
Bear Stearns is not responsible for any recommendation, solicitation, offer or
agreement or any information about any transaction, customer account or account
activity contained in this communication.
*******************************************************************************