Source code for autotest.client.shared.test_utils.mock

__author__ = "raphtee@google.com (Travis Miller)"

import StringIO
import collections
import re
import sys


class StubNotFoundError(Exception):
    """Signals a request to unstub an attribute that was never stubbed."""
class CheckPlaybackError(Exception):
    """Signals that the calls seen during playback differ from those recorded."""
class SaveDataAfterCloseStringIO(StringIO.StringIO):
    """StringIO that keeps its contents available after close().

    Useful as a mock output file object to test both that the file was
    closed and what was written to it.

    Properties:
      final_data: Set to the StringIO's getvalue() data when close() is
          called.  None if close() has not been called.
    """

    # Class-level default so the attribute reads as None until close() runs.
    final_data = None

    def close(self):
        """Snapshot the buffer into final_data, then close the StringIO.

        May be called more than once: only the first call captures the
        data, later calls leave final_data untouched.
        """
        # getvalue() raises ValueError on an already-closed buffer, so the
        # snapshot is only taken while the buffer is still open.
        if not self.closed:
            self.final_data = self.getvalue()
        StringIO.StringIO.close(self)
class argument_comparator(object):
    """Base class for objects that judge whether a call argument is acceptable."""

    def is_satisfied_by(self, parameter):
        """Decide whether `parameter` is acceptable.

        This base implementation always raises NotImplementedError;
        subclasses provide the actual check.
        """
        raise NotImplementedError
class equality_comparator(argument_comparator):
    """Comparator that accepts arguments equal to a stored expected value.

    Lists, tuples and dicts are compared element by element, so an
    argument_comparator nested anywhere inside the expected value is
    consulted for the corresponding piece of the actual value.
    """

    def __init__(self, value):
        """Remember `value` as the expected argument."""
        self.value = value

    @staticmethod
    def _types_match(arg1, arg2):
        """Return True if both are strings or both have the exact same type."""
        both_strings = (isinstance(arg1, basestring) and
                        isinstance(arg2, basestring))
        return both_strings or type(arg1) == type(arg2)

    @classmethod
    def _compare(cls, actual_arg, expected_arg):
        """Recursively compare `actual_arg` against `expected_arg`."""
        # An expected comparator decides for itself.
        if isinstance(expected_arg, argument_comparator):
            return expected_arg.is_satisfied_by(actual_arg)
        if not cls._types_match(expected_arg, actual_arg):
            return False

        if isinstance(expected_arg, (list, tuple)):
            # Sequences: same length and pairwise-matching items.
            if len(actual_arg) != len(expected_arg):
                return False
            return all(cls._compare(actual_item, expected_item)
                       for actual_item, expected_item
                       in zip(actual_arg, expected_arg))

        if isinstance(expected_arg, dict):
            # Dicts: same key set, then matching value for every key.
            actual_keys = sorted(actual_arg.keys())
            expected_keys = sorted(expected_arg.keys())
            if not cls._compare(actual_keys, expected_keys):
                return False
            return all(cls._compare(value, expected_arg[key])
                       for key, value in actual_arg.iteritems())

        # Anything else falls back to the objects' own inequality test.
        return not (actual_arg != expected_arg)

    def is_satisfied_by(self, parameter):
        """Return the result of comparing `parameter` to the stored value."""
        return self._compare(parameter, self.value)

    def __str__(self):
        """Describe the expected value: str() for comparators, else repr()."""
        expected = self.value
        return (str(expected) if isinstance(expected, argument_comparator)
                else repr(expected))
class regex_comparator(argument_comparator):
    """Comparator that accepts arguments the given regular expression finds."""

    def __init__(self, pattern, flags=0):
        """Compile `pattern` with `flags` for later searches."""
        self.regex = re.compile(pattern, flags)

    def is_satisfied_by(self, parameter):
        """Return True if the regex matches anywhere in `parameter`.

        An argument the regex cannot be applied to (for example None or a
        number) is reported as not satisfying the comparator rather than
        raising.
        """
        try:
            return self.regex.search(parameter) is not None
        except TypeError:
            # search() rejects non-string input with TypeError; treat that
            # as an ordinary mismatch so the caller can report it as one.
            return False

    def __str__(self):
        """Return the source text of the compiled pattern."""
        return self.regex.pattern
class is_string_comparator(argument_comparator):
    """Comparator that accepts any basestring instance."""

    def is_satisfied_by(self, parameter):
        """Return True when `parameter` is an instance of basestring."""
        return isinstance(parameter, basestring)

    def __str__(self):
        """Return the fixed description used when printing expectations."""
        return "a string"
class is_instance_comparator(argument_comparator):
    """Comparator that accepts arguments passing an isinstance() check."""

    def __init__(self, cls):
        """Store `cls`, the second argument later handed to isinstance()."""
        self.cls = cls

    def is_satisfied_by(self, parameter):
        """Return True when `parameter` is an instance of the stored class."""
        return isinstance(parameter, self.cls)

    def __str__(self):
        """Describe the expectation as "is a <cls>"."""
        # Wrap in a 1-tuple: isinstance() also accepts a tuple of classes,
        # and a bare tuple operand would be unpacked by % and raise.
        return "is a %s" % (self.cls,)
class anything_comparator(argument_comparator):
    """Comparator that accepts every argument."""

    def is_satisfied_by(self, parameter):
        """Always return True, whatever `parameter` is."""
        return True

    def __str__(self):
        """Return the fixed description used when printing expectations."""
        return 'anything'
class base_mapping(object):
    """One expected call: a symbol, its expected arguments and its outcome.

    Every expected positional and keyword argument is wrapped in an
    equality_comparator.  `error` starts out as None.
    """

    def __init__(self, symbol, return_obj, *args, **dargs):
        """Record the symbol, the return object and the expected arguments."""
        self.symbol = symbol
        self.return_obj = return_obj
        self.error = None
        self.args = [equality_comparator(arg) for arg in args]
        expected_dargs = {}
        for key, value in dargs.iteritems():
            expected_dargs[key] = equality_comparator(value)
        self.dargs = expected_dargs

    def match(self, *args, **dargs):
        """Return True if the given call arguments satisfy this expectation."""
        if len(args) != len(self.args) or len(dargs) != len(self.dargs):
            return False

        # Positional arguments, in order.
        for expected_arg, actual_arg in zip(self.args, args):
            if not expected_arg.is_satisfied_by(actual_arg):
                return False

        # Keyword arguments that are unexpected or carry the wrong value.
        for key, value in dargs.iteritems():
            if key not in self.dargs:
                return False
            if not self.dargs[key].is_satisfied_by(value):
                return False

        # Expected keyword arguments that were not supplied.
        return all(key in dargs for key in self.dargs.iterkeys())

    def __str__(self):
        """Render the expectation as a function-call string."""
        return _dump_function_call(self.symbol, self.args, self.dargs)
class function_mapping(base_mapping):
    """Expected function call whose outcome can be set after creation."""

    def __init__(self, symbol, return_val, *args, **dargs):
        """Forward everything to base_mapping; `return_val` is the return object."""
        super(function_mapping, self).__init__(symbol, return_val,
                                               *args, **dargs)

    def and_return(self, return_obj):
        """Set the object handed back when this expected call is played."""
        self.return_obj = return_obj

    def and_raises(self, error):
        """Set the error stored on this mapping's `error` attribute."""
        self.error = error
class function_any_args_mapping(function_mapping):
    """Function mapping that accepts a call regardless of its arguments."""

    def match(self, *args, **dargs):
        """Return True for every combination of arguments."""
        return True
class mock_function(object):
    """Callable stand-in that logs its calls and can defer to a playback hook.

    Attributes:
      num_calls: how many times the mock has been called.
      args / dargs: the positional tuple and keyword dict of each call,
          in call order.
    """

    def __init__(self, symbol, default_return_val=None,
                 record=None, playback=None):
        """Store the symbol name, the default return value and the hooks."""
        self.symbol = symbol
        self.__name__ = symbol
        self.default_return_val = default_return_val
        self.record = record
        self.playback = playback
        self.num_calls = 0
        self.args = []
        self.dargs = []

    def __call__(self, *args, **dargs):
        """Log the call, then answer via playback or the default value."""
        self.num_calls += 1
        self.args.append(args)
        self.dargs.append(dargs)
        if not self.playback:
            return self.default_return_val
        return self.playback(self.symbol, *args, **dargs)

    def _register(self, mapping):
        """Hand `mapping` to the record hook, if one is set, and return it."""
        if self.record:
            self.record(mapping)
        return mapping

    def expect_call(self, *args, **dargs):
        """Record an expected call with these arguments; return its mapping."""
        expectation = function_mapping(self.symbol, None, *args, **dargs)
        return self._register(expectation)

    def expect_any_call(self):
        """Like expect_call, but the arguments of the call are not checked."""
        expectation = function_any_args_mapping(self.symbol, None)
        return self._register(expectation)
class mask_function(mock_function):
    """mock_function that also keeps the function it replaces."""

    def __init__(self, symbol, original_function, default_return_val=None,
                 record=None, playback=None):
        """Set up the mock and remember `original_function`."""
        self.original_function = original_function
        super(mask_function, self).__init__(symbol, default_return_val,
                                            record, playback)

    def run_original_function(self, *args, **dargs):
        """Call the kept original function with the given arguments."""
        original = self.original_function
        return original(*args, **dargs)
class mock_class(object):
    """Object mirroring the public attributes of a namespace.

    Every attribute of `cls` whose name does not start with an underscore
    is copied onto the instance; callable ones are replaced by
    mock_function objects named "<name>.<attribute>".
    """

    def __init__(self, cls, name, default_ret_val=None,
                 record=None, playback=None):
        """Build the mirror of `cls` under the given registration name."""
        self.__name = name
        self.__record = record
        self.__playback = playback

        public_symbols = [symbol for symbol in dir(cls)
                          if not symbol.startswith("_")]
        for symbol in public_symbols:
            attribute = getattr(cls, symbol)
            if callable(attribute):
                # Callables become mocks wired to the record/playback hooks.
                attribute = mock_function("%s.%s" % (name, symbol),
                                          default_ret_val, record, playback)
            setattr(self, symbol, attribute)

    def __repr__(self):
        """Identify the mock by its registration name."""
        return '<mock_class: %s>' % self.__name
class mock_god(object):
    """Central recorder/verifier for mocks, plus a registry of stubs.

    Expected calls are queued in `recording`; each played-back call is
    checked against the head of that queue.  Attributes replaced through
    stub_with() are remembered so they can be restored later.
    """

    # Sentinel saved in place of an original attribute that should be
    # deleted, rather than restored, when the stub is undone.
    NONEXISTENT_ATTRIBUTE = object()

    def __init__(self, debug=False, fail_fast=True, ut=None):
        """
        With debug=True, all recorded method calls will be printed as
        they happen.
        With fail_fast=True, unexpected calls will immediately cause an
        exception to be raised.  With False, they will be silently recorded
        and only reported when check_playback() is called.
        If ut is given, check_playback() reports failures through ut.fail().
        """
        self.recording = collections.deque()
        self.errors = []
        self._stubs = []
        self._debug = debug
        self._fail_fast = fail_fast
        self._ut = ut

    def set_fail_fast(self, fail_fast):
        """Switch fail-fast behaviour on or off."""
        self._fail_fast = fail_fast

    def create_mock_class_obj(self, cls, name, default_ret_val=None):
        """Return a subclass of cls whose construction goes through playback.

        Calling the returned class plays back a call named `name`.  Its
        expect_new() classmethod builds a mocked instance, records the
        expected constructor call, and returns that instance.
        """
        record = self.__record_call
        playback = self.__method_playback

        class cls_sub(cls):
            cls_count = 0

            # overwrite the initializer
            def __init__(self, *args, **dargs):
                pass

            @classmethod
            def expect_new(typ, *args, **dargs):
                obj = typ.make_new(*args, **dargs)
                mapping = base_mapping(name, obj, *args, **dargs)
                record(mapping)
                return obj

            def __new__(typ, *args, **dargs):
                return playback(name, *args, **dargs)

            @classmethod
            def make_new(typ, *args, **dargs):
                # pylint: disable=E1003
                # Bypass cls_sub.__new__ (which would trigger playback) and
                # allocate through the parent class, without arguments.
                obj = super(cls_sub, typ).__new__(typ)

                typ.cls_count += 1
                obj_name = "%s_%s" % (name, typ.cls_count)
                for symbol in dir(obj):
                    if (symbol.startswith("__") and
                            symbol.endswith("__")):
                        continue

                    # Properties are left alone so they are not evaluated.
                    if isinstance(getattr(typ, symbol, None), property):
                        continue

                    orig_symbol = getattr(obj, symbol)
                    if callable(orig_symbol):
                        f_name = ("%s.%s" % (obj_name, symbol))
                        func = mock_function(f_name, default_ret_val,
                                             record, playback)
                        setattr(obj, symbol, func)
                    else:
                        setattr(obj, symbol, orig_symbol)

                return obj

        return cls_sub

    def create_mock_class(self, cls, name, default_ret_val=None):
        """
        Given something that defines a namespace cls (class, object,
        module), and a (hopefully unique) name, will create a mock_class
        object with that name and that possesses all the public attributes
        of cls.  default_ret_val sets the default_ret_val on all methods of
        the cls mock.
        """
        return mock_class(cls, name, default_ret_val,
                          self.__record_call, self.__method_playback)

    def create_mock_function(self, symbol, default_return_val=None):
        """
        Create a mock_function with name symbol and default return value
        of default_return_val.
        """
        return mock_function(symbol, default_return_val,
                             self.__record_call, self.__method_playback)

    def mock_up(self, obj, name, default_ret_val=None):
        """
        Given an object (class instance or module) and a registration name,
        replace all its methods with mock function objects (passing the
        original functions to the mock functions).
        """
        for symbol in dir(obj):
            if symbol.startswith("__"):
                continue

            orig_symbol = getattr(obj, symbol)
            if callable(orig_symbol):
                f_name = "%s.%s" % (name, symbol)
                func = mask_function(f_name, orig_symbol, default_ret_val,
                                     self.__record_call,
                                     self.__method_playback)
                setattr(obj, symbol, func)

    def stub_with(self, namespace, symbol, new_attribute):
        """Replace namespace.symbol with new_attribute, remembering the old one."""
        original_attribute = getattr(namespace, symbol,
                                     self.NONEXISTENT_ATTRIBUTE)

        # You only want to save the original attribute in cases where it is
        # directly associated with the object in question. In cases where
        # the attribute is actually inherited via some sort of hierarchy
        # you want to delete the stub (restoring the original structure)
        attribute_is_inherited = (hasattr(namespace, '__dict__') and
                                  symbol not in namespace.__dict__)
        if attribute_is_inherited:
            original_attribute = self.NONEXISTENT_ATTRIBUTE

        newstub = (namespace, symbol, original_attribute, new_attribute)
        self._stubs.append(newstub)
        setattr(namespace, symbol, new_attribute)

    def stub_function(self, namespace, symbol):
        """Stub namespace.symbol with a fresh mock function."""
        mock_attribute = self.create_mock_function(symbol)
        self.stub_with(namespace, symbol, mock_attribute)

    def stub_class_method(self, cls, symbol):
        """Stub cls.symbol with a mock function wrapped in staticmethod."""
        mock_attribute = self.create_mock_function(symbol)
        self.stub_with(cls, symbol, staticmethod(mock_attribute))

    def stub_class(self, namespace, symbol):
        """Stub the class at namespace.symbol with a mock class object."""
        attr = getattr(namespace, symbol)
        mock_class = self.create_mock_class_obj(attr, symbol)
        self.stub_with(namespace, symbol, mock_class)

    def stub_function_to_return(self, namespace, symbol, object_to_return):
        """Stub out a function with one that always returns a fixed value.

        :param namespace The namespace containing the function to stub out.
        :param symbol The attribute within the namespace to stub out.
        :param object_to_return The value that the stub should return
                whenever it is called.
        """
        self.stub_with(namespace, symbol,
                       lambda *args, **dargs: object_to_return)

    def _perform_unstub(self, stub):
        """Undo one stub tuple: restore the saved attribute or delete it."""
        namespace, symbol, orig_attr, _new_attr = stub
        # Identity, not equality: the saved original is an arbitrary object
        # whose __eq__ must not be able to impersonate the sentinel.
        if orig_attr is self.NONEXISTENT_ATTRIBUTE:
            delattr(namespace, symbol)
        else:
            setattr(namespace, symbol, orig_attr)

    def unstub(self, namespace, symbol):
        """Undo the most recent stub of namespace.symbol.

        Raises StubNotFoundError if that attribute is not currently stubbed.
        """
        for index in range(len(self._stubs) - 1, -1, -1):
            stub = self._stubs[index]
            if (namespace, symbol) == (stub[0], stub[1]):
                self._perform_unstub(stub)
                # Drop exactly this entry; list.remove() would search by
                # equality and could pick a different, equal-looking stub.
                del self._stubs[index]
                return

        raise StubNotFoundError()

    def unstub_all(self):
        """Undo every registered stub, most recent first."""
        while self._stubs:
            # Peek, then pop: if restoring fails, the entries that are
            # still stubbed stay registered in their original order.
            self._perform_unstub(self._stubs[-1])
            self._stubs.pop()

    def __method_playback(self, symbol, *args, **dargs):
        """Check a mock call against the next expected call and answer it.

        On a match the expectation is consumed and its error is raised or
        its return object returned.  On a mismatch the problem is passed to
        _append_error() and None is returned.
        """
        if self._debug:
            print >> sys.__stdout__, (' * Mock call: ' +
                                      _dump_function_call(symbol, args, dargs))

        if len(self.recording) != 0:
            # self.recording is subscriptable (deque), ignore E1136
            func_call = self.recording[0]  # pylint: disable=E1136
            if func_call.symbol != symbol:
                msg = ("Unexpected call: %s\nExpected: %s"
                       % (_dump_function_call(symbol, args, dargs),
                          func_call))
                self._append_error(msg)
                return None

            if not func_call.match(*args, **dargs):
                msg = ("Incorrect call: %s\nExpected: %s"
                       % (_dump_function_call(symbol, args, dargs),
                          func_call))
                self._append_error(msg)
                return None

            # this is the expected call so pop it and return
            self.recording.popleft()
            if func_call.error:
                raise func_call.error
            else:
                return func_call.return_obj
        else:
            msg = ("unexpected call: %s"
                   % (_dump_function_call(symbol, args, dargs)))
            self._append_error(msg)
            return None

    def __record_call(self, mapping):
        """Queue an expected call."""
        self.recording.append(mapping)

    def _append_error(self, error):
        """Raise immediately when fail-fast is on, otherwise store the error."""
        if self._debug:
            print >> sys.__stdout__, ' *** ' + error
        if self._fail_fast:
            raise CheckPlaybackError(error)
        self.errors.append(error)

    def check_playback(self):
        """
        Report any errors that were encountered during calls
        to __method_playback(), or any expected calls never made.

        Raises CheckPlaybackError carrying the collected messages (or fails
        through the ut object given at construction, when there is one).
        Clears the recording when there is nothing to report.
        """
        if len(self.errors) > 0:
            if self._debug:
                # Same stream as the errors themselves, so the header is
                # not swallowed when sys.stdout has been replaced.
                print >> sys.__stdout__, '\nPlayback errors:'
            for error in self.errors:
                print >> sys.__stdout__, error

            message = '\n'.join(self.errors)
            if self._ut:
                self._ut.fail(message)

            raise CheckPlaybackError(message)
        elif len(self.recording) != 0:
            errors = []
            for func_call in self.recording:
                error = "%s not called" % (func_call,)
                errors.append(error)
                print >> sys.__stdout__, error

            message = '\n'.join(errors)
            if self._ut:
                self._ut.fail(message)

            raise CheckPlaybackError(message)
        self.recording.clear()

    def mock_io(self):
        """Mocks and saves the stdout & stderr output"""
        self.orig_stdout = sys.stdout
        self.orig_stderr = sys.stderr

        self.mock_streams_stdout = StringIO.StringIO('')
        self.mock_streams_stderr = StringIO.StringIO('')

        sys.stdout = self.mock_streams_stdout
        sys.stderr = self.mock_streams_stderr

    def unmock_io(self):
        """Restores the stdout & stderr, and returns both output strings"""
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr
        values = (self.mock_streams_stdout.getvalue(),
                  self.mock_streams_stderr.getvalue())

        self.mock_streams_stdout.close()
        self.mock_streams_stderr.close()
        return values
def _arg_to_str(arg):
    """Render one argument: str() for comparators, repr() for anything else."""
    render = str if isinstance(arg, argument_comparator) else repr
    return render(arg)


def _dump_function_call(symbol, args, dargs):
    """Format a call as "symbol(arg, ..., key=value, ...)"."""
    rendered = [_arg_to_str(arg) for arg in args]
    rendered.extend("%s=%s" % (key, _arg_to_str(val))
                    for key, val in dargs.iteritems())
    return "%s(%s)" % (symbol, ', '.join(rendered))