Source code for autotest.client.shared.test

# Shell class for a test, inherited by all individual tests
#
# Methods:
#       __init__        initialise
#       initialize      run once for each job
#       setup           run once for each new version of the test installed
#       run             run the test (wrapped by job.run_test())
#
# Data:
#       job             backreference to the job this test instance is part of
#       outputdir       eg. results/<job>/<testname.tag>
#       resultsdir      eg. results/<job>/<testname.tag>/results
#       profdir         eg. results/<job>/<testname.tag>/profiling
#       debugdir        eg. results/<job>/<testname.tag>/debug
#       bindir          eg. tests/<test>
#       src             eg. tests/<test>/src
#       tmpdir          eg. tmp/<tempname>_<testname.tag>

import fcntl
import getpass
import logging
import os
import re
import shutil
import sys
import tempfile
import time
import traceback

from autotest.client import utils
from autotest.client.shared import error, utils_memory
from autotest.client.shared.settings import settings


[docs]class base_test(object): preserve_srcdir = False network_destabilizing = False def __init__(self, job, bindir, outputdir): self.job = job self.pkgmgr = job.pkgmgr self.autodir = job.autodir self.outputdir = outputdir self.tagged_testname = os.path.basename(self.outputdir) self.resultsdir = os.path.join(self.outputdir, 'results') os.mkdir(self.resultsdir) self.profdir = os.path.join(self.outputdir, 'profiling') os.mkdir(self.profdir) self.debugdir = os.path.join(self.outputdir, 'debug') os.mkdir(self.debugdir) if getpass.getuser() == 'root': self.configure_crash_handler() else: self.crash_handling_enabled = False self.bindir = bindir try: autodir = os.path.abspath(os.environ['AUTODIR']) except KeyError: autodir = settings.get_value('COMMON', 'autotest_top_path') tmpdir = os.path.join(autodir, 'tmp') output_config = settings.get_value('COMMON', 'test_output_dir', default=tmpdir) self.srcdir = os.path.join(output_config, os.path.basename(self.bindir), 'src') self.tmpdir = tempfile.mkdtemp("_" + self.tagged_testname, dir=job.tmpdir) self._keyvals = [] self._new_keyval = False self.failed_constraints = [] self.iteration = 0 self.before_iteration_hooks = [] self.after_iteration_hooks = []
[docs] def configure_crash_handler(self): pass
[docs] def crash_handler_report(self): pass
[docs] def assert_(self, expr, msg='Assertion failed.'): if not expr: raise error.TestError(msg)
[docs] def write_test_keyval(self, attr_dict): utils.write_keyval(self.outputdir, attr_dict, tap_report=self.job._tap)
@staticmethod def _append_type_to_keys(dictionary, typename): new_dict = {} for key, value in dictionary.iteritems(): new_key = "%s{%s}" % (key, typename) new_dict[new_key] = value return new_dict
[docs] def write_perf_keyval(self, perf_dict): self.write_iteration_keyval({}, perf_dict, tap_report=self.job._tap)
[docs] def write_attr_keyval(self, attr_dict): self.write_iteration_keyval(attr_dict, {}, tap_report=self.job._tap)
[docs] def write_iteration_keyval(self, attr_dict, perf_dict, tap_report=None): # append the dictionaries before they have the {perf} and {attr} added self._keyvals.append({'attr': attr_dict, 'perf': perf_dict}) self._new_keyval = True if attr_dict: attr_dict = self._append_type_to_keys(attr_dict, "attr") utils.write_keyval(self.resultsdir, attr_dict, type_tag="attr", tap_report=tap_report) if perf_dict: perf_dict = self._append_type_to_keys(perf_dict, "perf") utils.write_keyval(self.resultsdir, perf_dict, type_tag="perf", tap_report=tap_report) keyval_path = os.path.join(self.resultsdir, "keyval") print >> open(keyval_path, "a"), ""
[docs] def analyze_perf_constraints(self, constraints): if not self._new_keyval: return # create a dict from the keyvals suitable as an environment for eval keyval_env = self._keyvals[-1]['perf'].copy() keyval_env['__builtins__'] = None self._new_keyval = False failures = [] # evaluate each constraint using the current keyvals for constraint in constraints: logging.info('___________________ constraint = %s', constraint) logging.info('___________________ keyvals = %s', keyval_env) try: if not eval(constraint, keyval_env): failures.append('%s: constraint was not met' % constraint) except Exception: failures.append('could not evaluate constraint: %s' % constraint) # keep track of the errors for each iteration self.failed_constraints.append(failures)
[docs] def process_failed_constraints(self): msg = '' for i, failures in enumerate(self.failed_constraints): if failures: msg += 'iteration %d:%s ' % (i, ','.join(failures)) if msg: raise error.TestFail(msg)
[docs] def register_before_iteration_hook(self, iteration_hook): """ This is how we expect test writers to register a before_iteration_hook. This adds the method to the list of hooks which are executed before each iteration. :param iteration_hook: Method to run before each iteration. A valid hook accepts a single argument which is the test object. """ self.before_iteration_hooks.append(iteration_hook)
[docs] def register_after_iteration_hook(self, iteration_hook): """ This is how we expect test writers to register an after_iteration_hook. This adds the method to the list of hooks which are executed after each iteration. :param iteration_hook: Method to run after each iteration. A valid hook accepts a single argument which is the test object. """ self.after_iteration_hooks.append(iteration_hook)
[docs] def initialize(self): pass
[docs] def setup(self): pass
[docs] def warmup(self, *args, **dargs): pass
[docs] def drop_caches_between_iterations(self): if self.job.drop_caches_between_iterations: utils_memory.drop_caches()
def _call_run_once(self, constraints, profile_only, postprocess_profiled_run, args, dargs): self.drop_caches_between_iterations() # execute iteration hooks for hook in self.before_iteration_hooks: hook(self) try: if profile_only: if not self.job.profilers.present(): self.job.record('WARN', None, None, 'No profilers have been added but ' 'profile_only is set - nothing ' 'will be run') self.run_once_profiling(postprocess_profiled_run, *args, **dargs) else: self.before_run_once() self.run_once(*args, **dargs) self.after_run_once() self.postprocess_iteration() self.analyze_perf_constraints(constraints) finally: for hook in self.after_iteration_hooks: hook(self)
[docs] def execute(self, iterations=None, test_length=None, profile_only=None, _get_time=time.time, postprocess_profiled_run=None, constraints=(), *args, **dargs): """ This is the basic execute method for the tests inherited from base_test. If you want to implement a benchmark test, it's better to implement the run_once function, to cope with the profiling infrastructure. For other tests, you can just override the default implementation. :param test_length: The minimum test length in seconds. We'll run the run_once function for a number of times large enough to cover the minimum test length. :param iterations: A number of iterations that we'll run the run_once function. This parameter is incompatible with test_length and will be silently ignored if you specify both. :param profile_only: If true run X iterations with profilers enabled. If false run X iterations and one with profiling if profiles are enabled. If None, default to the value of job.default_profile_only. :param _get_time: [time.time] Used for unit test time injection. :param postprocess_profiled_run: Run the postprocessing for the profiled run. """ # For our special class of tests, the benchmarks, we don't want # profilers to run during the test iterations. Let's reserve only # the last iteration for profiling, if needed. So let's stop # all profilers if they are present and active. profilers = self.job.profilers if profilers.active(): profilers.stop(self) if profile_only is None: profile_only = self.job.default_profile_only # If the user called this test in an odd way (specified both iterations # and test_length), let's warn them. if iterations and test_length: logging.debug('Iterations parameter ignored (timed execution)') if test_length: test_start = _get_time() time_elapsed = 0 timed_counter = 0 logging.debug('Test started. Specified %d s as the minimum test ' 'length', test_length) while time_elapsed < test_length: timed_counter = timed_counter + 1 if time_elapsed == 0: logging.debug('Executing iteration %d', timed_counter) elif time_elapsed > 0: logging.debug('Executing iteration %d, time_elapsed %d s', timed_counter, time_elapsed) self._call_run_once(constraints, profile_only, postprocess_profiled_run, args, dargs) test_iteration_finish = _get_time() time_elapsed = test_iteration_finish - test_start logging.debug('Test finished after %d iterations, ' 'time elapsed: %d s', timed_counter, time_elapsed) else: if iterations is None: iterations = 1 if iterations > 1: logging.debug('Test started. Specified %d iterations', iterations) for self.iteration in xrange(1, iterations + 1): if iterations > 1: logging.debug('Executing iteration %d of %d', self.iteration, iterations) self._call_run_once(constraints, profile_only, postprocess_profiled_run, args, dargs) if not profile_only: self.iteration += 1 self.run_once_profiling(postprocess_profiled_run, *args, **dargs) # Do any postprocessing, normally extracting performance keyvals, etc self.postprocess() self.process_failed_constraints()
[docs] def run_once_profiling(self, postprocess_profiled_run, *args, **dargs): profilers = self.job.profilers # Do a profiling run if necessary if profilers.present(): self.drop_caches_between_iterations() profilers.before_start(self) self.before_run_once() profilers.start(self) logging.debug('Profilers present. Profiling run started') try: self.run_once(*args, **dargs) # Priority to the run_once() argument over the attribute. postprocess_attribute = getattr(self, 'postprocess_profiled_run', False) if (postprocess_profiled_run or (postprocess_profiled_run is None and postprocess_attribute)): self.postprocess_iteration() finally: profilers.stop(self) profilers.report(self) self.after_run_once()
[docs] def postprocess(self): pass
[docs] def postprocess_iteration(self): pass
[docs] def cleanup(self): pass
[docs] def before_run_once(self): """ Override in tests that need it, will be called before any run_once() call including the profiling run (when it's called before starting the profilers). """ pass
[docs] def after_run_once(self): """ Called after every run_once (including from a profiled run when it's called after stopping the profilers). """ pass
def _exec(self, args, dargs): self.job.logging.tee_redirect_debug_dir(self.debugdir, log_name=self.tagged_testname) try: if self.network_destabilizing: self.job.disable_warnings("NETWORK") # write out the test attributes into a keyval dargs = dargs.copy() run_cleanup = dargs.pop('run_cleanup', self.job.run_test_cleanup) keyvals = dargs.pop('test_attributes', {}).copy() keyvals['version'] = self.version for i, arg in enumerate(args): keyvals['param-%d' % i] = repr(arg) for name, arg in dargs.iteritems(): keyvals['param-%s' % name] = repr(arg) self.write_test_keyval(keyvals) _validate_args(args, dargs, self.initialize, self.setup, self.execute, self.cleanup) try: # Initialize: _cherry_pick_call(self.initialize, *args, **dargs) lockfile = open(os.path.join(self.job.tmpdir, '.testlock'), 'w') try: fcntl.flock(lockfile, fcntl.LOCK_EX) # Setup: (compile and install the test, if needed) p_args, p_dargs = _cherry_pick_args(self.setup, args, dargs) utils.update_version(self.srcdir, self.preserve_srcdir, self.version, self.setup, *p_args, **p_dargs) finally: fcntl.flock(lockfile, fcntl.LOCK_UN) lockfile.close() # Execute: os.chdir(self.outputdir) # call self.warmup cherry picking the arguments it accepts and # translate exceptions if needed _call_test_function(_cherry_pick_call, self.warmup, *args, **dargs) if hasattr(self, 'run_once'): p_args, p_dargs = _cherry_pick_args(self.run_once, args, dargs) # pull in any non-* and non-** args from self.execute for param in _get_nonstar_args(self.execute): if param in dargs: p_dargs[param] = dargs[param] else: p_args, p_dargs = _cherry_pick_args(self.execute, args, dargs) _call_test_function(self.execute, *p_args, **p_dargs) except Exception: try: logging.exception('Exception escaping from test:') except Exception: pass # don't let logging exceptions here interfere # Save the exception while we run our cleanup() before # reraising it. exc_info = sys.exc_info() try: try: if run_cleanup: _cherry_pick_call(self.cleanup, *args, **dargs) except Exception: logging.error('Ignoring exception during cleanup() phase:') traceback.print_exc() logging.error('Now raising the earlier %s error', exc_info[0]) self.crash_handler_report() finally: self.job.logging.restore() try: raise exc_info[0], exc_info[1], exc_info[2] finally: # http://docs.python.org/library/sys.html#sys.exc_info # Be nice and prevent a circular reference. del exc_info else: try: if run_cleanup: _cherry_pick_call(self.cleanup, *args, **dargs) self.crash_handler_report() finally: self.job.logging.restore() except error.AutotestError: if self.network_destabilizing: self.job.enable_warnings("NETWORK") # Pass already-categorized errors on up. raise except Exception, e: if self.network_destabilizing: self.job.enable_warnings("NETWORK") # Anything else is an ERROR in our own code, not execute(). raise error.UnhandledTestError(e) else: if self.network_destabilizing: self.job.enable_warnings("NETWORK")
[docs]def subtest_fatal(function): """ Decorator which mark test critical. If subtest fails the whole test ends. """ def wrapped(self, *args, **kwds): self._fatal = True self.decored() result = function(self, *args, **kwds) return result wrapped.func_name = function.func_name return wrapped
[docs]def subtest_nocleanup(function): """ Decorator used to disable cleanup function. """ def wrapped(self, *args, **kwds): self._cleanup = False self.decored() result = function(self, *args, **kwds) return result wrapped.func_name = function.func_name return wrapped
[docs]class Subtest(object): """ Collect result of subtest of main test. """ result = [] passed = 0 failed = 0 def __new__(cls, *args, **kargs): self = super(Subtest, cls).__new__(cls) self._fatal = False self._cleanup = True self._num_decored = 0 ret = None if args is None: args = [] res = { 'result': None, 'name': self.__class__.__name__, 'args': args, 'kargs': kargs, 'output': None, } try: try: logging.info("Starting test %s" % self.__class__.__name__) ret = self.test(*args, **kargs) res['result'] = 'PASS' res['output'] = ret try: logging.info(Subtest.result_to_string(res)) except Exception: self._num_decored = 0 raise Subtest.result.append(res) Subtest.passed += 1 except NotImplementedError: raise except Exception: exc_type, exc_value, exc_traceback = sys.exc_info() for _ in range(self._num_decored): exc_traceback = exc_traceback.tb_next logging.error("In function (" + self.__class__.__name__ + "):") logging.error("Call from:\n" + traceback.format_stack()[-2][:-1]) logging.error("Exception from:\n" + "".join(traceback.format_exception( exc_type, exc_value, exc_traceback.tb_next))) # Clean up environment after subTest crash res['result'] = 'FAIL' logging.info(self.result_to_string(res)) Subtest.result.append(res) Subtest.failed += 1 if self._fatal: raise finally: if self._cleanup: self.clean() return ret
[docs] def test(self): """ Check if test is defined. For makes test fatal add before implementation of test method decorator @subtest_fatal """ raise NotImplementedError("Method 'test' must be implemented.")
[docs] def clean(self): """ Check if cleanup is defined. For makes test fatal add before implementation of test method decorator @subtest_nocleanup """ raise NotImplementedError("Method 'cleanup' must be implemented.")
[docs] def decored(self): self._num_decored += 1
@classmethod
[docs] def has_failed(cls): """ :return: If any of subtest not pass return True. """ if cls.failed > 0: return True else: return False
@classmethod
[docs] def get_result(cls): """ :return: Result of subtests. Format: tuple(pass/fail,function_name,call_arguments) """ return cls.result
@staticmethod
[docs] def result_to_string_debug(result): """ :param result: Result of test. """ sargs = "" for arg in result['args']: sargs += str(arg) + "," sargs = sargs[:-1] return ("Subtest (%s(%s)): --> %s") % (result['name'], sargs, result['status'])
@staticmethod
[docs] def result_to_string(result): """ Format of result dict. result = { 'result' : "PASS" / "FAIL", 'name' : class name, 'args' : test's args, 'kargs' : test's kargs, 'output' : return of test function, } :param result: Result of test. """ return ("Subtest (%(name)s): --> %(result)s") % (result)
@classmethod
[docs] def log_append(cls, msg): """ Add log_append to result output. :param msg: Test of log_append """ cls.result.append([msg])
@classmethod def _gen_res(cls, format_func): """ Format result with formatting function :param format_func: Func for formating result. """ result = "" for res in cls.result: if (isinstance(res, dict)): result += format_func(res) + "\n" else: result += str(res[0]) + "\n" return result @classmethod
[docs] def get_full_text_result(cls, format_func=None): """ :return: string with text form of result """ if format_func is None: format_func = cls.result_to_string_debug return cls._gen_res(lambda s: format_func(s))
@classmethod
[docs] def get_text_result(cls, format_func=None): """ :return: string with text form of result """ if format_func is None: format_func = cls.result_to_string return cls._gen_res(lambda s: format_func(s))
[docs] def runsubtest(self, url, *args, **dargs): """ Execute another autotest test from inside the current test's scope. :param test: Parent test. :param url: Url of new test. :param tag: Tag added to test name. :param args: Args for subtest. :param dargs: Dictionary with args for subtest. @iterations: Number of subtest iterations. @profile_only: If true execute one profiled run. """ dargs["profile_only"] = dargs.get("profile_only", False) test_basepath = self.outputdir[len(self.job.resultdir + "/"):] return self.job.run_test(url, master_testpath=test_basepath, *args, **dargs)
def _get_nonstar_args(func): """Extract all the (normal) function parameter names. Given a function, returns a tuple of parameter names, specifically excluding the * and ** parameters, if the function accepts them. :param func: A callable that we want to chose arguments for. :return: A tuple of parameters accepted by the function. """ return func.func_code.co_varnames[:func.func_code.co_argcount] def _cherry_pick_args(func, args, dargs): """Sanitize positional and keyword arguments before calling a function. Given a callable (func), an argument tuple and a dictionary of keyword arguments, pick only those arguments which the function is prepared to accept and return a new argument tuple and keyword argument dictionary. Args: func: A callable that we want to choose arguments for. args: A tuple of positional arguments to consider passing to func. dargs: A dictionary of keyword arguments to consider passing to func. Returns: A tuple of: (args tuple, keyword arguments dictionary) """ # Cherry pick args: if func.func_code.co_flags & 0x04: # func accepts *args, so return the entire args. p_args = args else: p_args = () # Cherry pick dargs: if func.func_code.co_flags & 0x08: # func accepts **dargs, so return the entire dargs. p_dargs = dargs else: # Only return the keyword arguments that func accepts. p_dargs = {} for param in _get_nonstar_args(func): if param in dargs: p_dargs[param] = dargs[param] return p_args, p_dargs def _cherry_pick_call(func, *args, **dargs): """Cherry picks arguments from args/dargs based on what "func" accepts and calls the function with the picked arguments.""" p_args, p_dargs = _cherry_pick_args(func, args, dargs) return func(*p_args, **p_dargs) def _validate_args(args, dargs, *funcs): """Verify that arguments are appropriate for at least one callable. Given a list of callables as additional parameters, verify that the proposed keyword arguments in dargs will each be accepted by at least one of the callables. NOTE: args is currently not supported and must be empty. Args: args: A tuple of proposed positional arguments. dargs: A dictionary of proposed keyword arguments. *funcs: Callables to be searched for acceptance of args and dargs. Raises: error.AutotestError: if an arg won't be accepted by any of *funcs. """ all_co_flags = 0 all_varnames = () for func in funcs: all_co_flags |= func.func_code.co_flags all_varnames += func.func_code.co_varnames[:func.func_code.co_argcount] # Check if given args belongs to at least one of the methods below. if len(args) > 0: # Current implementation doesn't allow the use of args. raise error.TestError('Unnamed arguments not accepted. Please ' 'call job.run_test with named args only') # Check if given dargs belongs to at least one of the methods below. if len(dargs) > 0: if not all_co_flags & 0x08: # no func accepts *dargs, so: for param in dargs: if param not in all_varnames: raise error.AutotestError('Unknown parameter: %s' % param) def _installtest(job, url): (group, name) = job.pkgmgr.get_package_name(url, 'test') # Bail if the test is already installed group_dir = os.path.join(job.testdir, "download", group) if os.path.exists(os.path.join(group_dir, name)): return (group, name) # If the group directory is missing create it and add # an empty __init__.py so that sub-directories are # considered for import. if not os.path.exists(group_dir): os.makedirs(group_dir) f = file(os.path.join(group_dir, '__init__.py'), 'w+') f.close() logging.debug("%s: installing test url=%s", name, url) tarball = os.path.basename(url) tarball_path = os.path.join(group_dir, tarball) test_dir = os.path.join(group_dir, name) job.pkgmgr.fetch_pkg(tarball, tarball_path, repo_url=os.path.dirname(url)) # Create the directory for the test if not os.path.exists(test_dir): os.mkdir(os.path.join(group_dir, name)) job.pkgmgr.untar_pkg(tarball_path, test_dir) os.remove(tarball_path) # For this 'sub-object' to be importable via the name # 'group.name' we need to provide an __init__.py, # so link the main entry point to this. os.symlink(name + '.py', os.path.join(group_dir, name, '__init__.py')) # The test is now installed. return (group, name) def _call_test_function(func, *args, **dargs): """Calls a test function and translates exceptions so that errors inside test code are considered test failures.""" try: return func(*args, **dargs) except error.AutotestError: # Pass already-categorized errors on up as is. raise except Exception, e: # Other exceptions must be treated as a FAIL when # raised during the test functions raise error.UnhandledTestFail(e)
[docs]def runtest(job, url, tag, args, dargs, local_namespace={}, global_namespace={}, before_test_hook=None, after_test_hook=None, before_iteration_hook=None, after_iteration_hook=None): local_namespace = local_namespace.copy() global_namespace = global_namespace.copy() # if this is not a plain test name then download and install the # specified test if url.endswith('.tar.bz2'): (testgroup, testname) = _installtest(job, url) bindir = os.path.join(job.testdir, 'download', testgroup, testname) importdir = os.path.join(job.testdir, 'download') modulename = '%s.%s' % (re.sub('/', '.', testgroup), testname) classname = '%s.%s' % (modulename, testname) path = testname else: # If the test is local, it may be under either testdir or site_testdir. # Tests in site_testdir override tests defined in testdir testname = path = url testgroup = '' path = re.sub(':', '/', testname) modulename = os.path.basename(path) classname = '%s.%s' % (modulename, modulename) # Try installing the test package # The job object may be either a server side job or a client side job. # 'install_pkg' method will be present only if it's a client side job. if hasattr(job, 'install_pkg'): try: bindir = os.path.join(job.site_testdir, testname) job.install_pkg(testname, 'test', bindir) except error.PackageInstallError: # continue as a fall back mechanism and see if the test code # already exists on the machine pass testdir_list = [job.testdir, getattr(job, 'site_testdir', None), job.customtestdir] bindir_config = settings.get_value('COMMON', 'test_dir', default="") if bindir_config: testdir_list.extend(bindir_config.strip().split(',')) bindir = None for t_dir in testdir_list: if t_dir is not None and os.path.exists(os.path.join(t_dir, path)): importdir = bindir = os.path.join(t_dir, path) if not bindir: raise error.TestError(testname + ': test does not exist') subdir = os.path.join(dargs.pop('master_testpath', ""), testname) outputdir = os.path.join(job.resultdir, subdir) if tag: outputdir += '.' + tag local_namespace['job'] = job local_namespace['outputdir'] = outputdir local_namespace['bindir'] = bindir sys.path.insert(0, importdir) try: exec ('import %s' % modulename, local_namespace, global_namespace) exec ("mytest = %s(job, bindir, outputdir)" % classname, local_namespace, global_namespace) finally: sys.path.pop(0) pwd = os.getcwd() os.chdir(outputdir) try: mytest = global_namespace['mytest'] if before_test_hook: before_test_hook(mytest) # we use the register iteration hooks methods to register the passed # in hooks if before_iteration_hook: mytest.register_before_iteration_hook(before_iteration_hook) if after_iteration_hook: mytest.register_after_iteration_hook(after_iteration_hook) mytest._exec(args, dargs) finally: os.chdir(pwd) if after_test_hook: after_test_hook(mytest) shutil.rmtree(mytest.tmpdir, ignore_errors=True)