[omniORB] push_events filterable_data, header and remainder_of_body

Anna Patil patilanna at hotmail.com
Tue Jan 4 19:12:30 UTC 2022


Hi All,

tried push_event scripts with the help of you old posts and mail mailing list. I am stuck at defining filterable_data, header and remainder_of_body. How to proceed further, please suggest. Below is the my code.



class NmsSession_I(nmsSession__POA.NmsSession_I):
    pass

class StructuredPushConsumerImpl(CosNotifyComm__POA.StructuredPushConsumer):
  def __init__(self, orb):
        self.orb = orb

class psconsumer(CosEventComm__POA.PushConsumer):
      def __init__(self):
          pass

def main():
  argv = ["-ORBInitRef", "NameService=corbaloc:iiop:10.10.118.3:12001/NameService"]
#
  # Start orb.
  global orb
  orb=CORBA.ORB_init(argv, CORBA.ORB_ID)

  # Process Options
  verbose=0
  needNameService=0
  channelName="EventChannel"
  factoryName="EventChannelFactory"
  criteria=[]

  action="start"
  try:

    #
    # Get Name Service root context.
       rootContext=None
       try:
           action="resolve initial reference 'NameService'"
           obj=orb.resolve_initial_references("NameService")
           rootContext=obj._narrow(CosNaming.NamingContext)
           if rootContext is None:
               raise CORBA.OBJECT_NOT_EXIST(0,CORBA.COMPLETED_NO)
       except CORBA.Exception as ex:
           if needNameService:
               raise ex
           else:
               sys.stderr.write("Warning - failed to %s\n"%action)
    ### Initilizing POA ###

       name = [CosNaming.NameComponent("TMF_MTNM","Class"),
                CosNaming.NameComponent("HUAWEI","Vendor"),
                CosNaming.NameComponent("Huawei/NCE","EmsInstance"),
                CosNaming.NameComponent("2.0","Version"),
                CosNaming.NameComponent("Huawei/NCE","EmsSessionFactory_I")]


       try:
             obj = rootContext.resolve(name)
       except CosNaming.NamingContext.NotFound as ex:
             print("Except->Name not found")
             print(ex)
       try:
           poa = orb.resolve_initial_references("RootPOA")
       except Exception as ex:
           print('POA initial References Error: {}'.format(ex))

       try:
           poaManager = poa._get_the_POAManager()
       except Exception as ex:
           print('POA get manager error : {}'.format(ex))
       try:
           poaManager.activate()
       except Exception as ex:
           print('POA manager Activation Error: {}'.format(ex))

        ### Narrow the object
       ems_session = obj._narrow(emsSessionFactory.EmsSessionFactory_I)
       if ems_session is None:
           print("Object reference is not an EmsSessionFactory_I")
       ### Get NMS session
       nms_session_i = NmsSession_I()
       nms_session_o = nms_session_i._this()
       if nms_session_o is None:
           print("Object reference is not an NmsSession_I")
           sys.exit(1)

       ##Get ems Session ###
       try:
             session = ems_session.getEmsSession("user1", "passwd", nms_session_o)

        #print(dir(session))
       except Exception as ex:
              print('System Error: {}'.format(ex))
              sys.exit(1)
       #print(dir(session))
       try:
              eventChannel = session.getEventChannel()
       except globaldefs.ProcessingFailureException as ex:
              print('System Error: {}'.format(ex))
              eventChannel.destroy()
              session.endSession()
              sys.exit(1)
       #
       #impl = StructuredPushConsumerImpl(orb)
       #obj = impl._this()
       eventTypeAdd = [ CosNotification.EventType("tmf_mtnm", "NT_ALARM") ]
       eventTypeRemove = [ CosNotification.EventType("tmf_mtnm", "NT_HEARTBEAT") ]
       try:
          consumerAdmin, consID = eventChannel.new_for_consumers(CosNotifyChannelAdmin.AND_OP)
          #print(consumerAdmin[0])
          #print(dir(InterFilterGroupOperator))
       except Exception as e:
           print(e)
       try:
           consumerAdmin.subscription_change(eventTypeAdd, eventTypeRemove)
       except Exception as e:
           print(e)
       try:
           consumerAdmin.remove_all_filters()
       except Exception as e:
           print(e)
       subFilter="1==1"
       ffp = eventChannel._get_default_filter_factory()

       exp = [CosNotifyFilter.ConstraintExp ([ CosNotification.EventType("tmf_mtnm", "NT_ALARM") ] , subFilter)]

       exp1 = [CosNotifyFilter.ConstraintExp ([ CosNotification.EventType("tmf_mtnm", "NT_HEARTBEAT") ] , subFilter)]
       try:
           alarmFilter = eventChannel._get_default_filter_factory()
           channel_filter = alarmFilter.create_filter("TCL")
           channel_filter.add_constraints(exp)
           consumerAdmin.add_filter(channel_filter)
       except Exception as e:
           print(e)

       impl = StructuredPushConsumerImpl(orb)
       obj = impl._this()
       consumer_admin = eventChannel._get_default_consumer_admin()
       supplier_admin = eventChannel._get_default_supplier_admin()
       (proxy_consumer, proxy_id) = supplier_admin.obtain_notification_push_consumer(CosNotifyChannelAdmin.STRUCTURED_EVENT)
       proxy_consumer = proxy_consumer._narrow(CosNotifyChannelAdmin.StructuredProxyPushConsumer)
       proxy_consumer.connect_structured_push_supplier(CosNotifyComm__POA.StructuredPushSupplier()._this())

       if proxy_consumer != None:
           #     Build the filterable data array.

           AlarmSerialNo = 0
           nativeEMSName = ''
           nativeProbableCause = ''
           SlaveShelf = ''
           whatsay = ''
#
           filterable_data = [
                               CosNotification.Property ( "AlarmSerialNo",CORBA.Any(CORBA._tc_ulong,  AlarmSerialNo)),
                               CosNotification.Property ( "nativeEMSName",CORBA.Any(CORBA._tc_string, nativeEMSName)),
                               CosNotification.Property ( "nativeProbableCause",CORBA.Any(CORBA._tc_string, nativeProbableCause)),
                               CosNotification.Property ( "SlaveShelf",CORBA.Any(CORBA._tc_string,  SlaveShelf))
                             ]

           while True:
               try:

                   event = CosNotification.StructuredEvent (header, filterable_data, remainder_of_body)
                   events = proxy_consumer.push_structured_event(event)
                   print(events)
               except Exception as e:
                   print(e)




  except CORBA.Exception as ex:
       sys.stderr.write("CORBA exception, unable to %s.\n"%action)




################################################################################
# If this file is executed directly, then we start here.
if(__name__=="__main__"):
  main()

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://www.omniorb-support.com/pipermail/omniorb-list/attachments/20220104/56475af1/attachment.html>


More information about the omniORB-list mailing list