[omniORB] Proper way for orderly server process shutdown?

Sai-Lai Lo S.Lo@uk.research.att.com
09 Jul 1999 21:42:05 +0100


Alright, I removed the non-deterministic behaviour and now all server
threads are guaranteed to have exited before BOA::destroy() returns.

Could you try it and see if this helps?

I'm seeing no core dump with glibc-2.0 and glibc-2.1 except when I turn up
the ORB trace level on a dual processor glibc-2.0 machine. As I said
earlier, the program core dumps inside the pthread library. Looking at it
under gdb, it core dumps when one thread (a server thread) is doing a stdio
write to stderr and the main thread is doing a cerr write. Looks like a
library issue.

Here is the patch against omniORB 2.8.0 prerelease 1.

*** 2.8.0pre1/src/lib/omniORB2/orbcore/tcpSocket.h	Thu Mar 11 16:31:31 1999
--- 2.8.0new/src/lib/omniORB2/orbcore/tcpSocket.h	Fri Jul  9 19:10:58 1999
***************
*** 125,133 ****
--- 125,139 ----
  			      IOP::TaggedProfileList& profilelist) const;
  
    friend class nobody;
+   friend class tcpSocketRendezvouser;
+   friend class tcpSocketWorker;
  
  private:
    enum { IDLE, ACTIVE, ZOMBIE } pd_state;
+ 
+   omni_mutex                    pd_shutdown_lock;
+   omni_condition                pd_shutdown_cond;
+   int                           pd_shutdown_nthreads;
  
    ~tcpSocketMTincomingFactory() {} // Cannot delete a factory instance
  
*** 2.8.0pre1/src/lib/omniORB2/orbcore/tcpSocketMTfactory.cc	Mon Jun 28 18:38:01 1999
--- 2.8.0new/src/lib/omniORB2/orbcore/tcpSocketMTfactory.cc	Fri Jul  9 21:16:08 1999
***************
*** 186,220 ****
  
  class tcpSocketRendezvouser : public omni_thread {
  public:
!   tcpSocketRendezvouser(tcpSocketIncomingRope *r) : omni_thread(r) {
!     start_undetached();
!   }
    virtual ~tcpSocketRendezvouser() { }
    virtual void* run_undetached(void *arg);
  
  private:
    tcpSocketRendezvouser();
  };
  
  class tcpSocketWorker : public omni_thread {
  public:
!   tcpSocketWorker(tcpSocketStrand* s) : omni_thread(s), pd_sync(s,0,0) {
!     s->decrRefCount();
!     start();
    }
-   virtual ~tcpSocketWorker() { }
    virtual void run(void *arg);
    static void _realRun(void* arg);
  
  private:
    Strand::Sync    pd_sync;
  };
  
  /////////////////////////////////////////////////////////////////////////////
  
! tcpSocketMTincomingFactory::tcpSocketMTincomingFactory() : pd_state(IDLE)
  {
    tcpSocketFactoryType::init();
  }
   
  CORBA::Boolean
--- 186,240 ----
  
  class tcpSocketRendezvouser : public omni_thread {
  public:
!   tcpSocketRendezvouser(tcpSocketIncomingRope *r,
! 			tcpSocketMTincomingFactory *f) : 
!         omni_thread(r), pd_factory(f)
!     {
!       start_undetached();
!     }
    virtual ~tcpSocketRendezvouser() { }
    virtual void* run_undetached(void *arg);
  
  private:
+   tcpSocketMTincomingFactory* pd_factory;
    tcpSocketRendezvouser();
  };
  
  class tcpSocketWorker : public omni_thread {
  public:
!   tcpSocketWorker(tcpSocketStrand* s, tcpSocketMTincomingFactory* f) : 
!           omni_thread(s), pd_factory(f), pd_sync(s,0,0) 
!     {
!       s->decrRefCount();
!       start();
!     }
!   virtual ~tcpSocketWorker() { 
!     omni_mutex_lock sync(pd_factory->pd_shutdown_lock);
!     assert(pd_factory->pd_shutdown_nthreads != 0);
!     if (pd_factory->pd_shutdown_nthreads > 0) {
!       pd_factory->pd_shutdown_nthreads--;
!     }
!     else {
!       pd_factory->pd_shutdown_nthreads++;
!       pd_factory->pd_shutdown_cond.signal();
!     }
    }
    virtual void run(void *arg);
    static void _realRun(void* arg);
  
  private:
+   tcpSocketMTincomingFactory* pd_factory;
    Strand::Sync    pd_sync;
  };
  
  /////////////////////////////////////////////////////////////////////////////
  
! tcpSocketMTincomingFactory::tcpSocketMTincomingFactory() 
!       : pd_state(IDLE), pd_shutdown_cond(&pd_shutdown_lock),
!         pd_shutdown_nthreads(0)
  {
    tcpSocketFactoryType::init();
+   
  }
   
  CORBA::Boolean
***************
*** 250,256 ****
    r->incrRefCount(1);
  
    if (pd_state == ACTIVE) {
!     r->rendezvouser = new tcpSocketRendezvouser(r);
    }
  }
  
--- 270,276 ----
    r->incrRefCount(1);
  
    if (pd_state == ACTIVE) {
!     r->rendezvouser = new tcpSocketRendezvouser(r,this);
    }
  }
  
***************
*** 267,273 ****
        while ((r = (tcpSocketIncomingRope*)next_rope())) {
  	if (r->pd_shutdown == tcpSocketIncomingRope::NO_THREAD) {
  	  r->pd_shutdown = tcpSocketIncomingRope::ACTIVE;
! 	  r->rendezvouser = new tcpSocketRendezvouser(r);
  	}
        }
      }
--- 287,293 ----
        while ((r = (tcpSocketIncomingRope*)next_rope())) {
  	if (r->pd_shutdown == tcpSocketIncomingRope::NO_THREAD) {
  	  r->pd_shutdown = tcpSocketIncomingRope::ACTIVE;
! 	  r->rendezvouser = new tcpSocketRendezvouser(r,this);
  	}
        }
      }
***************
*** 302,316 ****
  void
  tcpSocketMTincomingFactory::removeIncoming()
  {
!   Rope_iterator next_rope(&pd_anchor);
!   tcpSocketIncomingRope* r;
  
-   switch (pd_state) {
-   case ACTIVE:
-   case IDLE:
      try {
        while ((r = (tcpSocketIncomingRope*)next_rope())) {
- 	r->cancelThreads();
  	if (r->pd_shutdown != tcpSocketIncomingRope::NO_THREAD) {
  	  // rendezvouser has not been shutdown properly
  	  continue;
--- 322,372 ----
  void
  tcpSocketMTincomingFactory::removeIncoming()
  {
!   {
!     Rope_iterator next_rope(&pd_anchor);
!     tcpSocketIncomingRope* r;
! 
!     switch (pd_state) {
!     case ACTIVE:
!       try {
! 	while ((r = (tcpSocketIncomingRope*)next_rope())) {
! 	  if (pd_state == ACTIVE) {
! 	    r->cancelThreads();
! 	  }
! 	  if (r->pd_shutdown != tcpSocketIncomingRope::NO_THREAD) {
! 	    // rendezvouser has not been shutdown properly
! 	    continue;
! 	  }
! 	}
!       }
!       catch(...) {}
!     case IDLE:
!       pd_state = ZOMBIE;
!       break;
!     default:
!       return;
!     }
!   }
!   omni_thread::yield();
!   {
!     omni_mutex_lock sync(pd_shutdown_lock);
!     while (pd_shutdown_nthreads != 0) {
!       if (pd_shutdown_nthreads > 0) {
! 	pd_shutdown_nthreads = -pd_shutdown_nthreads;
!       }
!       if (omniORB::traceLevel >= 20) {
! 	omniORB::log << "tcpSocketMTincomingFactory::removeIncoming: blocks waiting for worker threads to exit\n";
! 	omniORB::log.flush();
!       }
!       pd_shutdown_cond.wait();
!     }
!   }
!   {
!     Rope_iterator next_rope(&pd_anchor);
!     tcpSocketIncomingRope* r;
  
      try {
        while ((r = (tcpSocketIncomingRope*)next_rope())) {
  	if (r->pd_shutdown != tcpSocketIncomingRope::NO_THREAD) {
  	  // rendezvouser has not been shutdown properly
  	  continue;
***************
*** 321,332 ****
  	  r->decrRefCount(1);
  	}
        }
-       pd_state = ZOMBIE;
      }
      catch(...) {}
!     break;
!   default:
!     break;
    }
  }
  
--- 377,388 ----
  	  r->decrRefCount(1);
  	}
        }
      }
      catch(...) {}
!   }
!   if (omniORB::traceLevel >= 20) {
!     omniORB::log << "tcpSocketMTincomingFactory::removeIncoming: done\n";
!     omniORB::log.flush();
    }
  }
  
***************
*** 514,519 ****
--- 570,576 ----
        pd_shutdown = SHUTDOWN;
        pd_lock.unlock();
    }
+ 
    CutStrands();
    
    if (rendezvouser) {
***************
*** 1099,1106 ****
  	omniORB::log.flush();
        }
  
        try {
! 	newthr = new tcpSocketWorker(newSt);
        }
        catch(...) {
  	newthr = 0;
--- 1156,1171 ----
  	omniORB::log.flush();
        }
  
+       omni_mutex_lock sync(pd_factory->pd_shutdown_lock);
+       if (pd_factory->pd_shutdown_nthreads >= 0) {
+ 	pd_factory->pd_shutdown_nthreads++;
+       }
+       else {
+ 	pd_factory->pd_shutdown_nthreads--;
+       }
+ 
        try {
! 	newthr = new tcpSocketWorker(newSt,pd_factory);
        }
        catch(...) {
  	newthr = 0;
***************
*** 1113,1118 ****
--- 1178,1194 ----
  	// threads to strands; etc.
  	newSt->decrRefCount();
  	newSt->shutdown();
+ 
+ 	omni_mutex_lock sync(pd_factory->pd_shutdown_lock);
+ 	assert(pd_factory->pd_shutdown_nthreads != 0);
+ 	if (pd_factory->pd_shutdown_nthreads > 0) {
+ 	  pd_factory->pd_shutdown_nthreads--;
+ 	}
+ 	else {
+ 	  pd_factory->pd_shutdown_nthreads++;
+ 	  pd_factory->pd_shutdown_cond.signal();
+ 	}
+ 
        }
      }
      catch(const CORBA::COMM_FAILURE &) {




-- 
Sai-Lai Lo                                   S.Lo@uk.research.att.com
AT&T Laboratories Cambridge           WWW:   http://www.uk.research.att.com 
24a Trumpington Street                Tel:   +44 1223 343000
Cambridge CB2 1QA                     Fax:   +44 1223 313542
ENGLAND