问题描述
我第一次尝试在 Python 中用 omniORBpy 实现 CORBA Notification 服务的消费者。我在 Google 上找到了一段示例代码,并按自己的需要做了修改。现在可以获取事件通道,也可以订阅,但每当触发 push_structured_event 时,omniORB 的跟踪日志里只出现 Scavenger 减少连接(strand)空闲计数之类的信息(看起来像是错误),事件始终没有显示出来。我已设置 traceLevel = 40,但仍无法找出原因,希望能得到帮助。
omniORB 4.3.0 omniORBpy 4.3.0 操作系统-Linux python3.6 供应商-ECI Light NMS
#--------------------------------------------------------------
# Code
#!/usr/bin/env python
from omniORB import CORBA
import CosNaming
import CosNotification
import CosnotifyComm
import CosnotifyComm__POA
import CosnotifyChannelAdmin
import CosnotifyFilter
import time
import sys
import signal
#-----------------------------------------------------------------------------
# Initialization class
class Initialization (CosnotifyComm__POA.StructuredPushConsumer):
    """Structured push consumer that logs in to the EMS and subscribes to events.

    The constructor resolves the EMS session factory through the naming
    service, opens an EMS session, obtains the session's event channel and
    connects this servant to a filtered structured proxy push supplier.

    Subclasses are expected to override ``handleEvent``.

    NOTE(review): ``emsSessionFactory``, ``NmsSession_I`` and ``globaldefs``
    are not imported in the visible part of this file; they presumably come
    from the vendor IDL stubs and must be in scope for this class to work.

    NOTE(review): the standard COS Notification IDL spells the names used
    here ``CosNotifyComm`` / ``CosNotifyChannelAdmin`` / ``CosNotifyFilter``
    and ``StructuredProxyPushSupplier``. The casing below follows this
    file's import lines - confirm against the generated stubs.
    """

    def __init__(self, orb, evName, subFilter="1==1", evDomain="SPW"):
        """Log in to the EMS and register this object as a push consumer.

        Args:
            orb: An initialised CORBA ORB.
            evName: Event type name used in the filter constraint.
            subFilter: Constraint expression for the filter.
            evDomain: Event domain name used in the filter constraint.
                Defaults to the previously hard-coded ``"SPW"``.

        Raises:
            CORBA.Exception: If the naming service cannot be resolved.
            SystemExit: If any later step of the login/subscription fails.
        """
        # --- Naming service -------------------------------------------------
        try:
            obj = orb.resolve_initial_references("NameService")
            rootContext = obj._narrow(CosNaming.NamingContext)
            if rootContext is None:
                raise CORBA.OBJECT_NOT_EXIST(0, CORBA.COMPLETED_NO)
        except CORBA.Exception:
            # Nothing below can work without the root context, so report
            # and propagate instead of continuing with rootContext unset.
            sys.stderr.write(
                "Failed to resolve the NameService root context\n")
            raise

        name = [
            CosNaming.NameComponent("TMF_MTNM", "Class"),
            CosNaming.NameComponent("ECI", "vendor"),
            CosNaming.NameComponent("ECI:LightSoft_1", "EmsInstance"),
            CosNaming.NameComponent("3_5", "Version"),
            # NOTE(review): the id of this last component was truncated in
            # the original source (only the kind "EmsSessionFactory_I"
            # survived). Reusing the EMS instance name is an assumption -
            # confirm against the EMS naming tree.
            CosNaming.NameComponent("ECI:LightSoft_1", "EmsSessionFactory_I"),
        ]
        try:
            obj = rootContext.resolve(name)
        except CosNaming.NamingContext.NotFound as ex:
            print("Except->Name not found")
            print(ex)
            # Without the factory reference there is nothing to narrow.
            sys.exit(1)

        # --- POA ------------------------------------------------------------
        try:
            poa = orb.resolve_initial_references("RootPOA")
        except Exception as ex:
            print('POA initial References Error: {}'.format(ex))
            sys.exit(1)
        try:
            poaManager = poa._get_the_POAManager()
        except Exception as ex:
            print('POA get manager error : {}'.format(ex))
            sys.exit(1)
        try:
            poaManager.activate()
        except Exception as ex:
            print('POA manager Activation Error: {}'.format(ex))
            sys.exit(1)

        # --- EMS login ------------------------------------------------------
        ems_session = obj._narrow(emsSessionFactory.EmsSessionFactory_I)
        if ems_session is None:
            print("Object reference is not an EmsSessionFactory_I")
            sys.exit(1)

        nms_session_i = NmsSession_I()
        nms_session_o = nms_session_i._this()
        if nms_session_o is None:
            print("Object reference is not an NmsSession_I")
            sys.exit(1)

        # NOTE(review): credentials are hard-coded; consider passing them in.
        try:
            session = ems_session.getEmsSession(
                "usr", "password123", nms_session_o)
        except Exception as ex:
            print('System Error: {}'.format(ex))
            sys.exit(1)

        # --- Event channel --------------------------------------------------
        try:
            channel = session.getEventChannel()
        except globaldefs.ProcessingFailureException as ex:
            print('System Error: {}'.format(ex))
            # No channel was obtained, so only the session needs closing.
            session.endSession()
            sys.exit(1)
        # Narrow the reference returned by the session itself; the original
        # code narrowed an undefined name here.
        channel = channel._narrow(CosnotifyChannelAdmin.EventChannel)
        if channel is None:
            print("Object reference is not an EventChannel")
            session.endSession()
            sys.exit(1)

        # --- Consumer admin and proxy supplier --------------------------------
        self.cadmin, _cons_id = channel.new_for_consumers(
            CosnotifyChannelAdmin.AND_OP)
        psupp, _proxy_id = self.cadmin.obtain_notification_push_supplier(
            CosnotifyChannelAdmin.STRUCTURED_EVENT)
        self.psupp = psupp._narrow(
            CosnotifyChannelAdmin.StructuredProxyPushsupplier)

        # --- Filter -----------------------------------------------------------
        ffp = channel._get_default_filter_factory()
        self.filter = ffp.create_filter("EXTENDED_TCL")
        exp = [
            CosnotifyFilter.ConstraintExp(
                [CosNotification.EventType(evDomain, evName)], subFilter
            )
        ]
        self.cis1 = self.filter.add_constraints(exp)

        # --- Register the push consumer ---------------------------------------
        self.cadmin.add_filter(self.filter)
        self.psupp.connect_structured_push_consumer(self._this())

    def disconnect(self):
        """Remove the filter, disconnect from the proxy and destroy the admin."""
        self.cadmin.remove_all_filters()
        self.filter.destroy()
        self.psupp.disconnect_structured_push_supplier()
        self.cadmin.destroy()

    def handleEvent(self, event):
        """Process one structured event; subclasses should override this."""
        print('You forgot to override handleEvent.')

    def push_structured_event(self, event):
        """Consumer operation: forward the pushed event to ``handleEvent``."""
        self.handleEvent(event)

    def disconnect_structured_push_consumer(self):
        """Consumer operation: intentionally a no-op."""
        pass

    def offer_change(self, added, removed):
        """Consumer operation: offer changes are ignored."""
        pass
#-----------------------------------------------------------------------------
#
class LogListen (Initialization):
    """Consumer that subscribes to 'Log' events and prints their fields.

    NOTE(review): ``globaldefs``, ``notifications`` and
    ``transmissionParameters`` are not imported in the visible part of this
    file; they presumably come from the vendor IDL stubs.
    """

    def __init__(self, orb):
        """Subscribe to events of type 'Log' using the given ORB."""
        # The base constructor needs the ORB as well as the event name.
        Initialization.__init__(self, orb, 'Log')

    def handleEvent(self, event):
        """Print the event's filterable data merged over a set of defaults.

        Each entry of ``event.filterable_data`` overwrites the default
        stored under the same name; the resulting dict is printed.
        """
        evdata = {
            "notificationId": 'UnkNown',
            "objectName": globaldefs.NamingAttributes_T,
            "nativeEMSName": 'UnkNown',
            "nativeProbableCause": 'UnkNown',
            "objectType": notifications.ObjectType_T,
            "objectTypeQualifier": 'UnkNown',
            "emsTime": globaldefs.Time_T,
            "neTime": globaldefs.Time_T,
            "isClearable": 'UnkNown',
            "layerRate": transmissionParameters.LayerRate_T,
            "probableCause": 'UnkNown',
            "probableCauseQualifier": 'UnkNown',
            "perceivedSeverity": notifications.PerceivedSeverity_T,
            "serviceAffecting": notifications.ServiceAffecting_T,
            "affectedTPList": globaldefs.NamingAttributesList_T,
            "additionalText": 'UnkNown',
        }
        for field in event.filterable_data:
            evdata[field.name] = field.value.value()
        print(evdata)
#---------------------------------------------------------------------------
# Handler
def handler(signum, frame):
    """Signal handler: disconnect the module-level listener, then stop the ORB.

    ``signum`` and ``frame`` are supplied by the ``signal`` module and are
    not used.
    """
    listener.disconnect()
    orb.shutdown(1)
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Script entry: initialise the ORB, create the listener and run until SIGINT.
# The ORB is pointed at the naming service through -ORBInitRef.
argv = ["-ORBInitRef","NameService=corbaloc:iiop:1.2@NMS1-GJ-NMS:5075/NameService"]
orb=CORBA.ORB_init(argv,CORBA.ORB_ID)
poa = orb.resolve_initial_references("RootPOA")
# `listener` and `orb` must stay module-level names: handler() reads them.
listener = LogListen(orb)
# Activate the root POA's manager.
poaManager = poa._get_the_POAManager()
poaManager.activate()
# On SIGINT, handler() disconnects the listener and shuts the ORB down.
signal.signal(signal.SIGINT,handler)
orb.run()
解决方法
获取 EventChannel 有两种方法:
- 第一种:从 ems_session 获取(调用 getEventChannel)
- 第二种:通过命名服务解析(resolve)EventChannel 对应的 NameComponent 来获取
看起来您把这两种方法混在一起使用了。
使用第二种方法获取事件通道的示例见:https://www.omniorb-support.com/pipermail/omniorb-list/2006-January/027404.html