[omniORB] CosNotifyChannelAdmin ClientType in Python

Gary Duzan gduzan@bbn.com
Tue May 6 18:19:01 2003


------- =_aaaaaaaaaa0
Content-Type: text/plain; charset="us-ascii"
Content-ID: <11836.1052241491.1@bbn.com>

In Message <3EB7D29A.2090102@lmco.com> ,
   Joe Bronkema <joseph.d.bronkema@lmco.com> wrote:

=>I'm trying to use the Notification service from Python, and ran into a 
=>little snag.  It could be a misunderstanding on my part of how CORBA 
=>enums are supposed to work in Python, or maybe a bug.  I'm sure someone 
=>on the list can tell me which.
=>
=>I have successfully created a channel and obtained the default supplier 
=>admin.  My next step is to get a proxy consumer, which I try to do as 
=>follows:
=>
=>(proxyConsumer, proxyId) =  SupplierAdmin.obtain_notification_push_consumer(
=>    CosNotifyChannelAdmin.ClientType.STRUCTURED_EVENT)
=>
=>This produces an error:
=>AttributeError: Enum instance has no attribute 'STRUCTURED_EVENT'.

   You want CosNotifyChannelAdmin.STRUCTURED_EVENT. Enum constants
are defined in the scope enclosing the enum type, not in a scope of
their own.

=>While I'm at it, does anyone have some Python examples for using the 
=>notification service?

   I don't know if they'll be of any use, but I'll attach some scripts
I've used in the past. They were used with TAO's notification service,
and have some code bits to deal with its quirks, but they should be
generic, for the most part.

					Gary Duzan
					BBN Technologies
					A Verizon Company



------- =_aaaaaaaaaa0
Content-Type: text/plain; name="attach_consumer.py"; charset="us-ascii"
Content-ID: <11836.1052241491.2@bbn.com>
Content-Description: attach_consumer.py

#!/usr/bin/env python

###########################################################################

import sys

# Import the CORBA module
from omniORB import CORBA

# Import CORBA stubs
import CosNaming
import CosNotification, CosNotifyChannelAdmin, CosNotifyComm

###########################################################################

def attach_consumer(channel, consumer):
  """Connect `consumer` to `channel` through a structured push proxy.

  channel  -- event channel reference; its default consumer admin is used.
  consumer -- structured push consumer reference to connect.

  Always returns 0; any CORBA exception raised along the way propagates.
  """
  admin = channel._get_default_consumer_admin()

  # Ask the admin for a proxy supplier and narrow it to the structured
  # flavour before connecting the consumer to it.
  proxy, _proxy_id = admin.obtain_notification_push_supplier(
      CosNotifyChannelAdmin.STRUCTURED_EVENT)
  structured_proxy = proxy._narrow(
      CosNotifyChannelAdmin.StructuredProxyPushSupplier)
  structured_proxy.connect_structured_push_consumer(consumer)

  # One ("*", "*") event type in the first list, nothing in the second.
  wildcard = CosNotification.EventType("*", "*")
  admin.subscription_change([wildcard], [])

  return 0

###########################################################################

def usage(argv):
  """Print a short command-line usage summary to stdout.

  argv -- the program's argument list; only argv[0] (the program name)
          is used.
  """
  # A single parenthesised argument prints the same text whether print
  # is a statement (Python 2) or a function (Python 3).
  print("")
  print("Usage: %s --channel=<channel_ior> --consumer=<consumer_ior>" % argv[0])
  print("")

def main(argv):
  """Parse the command line and attach a consumer to a channel.

  argv -- full argument list, program name first.  It is handed to
          CORBA.ORB_init, then argv[1:] is parsed for the --channel and
          --consumer options (both stringified object references).

  Returns 1 after printing the usage text when either option is missing,
  otherwise the result of attach_consumer().
  """
  orb = CORBA.ORB_init(argv, CORBA.ORB_ID)

  from getopt import getopt

  (oplist, _args) = getopt(argv[1:], '', ['channel=', 'consumer='])

  # getopt yields (option, value) pairs; if an option is repeated the
  # last value wins.
  options = dict(oplist)

  if '--channel' not in options or '--consumer' not in options:
    # usage() reads argv[0] for the program name, so it needs the
    # argument list itself.
    usage(argv)
    return 1

  channel = orb.string_to_object(options['--channel'])
  channel = channel._narrow(CosNotifyChannelAdmin.EventChannel)
  consumer = orb.string_to_object(options['--consumer'])
  consumer = consumer._narrow(CosNotifyComm.StructuredPushConsumer)

  return attach_consumer(channel, consumer)


if __name__ == "__main__":
  # Run as a script: main()'s return value becomes the exit status.
  raise SystemExit(main(sys.argv))

###########################################################################

------- =_aaaaaaaaaa0
Content-Type: text/plain; name="bind_proxy_consumer.py"; charset="us-ascii"
Content-ID: <11836.1052241491.3@bbn.com>
Content-Description: bind_proxy_consumer.py

#!/usr/bin/env python

###########################################################################

import sys

# Import the CORBA module
from omniORB import CORBA

# Import CORBA stubs
import CosNaming
import CosNotification, CosNotifyChannelAdmin
import CosNotifyComm__POA

###########################################################################

def bind_proxy_consumer(ns, channel, name):
  """Create a structured proxy push consumer and bind it in the naming service.

  ns      -- naming context in which the proxy is (re)bound.
  channel -- event channel reference; its default supplier admin is used.
  name    -- id of the single name component to bind under (kind is '').

  Always returns 0; any CORBA exception raised along the way propagates.
  """
  admin = channel._get_default_supplier_admin()

  proxy, _proxy_id = admin.obtain_notification_push_consumer(
      CosNotifyChannelAdmin.STRUCTURED_EVENT)
  structured_proxy = proxy._narrow(
      CosNotifyChannelAdmin.StructuredProxyPushConsumer)

  # The proxy is connected to a reference for a freshly created
  # StructuredPushSupplier servant that is not kept anywhere else.
  placeholder_supplier = CosNotifyComm__POA.StructuredPushSupplier()._this()
  structured_proxy.connect_structured_push_supplier(placeholder_supplier)

  binding = [CosNaming.NameComponent(name, '')]
  ns.rebind(binding, structured_proxy)

  return 0

###########################################################################

def usage(argv):
  """Print a short command-line usage summary to stdout.

  argv -- the program's argument list; only argv[0] (the program name)
          is used.
  """
  # A single parenthesised argument prints the same text whether print
  # is a statement (Python 2) or a function (Python 3).
  print("")
  print("Usage: %s --channel=<channel_ior> --consumer-name=<name>" % argv[0])
  print("")

def main(argv):
  """Parse the command line and bind a new proxy consumer by name.

  argv -- full argument list, program name first.  It is handed to
          CORBA.ORB_init, then argv[1:] is parsed for --channel (a
          stringified object reference) and --consumer-name.

  Returns 1 after printing the usage text when either option is missing,
  otherwise the result of bind_proxy_consumer().
  """
  orb = CORBA.ORB_init(argv, CORBA.ORB_ID)

  from getopt import getopt

  (oplist, _args) = getopt(argv[1:], '', ['channel=', 'consumer-name='])

  # getopt yields (option, value) pairs; if an option is repeated the
  # last value wins.
  options = dict(oplist)

  if '--channel' not in options or '--consumer-name' not in options:
    # usage() reads argv[0] for the program name, so it needs the
    # argument list itself.
    usage(argv)
    return 1

  ns = orb.resolve_initial_references('NameService')._narrow(CosNaming.NamingContext)
  channel = orb.string_to_object(options['--channel'])
  channel = channel._narrow(CosNotifyChannelAdmin.EventChannel)
  name = options['--consumer-name']

  return bind_proxy_consumer(ns, channel, name)


if __name__ == "__main__":
  # Run as a script: main()'s return value becomes the exit status.
  raise SystemExit(main(sys.argv))

###########################################################################

------- =_aaaaaaaaaa0
Content-Type: text/plain; name="monitor_channel.py"; charset="us-ascii"
Content-ID: <11836.1052241491.4@bbn.com>
Content-Description: monitor_channel.py

#!/usr/bin/env python

###########################################################################

import sys

# Import the CORBA module
from omniORB import CORBA
import PortableServer

# Import CORBA stubs
import CosNaming
import CosNotification, CosNotifyChannelAdmin
import CosNotifyComm__POA

###########################################################################

class StructuredPushConsumerImpl(CosNotifyComm__POA.StructuredPushConsumer):
  """Servant that prints each structured event it receives to stdout."""

  def __init__(self, orb):
    # Kept so the servant can shut the ORB down when it is disconnected.
    self.orb = orb

  def disconnect_structured_push_consumer(self):
    """Shut the ORB down when this consumer is disconnected."""
    # 0 is passed as shutdown()'s only argument (presumably
    # wait_for_completion=false, so the call does not block).
    self.orb.shutdown(0)

  def offer_change(self, added, removed):
    """Ignore offer changes; this consumer does nothing with them."""
    pass

  def push_structured_event(self, notification):
    """Print one event as a single '|'-terminated list of fields.

    The line holds the event type's domain and type name, followed by
    one "name: value" field per entry of notification.filterable_data.
    """
    event_type = notification.header.fixed_header.event_type
    fields = [
      "Domain: %s|" % event_type.domain_name,
      "Type: %s|" % event_type.type_name,
    ]

    for nv in notification.filterable_data:
      # Each value is a CORBA.Any; value() is the public accessor for
      # the Python value it wraps.
      fields.append("%s: %s|" % (nv.name, nv.value.value()))

    print("".join(fields))

    # Flush so each event shows up immediately even when stdout is piped.
    sys.stdout.flush()

###########################################################################

def monitor_channel(orb, channel):
  """Attach a printing consumer to `channel` and run the ORB.

  orb     -- initialised ORB; used to find the root POA, passed to the
             servant, and run.
  channel -- event channel reference; its default consumer admin is used.

  Returns 0 once orb.run() returns.
  """
  # Activate the root POA's manager before the servant is created.
  root_poa = orb.resolve_initial_references('RootPOA')
  root_poa._get_the_POAManager().activate()

  servant = StructuredPushConsumerImpl(orb)
  consumer_ref = servant._this()

  admin = channel._get_default_consumer_admin()
  proxy, _proxy_id = admin.obtain_notification_push_supplier(
      CosNotifyChannelAdmin.STRUCTURED_EVENT)

  structured_proxy = proxy._narrow(
      CosNotifyChannelAdmin.StructuredProxyPushSupplier)
  structured_proxy.connect_structured_push_consumer(consumer_ref)

  # One ("*", "*") event type in the first list, nothing in the second.
  wildcard = CosNotification.EventType("*", "*")
  admin.subscription_change([wildcard], [])

  orb.run()

  return 0

###########################################################################

def usage(argv):
  """Print a short command-line usage summary to stdout.

  argv -- the program's argument list; only argv[0] (the program name)
          is used.
  """
  # A single parenthesised argument prints the same text whether print
  # is a statement (Python 2) or a function (Python 3).
  print("")
  print("Usage: %s --channel=<channel_ior>" % argv[0])
  print("")

def main(argv):
  """Parse the command line and monitor the given channel.

  argv -- full argument list, program name first.  It is handed to
          CORBA.ORB_init, then argv[1:] is parsed for the --channel
          option (a stringified object reference).

  Returns 1 after printing the usage text when --channel is missing,
  otherwise the result of monitor_channel().
  """
  orb = CORBA.ORB_init(argv, CORBA.ORB_ID)

  from getopt import getopt

  (oplist, _args) = getopt(argv[1:], '', ['channel='])

  # getopt yields (option, value) pairs; if an option is repeated the
  # last value wins.
  options = dict(oplist)

  # The "in" operator replaces dict.has_key(), which Python 3 removed.
  if '--channel' not in options:
    usage(argv)
    return 1

  channel = orb.string_to_object(options['--channel'])
  channel = channel._narrow(CosNotifyChannelAdmin.EventChannel)

  return monitor_channel(orb, channel)


if __name__ == "__main__":
  # Run as a script: main()'s return value becomes the exit status.
  raise SystemExit(main(sys.argv))

###########################################################################

------- =_aaaaaaaaaa0
Content-Type: text/plain; name="tie_channels.py"; charset="us-ascii"
Content-ID: <11836.1052241491.5@bbn.com>
Content-Description: tie_channels.py

#!/usr/bin/env python

###########################################################################

import sys

# Import the CORBA module
from omniORB import CORBA

# Import CORBA stubs
import CosNaming
import CosNotification, CosNotifyChannelAdmin

###########################################################################

def tie_channels(supplier, consumer):
  """Connect two channels so `supplier`'s proxy pushes into `consumer`'s.

  supplier -- channel reference whose default consumer admin provides
              the proxy push supplier.
  consumer -- channel reference whose default supplier admin provides
              the proxy push consumer.

  Always returns 0; any CORBA exception raised along the way propagates.
  """
  supplier_admin = consumer._get_default_supplier_admin()
  (proxy_consumer, _consumer_proxy_id) = supplier_admin.obtain_notification_push_consumer(CosNotifyChannelAdmin.STRUCTURED_EVENT)
  # Narrow to the structured interface before calling its connect
  # operation, as the other scripts posted with this one do.
  proxy_consumer = proxy_consumer._narrow(CosNotifyChannelAdmin.StructuredProxyPushConsumer)

  consumer_admin = supplier._get_default_consumer_admin()
  (proxy_supplier, _supplier_proxy_id) = consumer_admin.obtain_notification_push_supplier(CosNotifyChannelAdmin.STRUCTURED_EVENT)
  proxy_supplier = proxy_supplier._narrow(CosNotifyChannelAdmin.StructuredProxyPushSupplier)

  # Each proxy is connected to the other.
  proxy_consumer.connect_structured_push_supplier(proxy_supplier)
  proxy_supplier.connect_structured_push_consumer(proxy_consumer)

  # One ("*", "*") event type in the first list, nothing in the second.
  consumer_admin.subscription_change([CosNotification.EventType("*", "*")], [])

  return 0

###########################################################################

def usage(argv):
  """Print a short command-line usage summary to stdout.

  argv -- the program's argument list; only argv[0] (the program name)
          is used.
  """
  # A single parenthesised argument prints the same text whether print
  # is a statement (Python 2) or a function (Python 3).
  print("")
  print("Usage: %s --supplier=<supplier_ior> --consumer=<consumer_ior>" % argv[0])
  print("")

def main(argv):
  """Parse the command line and tie two event channels together.

  argv -- full argument list, program name first.  It is handed to
          CORBA.ORB_init, then argv[1:] is parsed for the --supplier and
          --consumer options (both stringified object references).

  Returns 1 after printing the usage text when either option is missing,
  otherwise the result of tie_channels().
  """
  orb = CORBA.ORB_init(argv, CORBA.ORB_ID)

  from getopt import getopt

  (oplist, _args) = getopt(argv[1:], '', ['supplier=', 'consumer='])

  # getopt yields (option, value) pairs; if an option is repeated the
  # last value wins.
  options = dict(oplist)

  if '--supplier' not in options or '--consumer' not in options:
    # usage() reads argv[0] for the program name, so it needs the
    # argument list itself.
    usage(argv)
    return 1

  # Narrow both references to EventChannel, as the other scripts posted
  # with this one do, since tie_channels() calls channel operations on
  # them.
  supplier = orb.string_to_object(options['--supplier'])
  supplier = supplier._narrow(CosNotifyChannelAdmin.EventChannel)
  consumer = orb.string_to_object(options['--consumer'])
  consumer = consumer._narrow(CosNotifyChannelAdmin.EventChannel)

  return tie_channels(supplier, consumer)


if __name__ == "__main__":
  # Run as a script: main()'s return value becomes the exit status.
  raise SystemExit(main(sys.argv))

###########################################################################

------- =_aaaaaaaaaa0--