[omniORB] Using omniNotify with omniOrbPy

Robert E. Gruber gruber@research.att.com
Wed, 13 Jun 2001 19:57:34 -0400


I have written a new test suite for omniNotify in python using omniORBpy.  I
am quite happy with how much I was able to do in relatively little time.

I will probably release it at some point (separately from omniNotify since
it should be able to test any Notification Service).

In answer to your specific questions:

STUBS:

I run the following script (mkcosstubs.py) in a Stubs directory and put
that directory on my PYTHONPATH.  Then I can say, e.g.,
    import CosNotification
in Python.
----------------------------------------------------------------------------
#!/usr/bin/env python

import os

idlfiles = ["TimeBase", "CosTime", "CosEventComm", "CosEventChannelAdmin",
"CosTypedEventComm", "CosTypedEventChannelAdmin", "CosTimerEvent",
"CosNotification", "CosNotifyComm", "CosNotifyFilter",
"CosNotifyChannelAdmin", "CosTypedNotifyComm", "CosTypedNotifyChannelAdmin",
"AttNotifyChannelAdmin"]

# d is the top-level idl directory from an omniORB release
d = "/home/alcfp/project/omni/omni3rel/idl"

for x in idlfiles:
    y = x + ".idl"
    print x
    cmd = "omniidl -bpython -I%s -I%s/COS -DNOLONGLONG %s/COS/%s" % (
        d, d, d, y)
    print cmd
    os.system(cmd)

----------------------------------------------------------------------------
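As a variation on the script above, here is a minimal sketch that builds
each omniidl command as an argument list and collects the names of any IDL
files that fail to compile, instead of ignoring the exit status. It assumes
a Python that has the subprocess module. The names omniidl_args and
compile_all, the run parameter, and the example path are illustrative only
and are not part of mkcosstubs.py.

```python
import os
import subprocess


def omniidl_args(idl_dir, name):
    """Build the omniidl argument list for one IDL file (hypothetical helper)."""
    cos_dir = os.path.join(idl_dir, "COS")
    return ["omniidl", "-bpython",
            "-I" + idl_dir, "-I" + cos_dir,
            "-DNOLONGLONG",
            os.path.join(cos_dir, name + ".idl")]


def compile_all(idl_dir, names, run=subprocess.call):
    """Run omniidl once per name and return the names whose run failed.

    run is an assumption added for illustration: any callable that takes
    an argument list and returns an exit status (0 meaning success).
    """
    failed = []
    for name in names:
        if run(omniidl_args(idl_dir, name)) != 0:
            failed.append(name)
    return failed


# Example: build (but do not run) the command for one file.
# "/path/to/omni/idl" is a placeholder for the top-level idl directory.
args = omniidl_args("/path/to/omni/idl", "CosNotification")
```

Calling compile_all(d, idlfiles) with the d and idlfiles values from the
script would then return an empty list when every stub was generated.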

The following helper file, ChanUtils.py, has a function that initializes
the orb and poa, another that looks up the default factory in the
naming service, 3 flavors of functions that get event channels from
the factory, and a cleanup function.

------------------------------------------------------------------------

######################################################################
# Module ChanUtils:

import sys
import omniORB
from omniORB import CORBA, PortableServer
import CosNaming, CosNotification, CosNotifyChannelAdmin
from Utils import PropUtils

##############################################################
def setup_orb_and_poa(verbose):
    # Initialise the ORB
    omniORB.maxTcpConnectionPerServer(1000)
    orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)

    # Find the root POA
    poa = orb.resolve_initial_references("RootPOA")

    # Activate the POA
    poaManager = poa._get_the_POAManager()
    poaManager.activate()

    return orb, poa

##############################################################
def get_factory(orb, verbose):
    # Obtain a reference to the root naming context
    obj         = orb.resolve_initial_references("NameService")
    rootContext = obj._narrow(CosNaming.NamingContext)

    if rootContext is None:
        print "ChanUtils.get_factory: Failed to narrow the root naming context"
        return None # error

    if (verbose > 0):
        print 'ChanUtils.get_factory: Resolve the name "ChannelFactory.ChannelFactory"'
    name = [CosNaming.NameComponent("ChannelFactory", "ChannelFactory")]

    try:
        obj = rootContext.resolve(name)

    except CosNaming.NamingContext.NotFound, ex:
        print "ChanUtils.get_factory: Name not found"
        return None # error

    # Narrow the object to EventChannelFactory
    fact = obj._narrow(CosNotifyChannelAdmin.EventChannelFactory)

    if (fact is None):
        print "ChanUtils.get_factory: Object reference is not a CosNotifyChannelAdmin.EventChannelFactory"
        return None # error

    return fact

##############################################################
def get_channel_qos(fact, nqos, aqos, verbose):
    "Get a new channel"
    chan = None
    try:
        chan, chan_id = fact.create_channel(nqos, aqos)
    except CosNotification.UnsupportedAdmin, eseqlist:
        if (verbose > 0):
            print "get_channel_qos: UnsupportedAdmin exception"
            for eseq in eseqlist:
                PropUtils.print_propseq(eseq)
        return None # error
    except CosNotification.UnsupportedQoS, eseqlist:
        if (verbose > 0):
            print "get_channel_qos: UnsupportedQoS exception"
            for eseq in eseqlist:
                PropUtils.print_propseq(eseq)
        return None # error
    except (CORBA.COMM_FAILURE):
        print "get_channel_qos: Caught COMM_FAILURE"
        return None # error
#    except:
#        print "ChanUtils.get_channel: create_channel failed"
#        return None # error

    if (chan is None):
        print "ChanUtils.get_channel: Could not obtain channel from factory"
        return None # error

    if (verbose > 0):
        print "ChanUtils.get_channel: New channel has chan_id = ", chan_id

    return chan

##############################################################
def get_channel(fact, verbose):
    "Get a new channel"
    nqos =  []
    aqos =  []
    return get_channel_qos(fact, nqos, aqos, verbose)

##############################################################
def get_channel_id(fact, id, verbose):

    # Get channel with channel ident id
    try:
        chan = fact.get_event_channel(id)
    except (CORBA.COMM_FAILURE):
        print "get_channel_id: Caught COMM_FAILURE"
        return None # error
    except:
        print "ChanUtils.get_channel_id: get_event_channel(", id, "): failed"
        return None # error

    if (chan is None):
        print "ChanUtils.get_channel_id: get_event_channel(", id, "): failed"
        return None # error

    if (verbose > 0):
        print "ChanUtils.get_channel_id: returning channel with id", id

    return chan

##############################################################
def destroy_channel(chan, verbose):
    # Destroy the channel
    try:
        if (verbose > 0):
            print "ChanUtils.destroy_channel: calling chan.destroy()"
        chan.destroy()
    except:
        if (verbose > 0):
            print "ChanUtils.destroy_channel: chan.destroy failed"
        return 1
    if (verbose > 0):
        print "ChanUtils.destroy_channel: Destroyed the channel"
    return 0
----------------------------------------------------------------------------
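
The helpers above are meant to be chained: set up the ORB, look up the
factory, get a channel, use it, then destroy it. A minimal sketch of that
sequence follows. The function with_new_channel and its work callback are
hypothetical names introduced here for illustration; they are not part of
ChanUtils. The module is passed in as a parameter, so the sketch itself
needs no CORBA imports.

```python
def with_new_channel(chan_utils, work, verbose=0):
    """Create a channel, call work(chan), and always destroy the channel.

    chan_utils is expected to be the ChanUtils module (or any object that
    provides the same four functions). work is a caller-supplied callback.
    Returns whatever work returns, or None if the factory or the channel
    could not be obtained.
    """
    orb, poa = chan_utils.setup_orb_and_poa(verbose)

    fact = chan_utils.get_factory(orb, verbose)
    if fact is None:
        return None

    chan = chan_utils.get_channel(fact, verbose)
    if chan is None:
        return None

    try:
        return work(chan)
    finally:
        # Runs whether work() returned normally or raised an exception.
        chan_utils.destroy_channel(chan, verbose)


# Intended use, assuming ChanUtils.py is importable and a Notification
# Service is registered in the naming service:
#
#     import ChanUtils
#     result = with_new_channel(ChanUtils, my_test_function, verbose=1)
```

Putting the destroy call in a finally clause means a failing test does not
leave its channel behind in the Notification Service.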


-----Original Message-----
From: owner-omniorb-list@uk.research.att.com
[mailto:owner-omniorb-list@uk.research.att.com] On Behalf Of Mark Zimmerman
Sent: Wednesday, June 13, 2001 2:54 PM
To: omniorb-list@uk.research.att.com
Subject: [omniORB] Using omniNotify with omniOrbPy


Greetings:

I am looking for some hints on how to post notification events from a
python script using omniOrbPy. There seems to be nothing built-in to
support this. Am I on uncharted territory here?

I have started by running omniidl -bpython on files in $TOP/idl/COS
that seem to have some relevance. That works fine.

I am currently trying to go through some working C++ code and do the
same things in python. I am stuck trying to convert

channel = CosNA_EventChannel::_narrow(echannel_ref);

to python. It seems that it should be

channel = echannel_ref._narrow(SomethingOrOther.CosNA_EventChannel)

but I have grepped through all of the idl I can find looking for who
owns CosNA_EventChannel so SomethingOrOther is still unknown.

-- Mark