This page describes how to perform test coverage analysis of omniORBpy servers in conjunction with the Python standard unit test framework (
http://pyunit.sourceforge.net/). Test coverage is used to analyze how much of a code base unit test suites are exercising, and this can be used to enhance the unit test suites. For an introduction to coverage analysis, see
http://www.bullseye.com/webCoverage.html. For a slightly contrarian perspective, see
http://www.c2.com/cgi/wiki?CodeCoverage.
We will use a slightly modified version of Neil Schemenauer's code_coverage.py, originally part of the Quixote web application server framework (
http://www.mems-exchange.org/software/python/quixote/), itself a modified version of Skip Montanaro's trace.py test coverage utility (
http://musi-cal.mojam.com/~skip/python/trace.py), itself derived from the standard Python profiler. Disclaimer: I have tested this code on Unix (Solaris 8/x86), but not on Windows.code_coverage.py works by setting a trace function using sys.settrace. This function is called each time a line of code is executed, which obviously slows down the process considerably, and should only be done when running tests and not on a production system. We use a special command-line argument -coverage followed by a file name to specify a file in which coverage results are accumulated. The idea is that multiple servers may invoke common code and that if full coverage analysis is performed, we have to aggregate coverage data from multiple servers and test runs before we can analyze the results.
One challenge for server processes like omniORBpy servers is that they run forever and there is no good way to specify when the coverage data that has been accumulated should be dumped to disk. It is possible to catch graceful shutdowns using the atexit module, but for servers that are killed, this won't work. To address this issue, we implement a special Corba instrumentation method that is used just to dump the coverage data.
Another challenge is that the trace function set by sys.settrace is a per-thread setting, and that it needs to be reset for each new worker thread launched by omniORBpy. We use the undocumented function omniORB.addWThreadHook() to do this.
The example we used is based on the standard echo_srv.py that is in the directory omni/src/lib/omniORBpy/examples/echo .
The following is a transcript of a test run. Note that the lines in echoString are preceded by 1:, which means they were executed once (by the first test case). Lines that haven't been executed (or were executed before coverage measurement started) are prefixed with chevrons >>>>>>. Empty lines or comments are prefixed with a dot.
bayazid ~/corba/python/echo_cover>rm coverage.db
bayazid ~/corba/python/echo_cover>./echo_unit_tests.py
Starting server...done
using IOR IOR:010000000d00000049444c3a4563686f3a312e300000000001000000000000002a
000000010100000e0000003139322e3136382e312e32333800768b0e000000fea536b13b00002c23
0000000000
...Terminated
----------------------------------------------------------------------
Ran 3 tests in 0.073s
OK
bayazid ~/corba/python/echo_cover>./report_coverage.py
*
bayazid ~/corba/python/echo_cover>cat summary.txt
code_coverage 2.3% (6/264)
echo_srv 17.9% (5/28)
bayazid ~/corba/python/echo_cover>cat echo_srv.cover
. #!/usr/bin/env python
.
>>>>>> import sys
>>>>>> from omniORB import CORBA, PortableServer, addWThreadHook
.
>>>>>> import code_coverage
.
. # Import the skeletons for the global module
>>>>>> import _GlobalIDL, _GlobalIDL__POA
.
. # Define an implementation of the Echo interface
>>>>>> class Echo_i (_GlobalIDL__POA.Echo):
>>>>>> def __init__(self):
>>>>>> if '-coverage' in sys.argv:
. # this object collects coverage statistics
>>>>>> self.cover = code_coverage.Coverage()
. # filename to which coverage statistics should be dumped
>>>>>> self.cover_filename = sys.argv[sys.argv.index('-coverage') +
1]
. # this is necessary to have the coverage trace function
. # re-set whenever a new worker thread is started by omniORBpy
>>>>>> addWThreadHook(self.coverage)
>>>>>> self.coverage()
>>>>>> def coverage(self, flag=0, wt=None):
>>>>>> if flag == 0: # WTHREAD_CREATED
>>>>>> sys.settrace(self.cover.trace)
1: def echoString(self, mesg):
1: print "echoString() called with message:", mesg
1: return mesg
1: def dump_coverage(self):
1: self.cover.accumulate(self.cover_filename)
.
. # Initialise the ORB
>>>>>> orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
.
. # Find the root POA
>>>>>> poa = orb.resolve_initial_references("RootPOA")
.
. # Create an instance of Echo_i
>>>>>> ei = Echo_i()
.
. # Create an object reference, and implicitly activate the object
>>>>>> eo = ei._this()
.
. # Print out the IOR
>>>>>> print orb.object_to_string(eo)
>>>>>> sys.stdout.flush()
.
. # Activate the POA
>>>>>> poaManager = poa._get_the_POAManager()
>>>>>> poaManager.activate()
.
. # Everything is running now, but if this thread drops out of the end
. # of the file, the process will exit. orb.run() just blocks until the
. # ORB is shut down
>>>>>> orb.run()
echo.idl
// echo.idl
interface Echo {
string echoString(in string mesg);
void dump_coverage();
};
echo_srv.py
/usr/bin/env python
import sys
from omniORB import CORBA, PortableServer, addWThreadHook
import code_coverage
# Import the skeletons for the global module
import _GlobalIDL, _GlobalIDL__POA
# Define an implementation of the Echo interface
class Echo_i (_GlobalIDL__POA.Echo):
def __init__(self):
if '-coverage' in sys.argv:
# this object collects coverage statistics
self.cover = code_coverage.Coverage()
# filename to which coverage statistics should be dumped
self.cover_filename = sys.argv[sys.argv.index('-coverage') + 1]
# this is necessary to have the coverage trace function
# re-set whenever a new worker thread is started by omniORBpy
addWThreadHook(self.coverage)
self.coverage()
def coverage(self, flag=0, wt=None):
if flag == 0: # WTHREAD_CREATED
sys.settrace(self.cover.trace)
def echoString(self, mesg):
print "echoString() called with message:", mesg
return mesg
def dump_coverage(self):
self.cover.accumulate(self.cover_filename)
# Initialise the ORB
orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
# Find the root POA
poa = orb.resolve_initial_references("RootPOA")
# Create an instance of Echo_i
ei = Echo_i()
# Create an object reference, and implicitly activate the object
eo = ei._this()
# Print out the IOR
print orb.object_to_string(eo)
sys.stdout.flush()
# Activate the POA
poaManager = poa._get_the_POAManager()
poaManager.activate()
# Everything is running now, but if this thread drops out of the end
# of the file, the process will exit. orb.run() just blocks until the
# ORB is shut down
orb.run()
echo_unit_tests.py
/usr/bin/env python
import sys, os, unittest
from omniORB import CORBA
import _GlobalIDL
# file to use for the coverage database
cover_filename = os.path.abspath('coverage.db')
# launch Corba server
sys.stderr.write('Starting server...')
f = os.popen('./echo_srv.py -coverage coverage.db', 'r', 1)
sys.stderr.write('done\n')
ior = f.readline().strip()
print 'using IOR', ior
# -----------------------------------------------------------------------------
#
# init the Test Cases
#
# -----------------------------------------------------------------------------
class EchoTestCase(unittest.TestCase):
def setUp(self):
self.orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
# open connections to Corba servers for debug purposes
self.obj = self.orb.string_to_object(ior)
assert(self.obj is not None)
self.echo = self.obj._narrow(_GlobalIDL.Echo)
def test1_echo_working(self):
'Check if the echo server is responding'
result = self.echo.echoString('hello, world')
assert result == 'hello, world'
def test2_dump_coverage(self):
'force dump of coverage statistics'
self.echo.dump_coverage()
def test3_kill(self):
'kill server'
# XXX this works on Solaris, but not all other OSes
os.system('pkill -f echo_srv')
def suite():
suite = unittest.makeSuite(EchoTestCase, 'test')
return suite
if __name__ == '__main__':
unittest.main()
code_coverage.py
"""code_coverage
Provides code coverage analysis for testing
"""
# Modified by Fazal Majid (fmajid@kefta.com) from Neil Schemenauer's
# Quixote code_coverage.py
# $Id: code_coverage.py,v 1.3 2001/09/22 00:27:14 majid Exp $
# ignore modules with a filename starting with the prefix
# this should be the prefix for your Python installation
prefix = '/usr/local'
# created 2000/07/06, Neil Schemenauer based on trace.py
__revision__ = "Id: code_coverage.py,v 1.1 2000/10/20 20:56:22 akuchlin Exp $"
# Example Usage:
#
# import code_coverage
# co = code_coverage.Coverage()
# co.run('main()')
# co.write_results()
#
# or
#
# $ python code_coverage.py "import mymodule; mymodule.main()"
# trace.py
# Copyright 2000, Mojam Media, Inc., all rights reserved.
# Author: Skip Montanaro
#
# Copyright 1999, Bioreason, Inc., all rights reserved.
# Author: Andrew Dalke
#
# Copyright 1995-1997, Automatrix, Inc., all rights reserved.
# Author: Skip Montanaro
#
# Copyright 1991-1995, Stichting Mathematisch Centrum, all rights reserved.
#
#
# Permission to use, copy, modify, and distribute this Python software and
# its associated documentation for any purpose without fee is hereby
# granted, provided that the above copyright notice appears in all copies,
# and that both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of neither Automatrix,
# Bioreason or Mojam Media be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior permission.
import sys, os, string, cPickle, dis
# Given a code string, return the SET_LINENO information
def _find_LINENO_from_string(co_code):
"""return all of the SET_LINENO information from a code string"""
import dis
linenos = {}
# This code was filched from the `dis' module then modified
n = len(co_code)
i = 0
prev_op = None
prev_lineno = 0
while i < n:
c = co_code[i]
op = ord(c)
if op == dis.SET_LINENO:
if prev_op == op:
# two SET_LINENO in a row, so the previous didn't
# indicate anything. This occurs with triple
# quoted strings (?). Remove the old one.
del linenos[prev_lineno]
prev_lineno = ord(co_code[i+1]) + ord(co_code[i+2])*256
linenos[prev_lineno] = 1
if op >= dis.HAVE_ARGUMENT:
i = i + 3
else:
i = i + 1
prev_op = op
return linenos
def _find_LINENO(code):
"""return all of the SET_LINENO information from a code object"""
import types
# get all of the lineno information from the code of this scope level
linenos = _find_LINENO_from_string(code.co_code)
# and check the constants for references to other code objects
for c in code.co_consts:
if type(c) == types.CodeType:
# find another code object, so recurse into it
linenos.update(_find_LINENO(c))
return linenos
def find_executable_linenos(filename):
"""return a dict of the line numbers from executable statements in a file
Works by finding all of the code-like objects in the module then searching
the byte code for 'SET_LINENO' terms (so this won't work one -O files).
"""
import parser
prog = open(filename).read()
ast = parser.suite(prog)
code = parser.compileast(ast, filename)
# The only way I know to find line numbers is to look for the
# SET_LINENO instructions. Isn't there some way to get it from
# the AST?
return _find_LINENO(code)
class Ignore:
def __init__(self, modules = None, dirs = None):
self._mods = modules or []
self._dirs = dirs or []
self._ignore = { '<string>': 1 }
def names(self, filename, modulename):
if self._ignore.has_key(modulename):
return self._ignore[modulename]
# haven't seen this one before, so see if the module name is
# on the ignore list. Need to take some care since ignoring
# "cmp" musn't mean ignoring "cmpcache" but ignoring
# "Spam" must also mean ignoring "Spam.Eggs".
for mod in self._mods:
if mod == modulename: # Identical names, so ignore
self._ignore[modulename] = 1
return 1
# check if the module is a proper submodule of something on
# the ignore list
n = len(mod)
# (will not overflow since if the first n characters are the
# same and the name has not already occured, then the size
# of "name" is greater than that of "mod")
if mod == modulename[:n] and modulename[n] == '.':
self._ignore[modulename] = 1
return 1
# Now check that __file__ isn't in one of the directories
if filename is None:
# must be a built-in, so we must ignore
self._ignore[modulename] = 1
return 1
# Ignore a file when it contains one of the ignorable paths
for d in self._dirs:
# The '+ os.sep' is to ensure that d is a parent directory,
# as compared to cases like:
# d = "/usr/local"
# filename = "/usr/local.py"
# or
# d = "/usr/local.py"
# filename = "/usr/local.py"
if string.find(filename, d + os.sep) == 0:
self._ignore[modulename] = 1
return 1
# Tried the different ways, so we don't ignore this module
self._ignore[modulename] = 0
return 0
class Coverage:
def __init__(self, modules=None):
"""modules : list of modules for which to show coverage
default is all modules"""
self.modules = {} # module -> filename
self.counts = {} # (modulename, linenumber) -> count
if modules is None:
self.show_all = 1
else:
self.show_all = 0
for module in modules:
self.modules[module] = None
def trace(self, frame, why, arg):
if why == 'line':
try:
modulename = frame.f_globals["__name__"]
except:
print >> sys.stderr, '@' * 72
print >> sys.stderr, frame.f_globals.keys()
print >> sys.stderr, frame.f_code.co_filename
print >> sys.stderr, dis.dis(frame.f_code)
print >> sys.stderr, '@' * 72
return self.trace
if self.show_all or self.modules.has_key(modulename):
# something is fishy about getting the file name
if not self.modules.has_key(modulename) \
or self.modules[modulename] == None:
filename = frame.f_globals.get("__file__", None)
if filename is None:
filename = frame.f_code.co_filename
filename = os.path.abspath(filename)
if modulename == '__main__':
modulename = os.path.split(filename)[1][:-3]
self.modules[modulename] = filename
lineno = frame.f_lineno
if not self.modules[modulename].startswith(prefix):
# record the file name and line number of every trace
key = (modulename, lineno)
self.counts[key] = self.counts.get(key, 0) + 1
return self.trace
def accumulate(self, filename):
import fcntl
f = open(filename, 'a')
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
cPickle.dump(self.modules, f)
cPickle.dump(self.counts, f)
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
f.close()
def run(self, cmd):
import __main__
dict = __main__.__dict__
sys.settrace(self.trace)
try:
exec cmd in dict, dict
finally:
sys.settrace(None)
def runctx(self, cmd):
import __main__
dict = __main__.__dict__
sys.settrace(self.trace)
try:
exec cmd in dict, dict
finally:
sys.settrace(None)
def runfunc(self, func, *args, **kw):
result = None
sys.settrace(self.trace)
try:
result = apply(func, args, kw)
finally:
sys.settrace(None)
return result
def write_results(self, outfile=sys.stdout, show_lines=1):
import re
# turn the counts data ("(module, lineno) = count") into something
# accessible on a per-module basis
per_module = {}
for modulename, lineno in self.counts.keys():
try:
lines_hit = per_module[modulename]
except KeyError:
lines_hit = per_module[modulename] = {}
lines_hit[lineno] = self.counts[(modulename, lineno)]
# there are many places where this is insufficient, like a blank
# line embedded in a multiline string.
blank = re.compile(r'^\s*(#.*)?$')
for modulename, filename in self.modules.items():
if filename[0] == '<':
continue
if filename.startswith(prefix):
continue
outfile.write('@' * 78 + '\n')
outfile.write('@' * 78 + '\n')
outfile.write('%s: ' % modulename)
if filename is None:
# module as not executed as all
outfile.write("not executed\n")
continue
if filename[-4:] == ".pyc" or filename[-4:] == ".pyo":
orig_filename = filename[:-4] + '.py'
else:
orig_filename = filename
if show_lines:
outfile.write('\n')
outfile.write('@' * 78 + '\n')
outfile.write('@' * 78 + '\n')
# Get the original lines from the .py file
try:
lines = open(orig_filename, 'r').readlines()
except IOError, err:
sys.stderr.write(
"%s: Could not open %s for reading because: %s - skipping\n" % \
("trace", `filename`, err.strerror))
continue
# get a list of the line numbers which represent executable
# content (returned as a dict for better lookup speed)
executable_linenos = find_executable_linenos(orig_filename)
lines_hit = per_module[modulename]
num_lines = 0
num_lines_hit = 0
for i in range(len(lines)):
line = lines[i]
# do the blank/comment match to try to mark more lines
# (help the reader find stuff that hasn't been covered)
if lines_hit.has_key(i+1):
num_lines_hit = num_lines_hit + 1
num_lines = num_lines + 1
if show_lines:
# count precedes the lines that we captured
outfile.write('%5d: ' % lines_hit[i+1])
elif blank.match(line):
# blank lines and comments are preceded by dots
if show_lines:
outfile.write(' . ')
else:
# lines preceded by no marks weren't hit
# Highlight them if so indicated, unless the line contains
# '#pragma: NO COVER' (it is possible to embed this into
# the text as a non-comment; no easy fix)
if executable_linenos.has_key(i+1) and \
string.find(lines[i], '#pragma NO COVER') == -1:
num_lines = num_lines + 1
if show_lines:
outfile.write('>>>>>> ')
else:
if show_lines:
outfile.write(' '*7)
if show_lines:
outfile.write(string.expandtabs(lines[i], 8))
if num_lines == 0:
num_lines = 1 # avoid blowup on empty files
outfile.write("%3.1f%% (%d/%d)\n" %
((100.0*num_lines_hit)/num_lines, num_lines_hit, num_lines))
class Analyze(Coverage):
def __init__(self, filename):
self.modules = {}
self.counts = {}
f = open(filename, 'r')
try:
while 1:
modules = cPickle.load(f)
counts = cPickle.load(f)
sys.stderr.write('*')
self.modules.update(modules)
for key in counts.keys():
self.counts[key] = self.counts.get(key, 0) + counts[key]
except EOFError:
sys.stderr.write('\n')
f.close()
def write_results(self):
import re
# turn the counts data ("(module, lineno) = count") into something
# accessible on a per-module basis
per_module = {}
for modulename, lineno in self.counts.keys():
try:
lines_hit = per_module[modulename]
except KeyError:
lines_hit = per_module[modulename] = {}
lines_hit[lineno] = self.counts[(modulename, lineno)]
# there are many places where this is insufficient, like a blank
# line embedded in a multiline string.
blank = re.compile(r'^\s*(#.*)?$')
summary = open('summary.txt', 'w')
modulelist = self.modules.items()
modulelist.sort()
for modulename, filename in modulelist:
if modulename[0] == '<':
continue
if filename.startswith(prefix):
continue
if modulename.endswith('_idl'):
continue
if modulename.endswith('__POA'):
continue
outfile = open(os.path.split(modulename)[-1] + '.cover', 'w')
if filename is None:
# module as not executed as all
outfile.write("not executed\n")
continue
if filename[-4:] == ".pyc" or filename[-4:] == ".pyo":
orig_filename = filename[:-4] + '.py'
else:
orig_filename = filename
outfile.write('\n')
# Get the original lines from the .py file
try:
lines = open(orig_filename, 'r').readlines()
except IOError, err:
print modulename, filename
sys.stderr.write(
"%s: Could not open %s for reading because: %s - skipping\n" % \
("trace", `filename`, err.strerror))
continue
# get a list of the line numbers which represent executable
# content (returned as a dict for better lookup speed)
executable_linenos = find_executable_linenos(orig_filename)
lines_hit = per_module[modulename]
num_lines = 0
num_lines_hit = 0
for i in range(len(lines)):
line = lines[i]
# do the blank/comment match to try to mark more lines
# (help the reader find stuff that hasn't been covered)
if lines_hit.has_key(i+1):
num_lines_hit = num_lines_hit + 1
num_lines = num_lines + 1
# count precedes the lines that we captured
outfile.write('%5d: ' % lines_hit[i+1])
elif blank.match(line):
# blank lines and comments are preceded by dots
outfile.write(' . ')
else:
# lines preceded by no marks weren't hit
# Highlight them if so indicated, unless the line contains
# '#pragma: NO COVER' (it is possible to embed this into
# the text as a non-comment; no easy fix)
if executable_linenos.has_key(i+1) and \
string.find(lines[i], '#pragma NO COVER') == -1:
num_lines = num_lines + 1
outfile.write('>>>>>> ')
else:
outfile.write(' '*7)
outfile.write(string.expandtabs(lines[i], 8))
if num_lines == 0:
num_lines = 1 # avoid blowup on empty files
summary.write("%32s %3.1f%% (%d/%d)\n" %
(modulename, (100.0*num_lines_hit)/num_lines,
num_lines_hit, num_lines))
def main(stmt):
co = Coverage()
co.runctx(stmt)
co.write_results()
if __name__ == '__main__':
import sys
if len(sys.argv) == 2:
main(sys.argv[1])
else:
print "Usage: code_coverage.py <statement>"
sys.exit(1)
report_coverage.py
/usr/local/bin/python
import code_coverage
a = code_coverage.Analyze('coverage.db')
a.write_results()
