This page describes how to perform test coverage analysis of omniORBpy servers in conjunction with the Python standard unit test framework (http://pyunit.sourceforge.net/). Test coverage analysis measures how much of a code base the unit test suites actually exercise, and the results can be used to improve those suites. For an introduction to coverage analysis, see [http://www.bullseye.com/webCoverage.html]. For a slightly contrarian perspective, see [http://www.c2.com/cgi/wiki?CodeCoverage].

We will use a slightly modified version of Neil Schemenauer's code_coverage.py, originally part of the Quixote web application server framework (http://www.mems-exchange.org/software/python/quixote/), itself a modified version of Skip Montanaro's trace.py test coverage utility (http://musi-cal.mojam.com/~skip/python/trace.py), itself derived from the standard Python profiler. Disclaimer: I have tested this code on Unix (Solaris 8/x86), but not on Windows.

code_coverage.py works by setting a trace function using sys.settrace. This function is called each time a line of code is executed, which slows the process down considerably, so it should only be enabled when running tests and never on a production system. We use a special command-line argument, -coverage, followed by a file name to specify the file in which coverage results are accumulated. The idea is that multiple servers may invoke common code, so a full coverage analysis has to aggregate coverage data from multiple servers and test runs before the results can be analyzed.
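To make the mechanism concrete, here is a minimal sketch of a line-counting trace function. It is written for Python 3 (the listings on this page are Python 2) and is not the code_coverage.py implementation; the names counts, trace and sample are made up for the illustration.

```python
import sys
from collections import Counter

counts = Counter()  # (module name, line number) -> number of executions

def trace(frame, event, arg):
    # The interpreter reports 'call', 'line', 'return' and 'exception'
    # events; only 'line' events are counted here.
    if event == 'line':
        module = frame.f_globals.get('__name__', '?')
        counts[(module, frame.f_lineno)] += 1
    # Returning the function keeps line tracing active in this frame.
    return trace

def sample(n):
    # Hypothetical function under test.
    total = 0
    for i in range(n):
        total += i
    return total

sys.settrace(trace)
try:
    result = sample(3)
finally:
    sys.settrace(None)  # always switch tracing off again

for (module, lineno), hits in sorted(counts.items()):
    print(module, lineno, hits)
```

Only frames entered after sys.settrace() is called are traced, which is also why module-level code that runs before tracing starts shows up as not executed in the reports further down.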

One challenge for server processes like omniORBpy servers is that they run forever, so there is no natural point at which the accumulated coverage data should be dumped to disk. It is possible to catch graceful shutdowns using the atexit module, but this won't work for servers that are killed. To address this issue, we implement a special CORBA instrumentation method whose only purpose is to dump the coverage data.
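The two ways of triggering a dump can be sketched as follows, in Python 3. CoverageStore is a hypothetical stand-in for the Coverage object used on this page, and the temporary file path is only there to keep the example self-contained.

```python
import atexit
import os
import pickle
import tempfile

class CoverageStore:
    """Hypothetical stand-in for the Coverage object used on this page."""

    def __init__(self, filename):
        self.filename = filename
        self.counts = {}  # (module name, line number) -> hits

    def hit(self, module, lineno):
        key = (module, lineno)
        self.counts[key] = self.counts.get(key, 0) + 1

    def dump(self):
        # Append one pickle record, then reset the counters so that a
        # later dump does not report the same hits a second time.
        with open(self.filename, 'ab') as f:
            pickle.dump(self.counts, f)
        self.counts = {}

db = os.path.join(tempfile.mkdtemp(), 'coverage.db')
store = CoverageStore(db)

# Covers graceful shutdowns only; a process killed by an unhandled
# signal never runs its atexit handlers.
atexit.register(store.dump)

# An explicit call, such as the one a remote dump method would make,
# works while the server is still running.
store.hit('echo_srv', 60)
store.hit('echo_srv', 60)
store.dump()

with open(db, 'rb') as f:
    first_record = pickle.load(f)
print(first_record)
```

Resetting the counters after each dump is a choice made for this sketch; the accumulate method in the listing on this page leaves them in place, so dumping twice from the same server would count the earlier hits again.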

Another challenge is that the trace function set by sys.settrace is a per-thread setting, so it has to be set again in each new worker thread launched by omniORBpy. We use the undocumented function omniORB.addWThreadHook() to do this.
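The per-thread behaviour can be seen without an ORB. The Python 3 sketch below uses the standard threading.settrace() as a stand-in for omniORB.addWThreadHook(). This is an assumption made for illustration: threading.settrace() only affects threads started through the threading module, not omniORBpy's own worker threads, which is why the server on this page needs the hook.

```python
import sys
import threading

seen = {}  # thread name -> number of 'line' events observed

def trace(frame, event, arg):
    if event == 'line':
        name = threading.current_thread().name
        seen[name] = seen.get(name, 0) + 1
    return trace

def work():
    x = 1
    y = x + 1
    return y

def run_in_thread(name):
    t = threading.Thread(target=work, name=name)
    t.start()
    t.join()

# 1. Trace function set in the main thread only: the worker thread
#    starts without one, so none of its lines are recorded.
sys.settrace(trace)
try:
    run_in_thread('untraced')
finally:
    sys.settrace(None)

# 2. threading.settrace() installs the function in every thread that is
#    started afterwards through the threading module.
threading.settrace(trace)
try:
    run_in_thread('traced')
finally:
    threading.settrace(None)

print(sorted(seen))
```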

The example we use is based on the standard echo_srv.py found in the directory omni/src/lib/omniORBpy/examples/echo.


The following is a transcript of a test run. Note that the lines in echoString are preceded by 1:, which means they were executed once (by the first test case). Lines that haven't been executed (or were executed before coverage measurement started) are prefixed with chevrons >>>>>>. Empty lines or comments are prefixed with a dot.
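As a reading aid, the Python 3 sketch below classifies annotated lines by the markers just described and recomputes the percentage written to summary.txt. The function name summarize and the sample lines are illustrative only; the real report is produced by the Analyze class listed further down.

```python
import re

HIT = re.compile(r'^\s*(\d+): ')  # e.g. "    1: " followed by source
MISSED = '>>>>>> '

def summarize(cover_lines):
    """Return (lines hit, executable lines, percentage)."""
    hit = missed = 0
    for line in cover_lines:
        if HIT.match(line):
            hit += 1
        elif line.startswith(MISSED):
            missed += 1
        # anything else (dots, unmarked lines) is not executable
    total = hit + missed
    percent = 100.0 * hit / total if total else 0.0
    return hit, total, percent

sample = [
    "    . # Define an implementation of the Echo interface",
    ">>>>>> class Echo_i (_GlobalIDL__POA.Echo):",
    "    1:     def echoString(self, mesg):",
    "    1:         return mesg",
]
hit, total, percent = summarize(sample)
print("%.1f%% (%d/%d)" % (percent, hit, total))
```

The same formula applied to the figures in the summary.txt of the transcript, 5 lines hit out of 28 executable lines, gives the 17.9% reported for echo_srv.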

bayazid ~/corba/python/echo_cover>rm coverage.db
bayazid ~/corba/python/echo_cover>./echo_unit_tests.py
Starting server...done
using IOR IOR:010000000d00000049444c3a4563686f3a312e300000000001000000000000002a
000000010100000e0000003139322e3136382e312e32333800768b0e000000fea536b13b00002c23
0000000000
...Terminated

----------------------------------------------------------------------
Ran 3 tests in 0.073s

OK
bayazid ~/corba/python/echo_cover>./report_coverage.py
*
bayazid ~/corba/python/echo_cover>cat summary.txt
                   code_coverage   2.3% (6/264)
                        echo_srv   17.9% (5/28)
bayazid ~/corba/python/echo_cover>cat echo_srv.cover

    . #!/usr/bin/env python
    .
>>>>>> import sys
>>>>>> from omniORB import CORBA, PortableServer, addWThreadHook
    .
>>>>>> import code_coverage
    .
    . # Import the skeletons for the global module
>>>>>> import _GlobalIDL, _GlobalIDL__POA
    .
    . # Define an implementation of the Echo interface
>>>>>> class Echo_i (_GlobalIDL__POA.Echo):
>>>>>>     def __init__(self):
>>>>>>         if '-coverage' in sys.argv:
    .             # this object collects coverage statistics
>>>>>>             self.cover = code_coverage.Coverage()
    .             # filename to which coverage statistics should be dumped
>>>>>>             self.cover_filename = sys.argv[sys.argv.index('-coverage') + 1]
    .             # this is necessary to have the coverage trace function
    .             # re-set whenever a new worker thread is started by omniORBpy
>>>>>>             addWThreadHook(self.coverage)
>>>>>>             self.coverage()
>>>>>>     def coverage(self, flag=0, wt=None):
>>>>>>         if flag == 0: # WTHREAD_CREATED
>>>>>>             sys.settrace(self.cover.trace)
    1:     def echoString(self, mesg):
    1:         print "echoString() called with message:", mesg
    1:         return mesg
    1:     def dump_coverage(self):
    1:         self.cover.accumulate(self.cover_filename)
    .
    . # Initialise the ORB
>>>>>> orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
    .
    . # Find the root POA
>>>>>> poa = orb.resolve_initial_references("RootPOA")
    .
    . # Create an instance of Echo_i
>>>>>> ei = Echo_i()
    .
    . # Create an object reference, and implicitly activate the object
>>>>>> eo = ei._this()
    .
    . # Print out the IOR
>>>>>> print orb.object_to_string(eo)
>>>>>> sys.stdout.flush()
    .
    . # Activate the POA
>>>>>> poaManager = poa._get_the_POAManager()
>>>>>> poaManager.activate()
    .
    . # Everything is running now, but if this thread drops out of the end
    . # of the file, the process will exit. orb.run() just blocks until the
    . # ORB is shut down
>>>>>> orb.run()


echo.idl

// echo.idl
interface Echo {
  string echoString(in string mesg);
  void dump_coverage();
};


echo_srv.py

#!/usr/bin/env python

import sys
from omniORB import CORBA, PortableServer, addWThreadHook

import code_coverage

# Import the skeletons for the global module
import _GlobalIDL, _GlobalIDL__POA

# Define an implementation of the Echo interface
class Echo_i (_GlobalIDL__POA.Echo):
    def __init__(self):
        if '-coverage' in sys.argv:
            # this object collects coverage statistics
            self.cover = code_coverage.Coverage()
            # filename to which coverage statistics should be dumped
            self.cover_filename = sys.argv[sys.argv.index('-coverage') + 1]
            # this is necessary to have the coverage trace function
            # re-set whenever a new worker thread is started by omniORBpy
            addWThreadHook(self.coverage)
            self.coverage()
    def coverage(self, flag=0, wt=None):
        if flag == 0: # WTHREAD_CREATED
            sys.settrace(self.cover.trace)
    def echoString(self, mesg):
        print "echoString() called with message:", mesg
        return mesg
    def dump_coverage(self):
        self.cover.accumulate(self.cover_filename)

# Initialise the ORB
orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)

# Find the root POA
poa = orb.resolve_initial_references("RootPOA")

# Create an instance of Echo_i
ei = Echo_i()

# Create an object reference, and implicitly activate the object
eo = ei._this()

# Print out the IOR
print orb.object_to_string(eo)
sys.stdout.flush()

# Activate the POA
poaManager = poa._get_the_POAManager()
poaManager.activate()

# Everything is running now, but if this thread drops out of the end
# of the file, the process will exit. orb.run() just blocks until the
# ORB is shut down
orb.run()


echo_unit_tests.py

#!/usr/bin/env python
import sys, os, unittest

from omniORB import CORBA
import _GlobalIDL

# file to use for the coverage database
cover_filename = os.path.abspath('coverage.db')

# launch CORBA server, passing it the coverage database defined above
sys.stderr.write('Starting server...')
f = os.popen('./echo_srv.py -coverage %s' % cover_filename, 'r', 1)
sys.stderr.write('done\n')
ior = f.readline().strip()
print 'using IOR', ior

# -----------------------------------------------------------------------------
#
# init the Test Cases
#
# -----------------------------------------------------------------------------
class EchoTestCase(unittest.TestCase):

  def setUp(self):
    self.orb = CORBA.ORB_init(sys.argv, CORBA.ORB_ID)
    # open connections to CORBA servers for debug purposes
    self.obj = self.orb.string_to_object(ior)
    assert(self.obj is not None)
    self.echo = self.obj._narrow(_GlobalIDL.Echo)

  def test1_echo_working(self):
    'Check if the echo server is responding'
    result = self.echo.echoString('hello, world')
    assert result == 'hello, world'

  def test2_dump_coverage(self):
    'force dump of coverage statistics'
    self.echo.dump_coverage()

  def test3_kill(self):
    'kill server'
    # XXX this works on Solaris, but not all other OSes
    os.system('pkill -f echo_srv')

def suite():
  suite = unittest.makeSuite(EchoTestCase, 'test')
  return suite

if __name__ == '__main__':
  unittest.main()


code_coverage.py

"""code_coverage

Provides code coverage analysis for testing
"""

# Modified by Fazal Majid (fmajid@kefta.com) from Neil Schemenauer's
# Quixote code_coverage.py
# $Id: code_coverage.py,v 1.3 2001/09/22 00:27:14 majid Exp $

# ignore modules with a filename starting with the prefix
# this should be the prefix for your Python installation
prefix = '/usr/local'

# created 2000/07/06, Neil Schemenauer based on trace.py

__revision__ = "Id: code_coverage.py,v 1.1 2000/10/20 20:56:22 akuchlin Exp $"

# Example Usage:
#
# import code_coverage
# co = code_coverage.Coverage()
# co.run('main()')
# co.write_results()
#
# or
#
# $ python code_coverage.py "import mymodule; mymodule.main()"

# trace.py
# Copyright 2000, Mojam Media, Inc., all rights reserved.
# Author: Skip Montanaro
#
# Copyright 1999, Bioreason, Inc., all rights reserved.
# Author: Andrew Dalke
#
# Copyright 1995-1997, Automatrix, Inc., all rights reserved.
# Author: Skip Montanaro
#
# Copyright 1991-1995, Stichting Mathematisch Centrum, all rights reserved.
#
#
# Permission to use, copy, modify, and distribute this Python software and
# its associated documentation for any purpose without fee is hereby
# granted, provided that the above copyright notice appears in all copies,
# and that both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of neither Automatrix,
# Bioreason or Mojam Media be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior permission.

import sys, os, cPickle, dis

# Given a code string, return the SET_LINENO information
def _find_LINENO_from_string(co_code):
    """return all of the SET_LINENO information from a code string"""
    import dis
    linenos = {}

    # This code was filched from the `dis' module then modified
    n = len(co_code)
    i = 0
    prev_op = None
    prev_lineno = 0
    while i < n:
        c = co_code[i]
        op = ord(c)
        if op == dis.SET_LINENO:
            if prev_op == op:
                # two SET_LINENO in a row, so the previous didn't
                # indicate anything.  This occurs with triple
                # quoted strings (?).  Remove the old one.
                del linenos[prev_lineno]
            prev_lineno = ord(co_code[i+1]) + ord(co_code[i+2])*256
            linenos[prev_lineno] = 1
        if op >= dis.HAVE_ARGUMENT:
            i += 3
        else:
            i += 1
        prev_op = op
    return linenos

def _find_LINENO(code):
    """return all of the SET_LINENO information from a code object"""
    import types

    # get all of the lineno information from the code of this scope level
    linenos = _find_LINENO_from_string(code.co_code)

    # and check the constants for references to other code objects
    for c in code.co_consts:
        if type(c) == types.CodeType:
            # find another code object, so recurse into it
            linenos.update(_find_LINENO(c))
    return linenos

def find_executable_linenos(filename):
    """return a dict of the line numbers from executable statements in a file

    Works by finding all of the code-like objects in the module then searching
    the byte code for 'SET_LINENO' terms (so this won't work on -O files).

    """
    import parser

    prog = open(filename).read()
    ast = parser.suite(prog)
    code = parser.compileast(ast, filename)

    # The only way I know to find line numbers is to look for the
    # SET_LINENO instructions.  Isn't there some way to get it from
    # the AST?

    return _find_LINENO(code)

class Ignore:
    def __init__(self, modules = None, dirs = None):
        self._mods = modules or []
        self._dirs = dirs or []

        self._ignore = { '<string>': 1 }


    def names(self, filename, modulename):
        if modulename in self._ignore:
            return self._ignore[modulename]

        # haven't seen this one before, so see if the module name is
        # on the ignore list.  Need to take some care since ignoring
        # "cmp" mustn't mean ignoring "cmpcache" but ignoring
        # "Spam" must also mean ignoring "Spam.Eggs".
        for mod in self._mods:
            if mod == modulename:  # Identical names, so ignore
                self._ignore[modulename] = 1
                return 1
            # check if the module is a proper submodule of something on
            # the ignore list
            n = len(mod)
            # (will not overflow since if the first n characters are the
            # same and the name has not already occurred, then the size
            # of "name" is greater than that of "mod")
            if mod == modulename[:n] and modulename[n] == '.':
                self._ignore[modulename] = 1
                return 1

        # Now check that __file__ isn't in one of the directories
        if filename is None:
            # must be a built-in, so we must ignore
            self._ignore[modulename] = 1
            return 1

        # Ignore a file when it contains one of the ignorable paths
        for d in self._dirs:
            # The '+ os.sep' is to ensure that d is a parent directory,
            # as compared to cases like:
            #  d = "/usr/local"
            #  filename = "/usr/local.py"
            # or
            #  d = "/usr/local.py"
            #  filename = "/usr/local.py"
            if filename.find(d + os.sep) == 0:
                self._ignore[modulename] = 1
                return 1

        # Tried the different ways, so we don't ignore this module
        self._ignore[modulename] = 0
        return 0

class Coverage:
    def __init__(self, modules=None):
        """modules : list of modules for which to show coverage
        default is all modules"""
        self.modules = {} # module -> filename
        self.counts = {}  # (modulename, linenumber) -> count
        if modules is None:
            self.show_all = True
        else:
            self.show_all = False
            for module in modules:
                self.modules[module] = None

    def trace(self, frame, why, arg):
        if why == 'line':
            try:
                modulename = frame.f_globals["__name__"]
            except:
                print >> sys.stderr, '@' * 72
                print >> sys.stderr, frame.f_globals.keys()
                print >> sys.stderr, frame.f_code.co_filename
                print >> sys.stderr, dis.dis(frame.f_code)
                print >> sys.stderr, '@' * 72
                return self.trace

            if self.show_all or modulename in self.modules:
                # something is fishy about getting the file name
                if self.modules.get(modulename) == None:
                    filename = frame.f_globals.get("__file__", None)
                    if filename is None:
                        filename = frame.f_code.co_filename
                    filename = os.path.abspath(filename)
                    if modulename == '__main__':
                        modulename = os.path.split(filename)[1][:-3]
                    self.modules[modulename] = filename

                lineno = frame.f_lineno

                if not self.modules[modulename].startswith(prefix):
                    # record the file name and line number of every trace
                    key = (modulename, lineno)
                    self.counts[key] = self.counts.get(key, 0) + 1

        return self.trace

    def accumulate(self, filename):
        import fcntl
        f = open(filename, 'a')
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        cPickle.dump(self.modules, f)
        cPickle.dump(self.counts, f)
        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
        f.close()

    def run(self, cmd):
        import __main__
        dict = __main__.__dict__
        sys.settrace(self.trace)
        try:
            exec cmd in dict, dict
        finally:
            sys.settrace(None)

    def runctx(self, cmd):
        import __main__
        dict = __main__.__dict__
        sys.settrace(self.trace)
        try:
            exec cmd in dict, dict
        finally:
            sys.settrace(None)

    def runfunc(self, func, *args, **kw):
        result = None
        sys.settrace(self.trace)
        try:
            result = apply(func, args, kw)
        finally:
            sys.settrace(None)
        return result

    def write_results(self, outfile=sys.stdout, show_lines=True):
        import re
        # turn the counts data ("(module, lineno) = count") into something
        # accessible on a per-module basis
        per_module = {}
        for modulename, lineno in self.counts.keys():
            try:
                lines_hit = per_module[modulename]
            except KeyError:
                lines_hit = per_module[modulename] = {}
            lines_hit[lineno] = self.counts[(modulename, lineno)]

        # there are many places where this is insufficient, like a blank
        # line embedded in a multiline string.
        blank = re.compile(r'^\s*(#.*)?$')

        for modulename, filename in self.modules.items():
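            # filename is None for modules that were requested but never
            # executed; those are reported as "not executed" below
            if filename is not None and \
               (filename[0] == '<' or filename.startswith(prefix)):
                continue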
            outfile.write('@' * 78 + '\n')
            outfile.write('@' * 78 + '\n')
            outfile.write('%s: ' % modulename)
            if filename is None:
                # module was not executed at all
                outfile.write("not executed\n")
                continue
            if filename.endswith(".pyc") or filename.endswith(".pyo"):
                orig_filename = filename[:-4] + '.py'
            else:
                orig_filename = filename

            if show_lines:
                outfile.write('\n')

            outfile.write('@' * 78 + '\n')
            outfile.write('@' * 78 + '\n')

            # Get the original lines from the .py file
            try:
                lines = open(orig_filename, 'r').readlines()
            except IOError, err:
                sys.stderr.write(
                    "%s: Could not open %r for reading because: %s - skipping\n" % \
                    ("trace", filename, err.strerror))
                continue

            # get a list of the line numbers which represent executable
            # content (returned as a dict for better lookup speed)
            executable_linenos = find_executable_linenos(orig_filename)

            lines_hit = per_module[modulename]
            num_lines = 0
            num_lines_hit = 0
            for i, line in enumerate(lines):
                # do the blank/comment match to try to mark more lines
                # (help the reader find stuff that hasn't been covered)
                if i+1 in lines_hit:
                    num_lines_hit += 1
                    num_lines += 1
                    if show_lines:
                        # count precedes the lines that we captured
                        outfile.write('%5d: ' % lines_hit[i+1])
                elif blank.match(line):
                    # blank lines and comments are preceded by dots
                    if show_lines:
                        outfile.write('    . ')
                elif i+1 in executable_linenos and \
                   '#pragma NO COVER' not in line:
                    # lines preceded by no marks weren't hit
                    # Highlight them if so indicated, unless the line contains
                    # '#pragma NO COVER' (it is possible to embed this into
                    # the text as a non-comment; no easy fix)
                    num_lines += 1
                    if show_lines:
                        outfile.write('>>>>>> ')
                else:
                    if show_lines:
                        outfile.write(' '*7)
                if show_lines:
                    outfile.write(line.expandtabs(8))
            if num_lines == 0:
                num_lines = 1 # avoid blowup on empty files
            outfile.write("%3.1f%% (%d/%d)\n" %
                    ((100.0*num_lines_hit)/num_lines, num_lines_hit, num_lines))


class Analyze(Coverage):
    def __init__(self, filename):
        self.modules = {}
        self.counts = {}
        f = open(filename, 'r')
        try:
            while True:
                modules = cPickle.load(f)
                counts = cPickle.load(f)
                sys.stderr.write('*')
                self.modules.update(modules)
                for key in counts:
                    self.counts[key] = self.counts.get(key, 0) + counts[key]
        except EOFError:
            sys.stderr.write('\n')
            f.close()
    def write_results(self):
        import re
        # turn the counts data ("(module, lineno) = count") into something
        # accessible on a per-module basis
        per_module = {}
        for modulename, lineno in self.counts.keys():
            try:
                lines_hit = per_module[modulename]
            except KeyError:
                lines_hit = per_module[modulename] = {}
            lines_hit[lineno] = self.counts[(modulename, lineno)]

        # there are many places where this is insufficient, like a blank
        # line embedded in a multiline string.
        blank = re.compile(r'^\s*(#.*)?$')

        summary = open('summary.txt', 'w')
        modulelist = self.modules.items()
        modulelist.sort()
        for modulename, filename in modulelist:
            if modulename[0] == '<':
                continue
            if filename is not None and filename.startswith(prefix):
                continue
            if modulename.endswith('_idl'):
                continue
            if modulename.endswith('__POA'):
                continue
            outfile = open(os.path.split(modulename)[-1] + '.cover', 'w')
            if filename is None:
                # module was not executed at all
                outfile.write("not executed\n")
                continue
            if filename.endswith(".pyc") or filename.endswith(".pyo"):
                orig_filename = filename[:-4] + '.py'
            else:
                orig_filename = filename

            outfile.write('\n')

            # Get the original lines from the .py file
            try:
                lines = open(orig_filename, 'r').readlines()
            except IOError, err:
                print modulename, filename
                sys.stderr.write(
                    "%s: Could not open %r for reading because: %s - skipping\n" % \
                    ("trace", filename, err.strerror))
                continue

            # get a list of the line numbers which represent executable
            # content (returned as a dict for better lookup speed)
            executable_linenos = find_executable_linenos(orig_filename)

            lines_hit = per_module[modulename]
            num_lines = 0
            num_lines_hit = 0
            for i, line in enumerate(lines):
                # do the blank/comment match to try to mark more lines
                # (help the reader find stuff that hasn't been covered)
                if lines_hit.has_key(i+1):
                    num_lines_hit += 1
                    num_lines += 1
                    # count precedes the lines that we captured
                    outfile.write('%5d: ' % lines_hit[i+1])
                elif blank.match(line):
                    # blank lines and comments are preceded by dots
                    outfile.write('    . ')
                else:
                    # lines preceded by no marks weren't hit
                    # Highlight them if so indicated, unless the line contains
                    # '#pragma NO COVER' (it is possible to embed this into
                    # the text as a non-comment; no easy fix)
                    if executable_linenos.has_key(i+1) and \
                       '#pragma NO COVER' not in line:
                        num_lines += 1
                        outfile.write('>>>>>> ')
                    else:
                        outfile.write(' '*7)
                outfile.write(line.expandtabs(8))
            if num_lines == 0:
                num_lines = 1 # avoid blowup on empty files
            summary.write("%32s   %3.1f%% (%d/%d)\n" %
                    (modulename, (100.0*num_lines_hit)/num_lines,
                     num_lines_hit, num_lines))

def main(stmt):
    co = Coverage()
    co.runctx(stmt)
    co.write_results()


if __name__ == '__main__':
    import sys
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        print "Usage: code_coverage.py <statement>"
        sys.exit(1)


report_coverage.py

#!/usr/local/bin/python
import code_coverage
a = code_coverage.Analyze('coverage.db')
a.write_results()
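
The aggregation step that Analyze performs when it reads coverage.db can be sketched in isolation. The Python 3 version below assumes one pickled counts dictionary per record, whereas the listing above stores a modules dictionary followed by a counts dictionary. The function name merge_counts and the sample data are made up for the example.

```python
import os
import pickle
import tempfile

def merge_counts(filename):
    """Sum the count dictionaries of every pickle record in filename."""
    merged = {}
    with open(filename, 'rb') as f:
        while True:
            try:
                counts = pickle.load(f)
            except EOFError:
                break  # no more records
            for key, hits in counts.items():
                merged[key] = merged.get(key, 0) + hits
    return merged

# Simulate two servers (or two test runs) appending to the same file.
path = os.path.join(tempfile.mkdtemp(), 'coverage.db')
records = (
    {('echo_srv', 60): 1},
    {('echo_srv', 60): 2, ('echo_srv', 61): 1},
)
for counts in records:
    with open(path, 'ab') as f:
        pickle.dump(counts, f)

merged = merge_counts(path)
print(merged)
```

Because every dump is appended as a separate record, servers never need to read or rewrite each other's data; the file lock taken in accumulate only has to keep concurrent appends from interleaving.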

TestCoverageAnalysis (last edited 2010-03-01 16:40:05 by s235-148)