[omniORB] Notification Service with omniorbPy

Mark Zimmerman markzimm at frii.com
Wed Jan 25 11:54:21 GMT 2006


On Wed, Jan 25, 2006 at 06:39:41PM +0100, Niko Sünderhauf wrote:
> Hello,
> 
> I try to use the Notification Service with omniorbPy. I was looking for
> some examples, but all I found was how to use the Event Service. Could
> somebody please post a short example or a link?
> 
> So here is my scenario:
> We already have a naming-service and some structured-push-suppliers
> running. (They use ACE/TAO as ORB, but that shouldn't matter. However, I
> want my clients written in Python, so I have to use omniorbPy, right?)
> Now I want to register something (say class myConsumer) to all the
> events with eventname "TestEvent". As far as I saw from some C++
> examples, myConsumer has to be derived from StructuredPushConsumer and
> has a method called "push" or something, which is called every time an
> event with name "TestEvent" is broadcast by the Notification Service.
> 
> I have already written a few functions to get references to all the
> objects in the naming service. So I can call
> 
> ec=getInterface("EventChannel",CosNotifyChannelAdmin.EventChannel)
> 

I have attached a short example that might get you started.

-- Mark

-------------- next part --------------
#!/usr/bin/env python

from omniORB import CORBA
import CosNaming
import CosNotification
import CosNotifyComm
import CosNotifyComm__POA
import CosNotifyChannelAdmin
import CosNotifyFilter
import time
import sys
import signal

class NotChanListen (CosNotifyComm__POA.StructuredPushConsumer):
   """Structured push consumer for one event type on a notification channel.

   On construction the object looks up the event channel in the naming
   service, creates its own consumer admin and structured proxy push
   supplier, installs a filter for the requested event type and connects
   itself as the consumer.  Every delivered event is forwarded to
   handleEvent(), which subclasses are expected to override.
   """

#-----------------------------------------------------------------------------
#
   def __init__ (self, orb, evName, subFilter="1==1", domain="SPW",
                 channelId="EventChannel", channelKind="EventChannel"):
      """Subscribe to events of type (domain, evName) on the channel.

      orb         -- initialised CORBA ORB used to find the naming service.
      evName      -- type_name of the events to receive.
      subFilter   -- constraint expression (EXTENDED_TCL grammar) applied
                     to matching events; the default accepts all of them.
      domain      -- domain_name of the events to receive.
      channelId   -- id of the channel's name component in the root
                     naming context.
      channelKind -- kind of the channel's name component.

      Any CORBA exception raised by the remote calls propagates to the
      caller.
      """

#.............................................................................
#     Resolve the naming service.
#
      ns = orb.resolve_initial_references("NameService")
      rootCon = ns._narrow(CosNaming.NamingContext)

#.............................................................................
#     Resolve the event channel.
#
      name = [CosNaming.NameComponent(channelId, channelKind)]
      channel = rootCon.resolve(name)._narrow(
                   CosNotifyChannelAdmin.EventChannel)

#.............................................................................
#     Create a consumer admin of our own (the returned admin ID is not
#     needed afterwards).
#
      self.cadmin, _adminID = channel.new_for_consumers (
                                 CosNotifyChannelAdmin.AND_OP)

#.............................................................................
#     Create the ProxyPushSupplier (the returned proxy ID is not needed
#     afterwards).
#
      proxy, _proxyID = self.cadmin.obtain_notification_push_supplier (
                           CosNotifyChannelAdmin.STRUCTURED_EVENT)
      self.psupp = proxy._narrow(
                      CosNotifyChannelAdmin.StructuredProxyPushSupplier)

#.............................................................................
#     Create a filter from the channel's default filter factory and give
#     it a single constraint for the requested event type.
#
      ffp = channel._get_default_filter_factory()
      self.filter = ffp.create_filter("EXTENDED_TCL")

      exp = [
         CosNotifyFilter.ConstraintExp (
            [ CosNotification.EventType(domain, evName) ],
            subFilter
         )
      ]
      self.cis1 = self.filter.add_constraints (exp)

#.............................................................................
#     Attach the filter to the consumer admin, then register this object
#     as the push consumer of the proxy.
#
      self.cadmin.add_filter(self.filter)
      self.psupp.connect_structured_push_consumer(self._this())

#-----------------------------------------------------------------------------
#
   def disconnect (self):
      """Release everything this listener created in the channel.

      The proxy and the consumer admin are torn down even when an earlier
      step raises, so a failure while removing the filter does not leave
      them behind in the channel.  The exception still propagates.
      """
      try:
         self.cadmin.remove_all_filters()
         self.filter.destroy()
      finally:
         try:
            self.psupp.disconnect_structured_push_supplier()
         finally:
            self.cadmin.destroy()

#-----------------------------------------------------------------------------
#
   def handleEvent (self, event):
      """Process one structured event; subclasses override this."""
      print('You forgot to override handleEvent.')

   def push_structured_event ( self, event ):
      """Called for each delivered event; forwards it to handleEvent()."""
      self.handleEvent (event)

   def disconnect_structured_push_consumer (self):
      """Disconnect notification; deliberately a no-op."""
      pass

   def offer_change (self, added, removed):
      """Offer-change notification; deliberately ignored."""
      pass

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
class LogListen (NotChanListen):
   """Listener for 'Log' events that prints each one as a single line."""

#-----------------------------------------------------------------------------
#
   def __init__ (self, orb):
      """Subscribe to events whose type name is 'Log'.

      orb -- initialised CORBA ORB, passed on to NotChanListen.
      """
      NotChanListen.__init__ (self, orb, 'Log')

#-----------------------------------------------------------------------------
#
   def handleEvent (self, event):
      """Print severity, time, user, host, PIDs, program and message.

      Fields missing from the event's filterable data keep the defaults
      below.  'Time' is formatted as a UTC timestamp when it is positive
      and shown as 'unknown' otherwise.
      """

      evdata = {'Severity':9, 'HostName':'unknown', 'UserName':'unknown',
                'PID':0, 'PPID':0, 'ProgramName':'unknown', 'Time':-1}

      # Each filterable field is a name plus a CORBA Any; unwrap the Any.
      for field in event.filterable_data:
         evdata[field.name] = field.value.value()

      message = event.remainder_of_body.value()

      if evdata['Time'] > 0:
         when = time.gmtime (evdata['Time'])
         # %b and %H:%M:%S are the portable spellings of %h and %T, which
         # are platform extensions and not available everywhere.
         tstr = time.strftime ("%d-%b-%Y:%H:%M:%S", when)
      else:
         tstr = 'unknown'

      # Single parenthesised argument: valid as both a Python 2 print
      # statement and a Python 3 print() call.
      print('%1d %20s %8s %8s %5d %5d %10s %s' %
            (evdata['Severity'], tstr, evdata['UserName'],
             evdata['HostName'], evdata['PID'], evdata['PPID'],
             evdata['ProgramName'], message))

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
def handler(signum, frame):
   """Signal handler: disconnect the listener, then shut down the ORB.

   Operates on the module-level `listener` and `orb` objects.  The
   signum and frame arguments are not used.
   """
   # Tear down the channel-side objects while the ORB is still running.
   listener.disconnect()
   # NOTE(review): the argument 1 is presumably wait_for_completion -- confirm.
   orb.shutdown(1)

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Initialise the ORB with the command-line arguments and look up the
# root POA.
orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
poa = orb.resolve_initial_references("RootPOA")

# Constructing the listener connects it to the event channel as a
# consumer of 'Log' events (see NotChanListen.__init__).
listener = LogListen(orb)

# Activate the root POA's manager.
# NOTE(review): presumably required before pushed events are dispatched
# to the listener -- confirm.
poaManager = poa._get_the_POAManager()
poaManager.activate()

# On SIGINT (Ctrl-C), handler() disconnects the listener and calls
# orb.shutdown().
signal.signal(signal.SIGINT, handler)

# Run the ORB.
# NOTE(review): presumably blocks until orb.shutdown() is called -- confirm.
orb.run()



More information about the omniORB-list mailing list