diff options
author | Ronny Pfannschmidt <Ronny.Pfannschmidt@gmx.de> | 2012-01-21 17:03:12 +0100 |
---|---|---|
committer | Ronny Pfannschmidt <Ronny.Pfannschmidt@gmx.de> | 2012-01-21 17:03:12 +0100 |
commit | 7b691ee889f8f8b158e82c0c045d06ced871fca2 (patch) | |
tree | 12225340135e6216d5a4e89822610627010bbb8c /_pytest | |
parent | resuffle pypy.tool.version and add support for .hg_archival.txt, fixes issue952 (diff) | |
download | pypy-7b691ee889f8f8b158e82c0c045d06ced871fca2.tar.gz pypy-7b691ee889f8f8b158e82c0c045d06ced871fca2.tar.bz2 pypy-7b691ee889f8f8b158e82c0c045d06ced871fca2.zip |
syncronize pylib and pytest with current hg versions
Diffstat (limited to '_pytest')
-rw-r--r-- | _pytest/__init__.py | 2 | ||||
-rw-r--r-- | _pytest/assertion/__init__.py | 165 | ||||
-rw-r--r-- | _pytest/assertion/newinterpret.py | 2 | ||||
-rw-r--r-- | _pytest/assertion/oldinterpret.py | 2 | ||||
-rw-r--r-- | _pytest/assertion/rewrite.py | 322 | ||||
-rw-r--r-- | _pytest/capture.py | 56 | ||||
-rw-r--r-- | _pytest/config.py | 36 | ||||
-rw-r--r-- | _pytest/core.py | 36 | ||||
-rw-r--r-- | _pytest/helpconfig.py | 29 | ||||
-rw-r--r-- | _pytest/hookspec.py | 36 | ||||
-rw-r--r-- | _pytest/junitxml.py | 139 | ||||
-rw-r--r-- | _pytest/main.py | 80 | ||||
-rw-r--r-- | _pytest/mark.py | 102 | ||||
-rw-r--r-- | _pytest/monkeypatch.py | 23 | ||||
-rw-r--r-- | _pytest/nose.py | 1 | ||||
-rw-r--r-- | _pytest/pastebin.py | 6 | ||||
-rw-r--r-- | _pytest/pdb.py | 20 | ||||
-rw-r--r-- | _pytest/pytester.py | 119 | ||||
-rw-r--r-- | _pytest/python.py | 206 | ||||
-rw-r--r-- | _pytest/resultlog.py | 7 | ||||
-rw-r--r-- | _pytest/runner.py | 134 | ||||
-rw-r--r-- | _pytest/skipping.py | 25 | ||||
-rw-r--r-- | _pytest/terminal.py | 48 | ||||
-rw-r--r-- | _pytest/tmpdir.py | 4 | ||||
-rw-r--r-- | _pytest/unittest.py | 34 |
25 files changed, 1129 insertions, 505 deletions
diff --git a/_pytest/__init__.py b/_pytest/__init__.py index 596e2bcdf2..0a695c2035 100644 --- a/_pytest/__init__.py +++ b/_pytest/__init__.py @@ -1,2 +1,2 @@ # -__version__ = '2.1.0.dev4' +__version__ = '2.2.2.dev6' diff --git a/_pytest/assertion/__init__.py b/_pytest/assertion/__init__.py index e20e4e4b55..bac5f653d3 100644 --- a/_pytest/assertion/__init__.py +++ b/_pytest/assertion/__init__.py @@ -2,35 +2,25 @@ support for presenting detailed information in failing assertions. """ import py -import imp -import marshal -import struct import sys import pytest from _pytest.monkeypatch import monkeypatch -from _pytest.assertion import reinterpret, util - -try: - from _pytest.assertion.rewrite import rewrite_asserts -except ImportError: - rewrite_asserts = None -else: - import ast +from _pytest.assertion import util def pytest_addoption(parser): group = parser.getgroup("debugconfig") - group.addoption('--assertmode', action="store", dest="assertmode", - choices=("on", "old", "off", "default"), default="default", - metavar="on|old|off", + group.addoption('--assert', action="store", dest="assertmode", + choices=("rewrite", "reinterp", "plain",), + default="rewrite", metavar="MODE", help="""control assertion debugging tools. -'off' performs no assertion debugging. -'old' reinterprets the expressions in asserts to glean information. -'on' (the default) rewrites the assert statements in test modules to provide -sub-expression results.""") +'plain' performs no assertion debugging. +'reinterp' reinterprets assert statements after they failed to provide assertion expression information. +'rewrite' (the default) rewrites assert statements in test modules on import +to provide assert expression information. """) group.addoption('--no-assert', action="store_true", default=False, - dest="noassert", help="DEPRECATED equivalent to --assertmode=off") + dest="noassert", help="DEPRECATED equivalent to --assert=plain") group.addoption('--nomagic', action="store_true", default=False, - dest="nomagic", help="DEPRECATED equivalent to --assertmode=off") + dest="nomagic", help="DEPRECATED equivalent to --assert=plain") class AssertionState: """State for the assertion plugin.""" @@ -40,89 +30,90 @@ class AssertionState: self.trace = config.trace.root.get("assertion") def pytest_configure(config): - warn_about_missing_assertion() mode = config.getvalue("assertmode") if config.getvalue("noassert") or config.getvalue("nomagic"): - if mode not in ("off", "default"): - raise pytest.UsageError("assertion options conflict") - mode = "off" - elif mode == "default": - mode = "on" - if mode != "off": - def callbinrepr(op, left, right): - hook_result = config.hook.pytest_assertrepr_compare( - config=config, op=op, left=left, right=right) - for new_expl in hook_result: - if new_expl: - return '\n~'.join(new_expl) + mode = "plain" + if mode == "rewrite": + try: + import ast + except ImportError: + mode = "reinterp" + else: + if sys.platform.startswith('java'): + mode = "reinterp" + if mode != "plain": + _load_modules(mode) m = monkeypatch() config._cleanup.append(m.undo) m.setattr(py.builtin.builtins, 'AssertionError', reinterpret.AssertionError) - m.setattr(util, '_reprcompare', callbinrepr) - if mode == "on" and rewrite_asserts is None: - mode = "old" + hook = None + if mode == "rewrite": + hook = rewrite.AssertionRewritingHook() + sys.meta_path.append(hook) + warn_about_missing_assertion(mode) config._assertstate = AssertionState(config, mode) + config._assertstate.hook = hook config._assertstate.trace("configured with mode set to %r" % (mode,)) -def _write_pyc(co, source_path): - if hasattr(imp, "cache_from_source"): - # Handle PEP 3147 pycs. - pyc = py.path.local(imp.cache_from_source(str(source_path))) - pyc.ensure() - else: - pyc = source_path + "c" - mtime = int(source_path.mtime()) - fp = pyc.open("wb") - try: - fp.write(imp.get_magic()) - fp.write(struct.pack("<l", mtime)) - marshal.dump(co, fp) - finally: - fp.close() - return pyc +def pytest_unconfigure(config): + hook = config._assertstate.hook + if hook is not None: + sys.meta_path.remove(hook) -def before_module_import(mod): - if mod.config._assertstate.mode != "on": - return - # Some deep magic: load the source, rewrite the asserts, and write a - # fake pyc, so that it'll be loaded when the module is imported. - source = mod.fspath.read() - try: - tree = ast.parse(source) - except SyntaxError: - # Let this pop up again in the real import. - mod.config._assertstate.trace("failed to parse: %r" % (mod.fspath,)) - return - rewrite_asserts(tree) - try: - co = compile(tree, str(mod.fspath), "exec") - except SyntaxError: - # It's possible that this error is from some bug in the assertion - # rewriting, but I don't know of a fast way to tell. - mod.config._assertstate.trace("failed to compile: %r" % (mod.fspath,)) - return - mod._pyc = _write_pyc(co, mod.fspath) - mod.config._assertstate.trace("wrote pyc: %r" % (mod._pyc,)) +def pytest_collection(session): + # this hook is only called when test modules are collected + # so for example not in the master process of pytest-xdist + # (which does not collect test modules) + hook = session.config._assertstate.hook + if hook is not None: + hook.set_session(session) -def after_module_import(mod): - if not hasattr(mod, "_pyc"): - return - state = mod.config._assertstate - try: - mod._pyc.remove() - except py.error.ENOENT: - state.trace("couldn't find pyc: %r" % (mod._pyc,)) - else: - state.trace("removed pyc: %r" % (mod._pyc,)) +def pytest_runtest_setup(item): + def callbinrepr(op, left, right): + hook_result = item.ihook.pytest_assertrepr_compare( + config=item.config, op=op, left=left, right=right) + for new_expl in hook_result: + if new_expl: + res = '\n~'.join(new_expl) + if item.config.getvalue("assertmode") == "rewrite": + # The result will be fed back a python % formatting + # operation, which will fail if there are extraneous + # '%'s in the string. Escape them here. + res = res.replace("%", "%%") + return res + util._reprcompare = callbinrepr + +def pytest_runtest_teardown(item): + util._reprcompare = None + +def pytest_sessionfinish(session): + hook = session.config._assertstate.hook + if hook is not None: + hook.session = None -def warn_about_missing_assertion(): +def _load_modules(mode): + """Lazily import assertion related code.""" + global rewrite, reinterpret + from _pytest.assertion import reinterpret + if mode == "rewrite": + from _pytest.assertion import rewrite + +def warn_about_missing_assertion(mode): try: assert False except AssertionError: pass else: - sys.stderr.write("WARNING: failing tests may report as passing because " - "assertions are turned off! (are you using python -O?)\n") + if mode == "rewrite": + specifically = ("assertions which are not in test modules " + "will be ignored") + else: + specifically = "failing tests may report as passing" + + sys.stderr.write("WARNING: " + specifically + + " because assert statements are not executed " + "by the underlying Python interpreter " + "(are you using python -O?)\n") pytest_assertrepr_compare = util.assertrepr_compare diff --git a/_pytest/assertion/newinterpret.py b/_pytest/assertion/newinterpret.py index 2696abda74..de03eaf8b0 100644 --- a/_pytest/assertion/newinterpret.py +++ b/_pytest/assertion/newinterpret.py @@ -53,7 +53,7 @@ def interpret(source, frame, should_fail=False): if should_fail: return ("(assertion failed, but when it was re-run for " "printing intermediate values, it did not fail. Suggestions: " - "compute assert expression before the assert or use --no-assert)") + "compute assert expression before the assert or use --assert=plain)") def run(offending_line, frame=None): if frame is None: diff --git a/_pytest/assertion/oldinterpret.py b/_pytest/assertion/oldinterpret.py index e109871415..3ca12ec2b0 100644 --- a/_pytest/assertion/oldinterpret.py +++ b/_pytest/assertion/oldinterpret.py @@ -482,7 +482,7 @@ def interpret(source, frame, should_fail=False): if should_fail: return ("(assertion failed, but when it was re-run for " "printing intermediate values, it did not fail. Suggestions: " - "compute assert expression before the assert or use --nomagic)") + "compute assert expression before the assert or use --assert=plain)") else: return None diff --git a/_pytest/assertion/rewrite.py b/_pytest/assertion/rewrite.py index 6c9067525a..cf295ef1be 100644 --- a/_pytest/assertion/rewrite.py +++ b/_pytest/assertion/rewrite.py @@ -2,13 +2,258 @@ import ast import collections +import errno import itertools +import imp +import marshal +import os +import struct import sys +import types import py from _pytest.assertion import util +# Windows gives ENOENT in places *nix gives ENOTDIR. +if sys.platform.startswith("win"): + PATH_COMPONENT_NOT_DIR = errno.ENOENT +else: + PATH_COMPONENT_NOT_DIR = errno.ENOTDIR + +# py.test caches rewritten pycs in __pycache__. +if hasattr(imp, "get_tag"): + PYTEST_TAG = imp.get_tag() + "-PYTEST" +else: + if hasattr(sys, "pypy_version_info"): + impl = "pypy" + elif sys.platform == "java": + impl = "jython" + else: + impl = "cpython" + ver = sys.version_info + PYTEST_TAG = "%s-%s%s-PYTEST" % (impl, ver[0], ver[1]) + del ver, impl + +PYC_EXT = ".py" + "c" if __debug__ else "o" +PYC_TAIL = "." + PYTEST_TAG + PYC_EXT + +REWRITE_NEWLINES = sys.version_info[:2] != (2, 7) and sys.version_info < (3, 2) + +class AssertionRewritingHook(object): + """Import hook which rewrites asserts.""" + + def __init__(self): + self.session = None + self.modules = {} + + def set_session(self, session): + self.fnpats = session.config.getini("python_files") + self.session = session + + def find_module(self, name, path=None): + if self.session is None: + return None + sess = self.session + state = sess.config._assertstate + state.trace("find_module called for: %s" % name) + names = name.rsplit(".", 1) + lastname = names[-1] + pth = None + if path is not None and len(path) == 1: + pth = path[0] + if pth is None: + try: + fd, fn, desc = imp.find_module(lastname, path) + except ImportError: + return None + if fd is not None: + fd.close() + tp = desc[2] + if tp == imp.PY_COMPILED: + if hasattr(imp, "source_from_cache"): + fn = imp.source_from_cache(fn) + else: + fn = fn[:-1] + elif tp != imp.PY_SOURCE: + # Don't know what this is. + return None + else: + fn = os.path.join(pth, name.rpartition(".")[2] + ".py") + fn_pypath = py.path.local(fn) + # Is this a test file? + if not sess.isinitpath(fn): + # We have to be very careful here because imports in this code can + # trigger a cycle. + self.session = None + try: + for pat in self.fnpats: + if fn_pypath.fnmatch(pat): + state.trace("matched test file %r" % (fn,)) + break + else: + return None + finally: + self.session = sess + else: + state.trace("matched test file (was specified on cmdline): %r" % (fn,)) + # The requested module looks like a test file, so rewrite it. This is + # the most magical part of the process: load the source, rewrite the + # asserts, and load the rewritten source. We also cache the rewritten + # module code in a special pyc. We must be aware of the possibility of + # concurrent py.test processes rewriting and loading pycs. To avoid + # tricky race conditions, we maintain the following invariant: The + # cached pyc is always a complete, valid pyc. Operations on it must be + # atomic. POSIX's atomic rename comes in handy. + write = not sys.dont_write_bytecode + cache_dir = os.path.join(fn_pypath.dirname, "__pycache__") + if write: + try: + os.mkdir(cache_dir) + except OSError: + e = sys.exc_info()[1].errno + if e == errno.EEXIST: + # Either the __pycache__ directory already exists (the + # common case) or it's blocked by a non-dir node. In the + # latter case, we'll ignore it in _write_pyc. + pass + elif e == PATH_COMPONENT_NOT_DIR: + # One of the path components was not a directory, likely + # because we're in a zip file. + write = False + elif e == errno.EACCES: + state.trace("read only directory: %r" % (fn_pypath.dirname,)) + write = False + else: + raise + cache_name = fn_pypath.basename[:-3] + PYC_TAIL + pyc = os.path.join(cache_dir, cache_name) + # Notice that even if we're in a read-only directory, I'm going to check + # for a cached pyc. This may not be optimal... + co = _read_pyc(fn_pypath, pyc) + if co is None: + state.trace("rewriting %r" % (fn,)) + co = _rewrite_test(state, fn_pypath) + if co is None: + # Probably a SyntaxError in the test. + return None + if write: + _make_rewritten_pyc(state, fn_pypath, pyc, co) + else: + state.trace("found cached rewritten pyc for %r" % (fn,)) + self.modules[name] = co, pyc + return self + + def load_module(self, name): + co, pyc = self.modules.pop(name) + # I wish I could just call imp.load_compiled here, but __file__ has to + # be set properly. In Python 3.2+, this all would be handled correctly + # by load_compiled. + mod = sys.modules[name] = imp.new_module(name) + try: + mod.__file__ = co.co_filename + # Normally, this attribute is 3.2+. + mod.__cached__ = pyc + py.builtin.exec_(co, mod.__dict__) + except: + del sys.modules[name] + raise + return sys.modules[name] + +def _write_pyc(co, source_path, pyc): + # Technically, we don't have to have the same pyc format as (C)Python, since + # these "pycs" should never be seen by builtin import. However, there's + # little reason deviate, and I hope sometime to be able to use + # imp.load_compiled to load them. (See the comment in load_module above.) + mtime = int(source_path.mtime()) + try: + fp = open(pyc, "wb") + except IOError: + err = sys.exc_info()[1].errno + if err == PATH_COMPONENT_NOT_DIR: + # This happens when we get a EEXIST in find_module creating the + # __pycache__ directory and __pycache__ is by some non-dir node. + return False + raise + try: + fp.write(imp.get_magic()) + fp.write(struct.pack("<l", mtime)) + marshal.dump(co, fp) + finally: + fp.close() + return True + +RN = "\r\n".encode("utf-8") +N = "\n".encode("utf-8") + +def _rewrite_test(state, fn): + """Try to read and rewrite *fn* and return the code object.""" + try: + source = fn.read("rb") + except EnvironmentError: + return None + # On Python versions which are not 2.7 and less than or equal to 3.1, the + # parser expects *nix newlines. + if REWRITE_NEWLINES: + source = source.replace(RN, N) + N + try: + tree = ast.parse(source) + except SyntaxError: + # Let this pop up again in the real import. + state.trace("failed to parse: %r" % (fn,)) + return None + rewrite_asserts(tree) + try: + co = compile(tree, fn.strpath, "exec") + except SyntaxError: + # It's possible that this error is from some bug in the + # assertion rewriting, but I don't know of a fast way to tell. + state.trace("failed to compile: %r" % (fn,)) + return None + return co + +def _make_rewritten_pyc(state, fn, pyc, co): + """Try to dump rewritten code to *pyc*.""" + if sys.platform.startswith("win"): + # Windows grants exclusive access to open files and doesn't have atomic + # rename, so just write into the final file. + _write_pyc(co, fn, pyc) + else: + # When not on windows, assume rename is atomic. Dump the code object + # into a file specific to this process and atomically replace it. + proc_pyc = pyc + "." + str(os.getpid()) + if _write_pyc(co, fn, proc_pyc): + os.rename(proc_pyc, pyc) + +def _read_pyc(source, pyc): + """Possibly read a py.test pyc containing rewritten code. + + Return rewritten code if successful or None if not. + """ + try: + fp = open(pyc, "rb") + except IOError: + return None + try: + try: + mtime = int(source.mtime()) + data = fp.read(8) + except EnvironmentError: + return None + # Check for invalid or out of date pyc file. + if (len(data) != 8 or + data[:4] != imp.get_magic() or + struct.unpack("<l", data[4:])[0] != mtime): + return None + co = marshal.load(fp) + if not isinstance(co, types.CodeType): + # That's interesting.... + return None + return co + finally: + fp.close() + + def rewrite_asserts(mod): """Rewrite the assert statements in mod.""" AssertionRewriter().run(mod) @@ -17,13 +262,8 @@ def rewrite_asserts(mod): _saferepr = py.io.saferepr from _pytest.assertion.util import format_explanation as _format_explanation -def _format_boolop(operands, explanations, is_or): - show_explanations = [] - for operand, expl in zip(operands, explanations): - show_explanations.append(expl) - if operand == is_or: - break - return "(" + (is_or and " or " or " and ").join(show_explanations) + ")" +def _format_boolop(explanations, is_or): + return "(" + (is_or and " or " or " and ").join(explanations) + ")" def _call_reprcompare(ops, results, expls, each_obj): for i, res, expl in zip(range(len(ops)), results, expls): @@ -109,8 +349,8 @@ class AssertionRewriter(ast.NodeVisitor): return lineno += len(doc) - 1 expect_docstring = False - elif (not isinstance(item, ast.ImportFrom) or item.level > 0 and - item.identifier != "__future__"): + elif (not isinstance(item, ast.ImportFrom) or item.level > 0 or + item.module != "__future__"): lineno = item.lineno break pos += 1 @@ -118,9 +358,9 @@ class AssertionRewriter(ast.NodeVisitor): for alias in aliases] mod.body[pos:pos] = imports # Collect asserts. - nodes = collections.deque([mod]) + nodes = [mod] while nodes: - node = nodes.popleft() + node = nodes.pop() for name, field in ast.iter_fields(node): if isinstance(field, list): new = [] @@ -143,7 +383,7 @@ class AssertionRewriter(ast.NodeVisitor): """Get a new variable.""" # Use a character invalid in python identifiers to avoid clashing. name = "@py_assert" + str(next(self.variable_counter)) - self.variables.add(name) + self.variables.append(name) return name def assign(self, expr): @@ -198,7 +438,8 @@ class AssertionRewriter(ast.NodeVisitor): # There's already a message. Don't mess with it. return [assert_] self.statements = [] - self.variables = set() + self.cond_chain = () + self.variables = [] self.variable_counter = itertools.count() self.stack = [] self.on_failure = [] @@ -220,11 +461,11 @@ class AssertionRewriter(ast.NodeVisitor): else: raise_ = ast.Raise(exc, None, None) body.append(raise_) - # Delete temporary variables. - names = [ast.Name(name, ast.Del()) for name in self.variables] - if names: - delete = ast.Delete(names) - self.statements.append(delete) + # Clear temporary variables by setting them to None. + if self.variables: + variables = [ast.Name(name, ast.Store()) for name in self.variables] + clear = ast.Assign(variables, ast.Name("None", ast.Load())) + self.statements.append(clear) # Fix line numbers. for stmt in self.statements: set_location(stmt, assert_.lineno, assert_.col_offset) @@ -240,21 +481,38 @@ class AssertionRewriter(ast.NodeVisitor): return name, self.explanation_param(expr) def visit_BoolOp(self, boolop): - operands = [] - explanations = [] + res_var = self.variable() + expl_list = self.assign(ast.List([], ast.Load())) + app = ast.Attribute(expl_list, "append", ast.Load()) + is_or = int(isinstance(boolop.op, ast.Or)) + body = save = self.statements + fail_save = self.on_failure + levels = len(boolop.values) - 1 self.push_format_context() - for operand in boolop.values: - res, explanation = self.visit(operand) - operands.append(res) - explanations.append(explanation) - expls = ast.Tuple([ast.Str(expl) for expl in explanations], ast.Load()) - is_or = ast.Num(isinstance(boolop.op, ast.Or)) - expl_template = self.helper("format_boolop", - ast.Tuple(operands, ast.Load()), expls, - is_or) + # Process each operand, short-circuting if needed. + for i, v in enumerate(boolop.values): + if i: + fail_inner = [] + self.on_failure.append(ast.If(cond, fail_inner, [])) + self.on_failure = fail_inner + self.push_format_context() + res, expl = self.visit(v) + body.append(ast.Assign([ast.Name(res_var, ast.Store())], res)) + expl_format = self.pop_format_context(ast.Str(expl)) + call = ast.Call(app, [expl_format], [], None, None) + self.on_failure.append(ast.Expr(call)) + if i < levels: + cond = res + if is_or: + cond = ast.UnaryOp(ast.Not(), cond) + inner = [] + self.statements.append(ast.If(cond, inner, [])) + self.statements = body = inner + self.statements = save + self.on_failure = fail_save + expl_template = self.helper("format_boolop", expl_list, ast.Num(is_or)) expl = self.pop_format_context(expl_template) - res = self.assign(ast.BoolOp(boolop.op, operands)) - return res, self.explanation_param(expl) + return ast.Name(res_var, ast.Load()), self.explanation_param(expl) def visit_UnaryOp(self, unary): pattern = unary_map[unary.op.__class__] @@ -288,7 +546,7 @@ class AssertionRewriter(ast.NodeVisitor): new_star, expl = self.visit(call.starargs) arg_expls.append("*" + expl) if call.kwargs: - new_kwarg, expl = self.visit(call.kwarg) + new_kwarg, expl = self.visit(call.kwargs) arg_expls.append("**" + expl) expl = "%s(%s)" % (func_expl, ', '.join(arg_expls)) new_call = ast.Call(new_func, new_args, new_kwargs, new_star, new_kwarg) diff --git a/_pytest/capture.py b/_pytest/capture.py index b7594be319..dad9c73fe7 100644 --- a/_pytest/capture.py +++ b/_pytest/capture.py @@ -11,22 +11,22 @@ def pytest_addoption(parser): group._addoption('-s', action="store_const", const="no", dest="capture", help="shortcut for --capture=no.") +@pytest.mark.tryfirst +def pytest_cmdline_parse(pluginmanager, args): + # we want to perform capturing already for plugin/conftest loading + if '-s' in args or "--capture=no" in args: + method = "no" + elif hasattr(os, 'dup') and '--capture=sys' not in args: + method = "fd" + else: + method = "sys" + capman = CaptureManager(method) + pluginmanager.register(capman, "capturemanager") + def addouterr(rep, outerr): - repr = getattr(rep, 'longrepr', None) - if not hasattr(repr, 'addsection'): - return for secname, content in zip(["out", "err"], outerr): if content: - repr.addsection("Captured std%s" % secname, content.rstrip()) - -def pytest_unconfigure(config): - # registered in config.py during early conftest.py loading - capman = config.pluginmanager.getplugin('capturemanager') - while capman._method2capture: - name, cap = capman._method2capture.popitem() - # XXX logging module may wants to close it itself on process exit - # otherwise we could do finalization here and call "reset()". - cap.suspend() + rep.sections.append(("Captured std%s" % secname, content)) class NoCapture: def startall(self): @@ -39,8 +39,9 @@ class NoCapture: return "", "" class CaptureManager: - def __init__(self): + def __init__(self, defaultmethod=None): self._method2capture = {} + self._defaultmethod = defaultmethod def _maketempfile(self): f = py.std.tempfile.TemporaryFile() @@ -65,14 +66,6 @@ class CaptureManager: else: raise ValueError("unknown capturing method: %r" % method) - def _getmethod_preoptionparse(self, args): - if '-s' in args or "--capture=no" in args: - return "no" - elif hasattr(os, 'dup') and '--capture=sys' not in args: - return "fd" - else: - return "sys" - def _getmethod(self, config, fspath): if config.option.capture: method = config.option.capture @@ -85,16 +78,22 @@ class CaptureManager: method = "sys" return method + def reset_capturings(self): + for name, cap in self._method2capture.items(): + cap.reset() + def resumecapture_item(self, item): method = self._getmethod(item.config, item.fspath) if not hasattr(item, 'outerr'): item.outerr = ('', '') # we accumulate outerr on the item return self.resumecapture(method) - def resumecapture(self, method): + def resumecapture(self, method=None): if hasattr(self, '_capturing'): raise ValueError("cannot resume, already capturing with %r" % (self._capturing,)) + if method is None: + method = self._defaultmethod cap = self._method2capture.get(method) self._capturing = method if cap is None: @@ -164,17 +163,6 @@ class CaptureManager: def pytest_runtest_teardown(self, item): self.resumecapture_item(item) - def pytest__teardown_final(self, __multicall__, session): - method = self._getmethod(session.config, None) - self.resumecapture(method) - try: - rep = __multicall__.execute() - finally: - outerr = self.suspendcapture() - if rep: - addouterr(rep, outerr) - return rep - def pytest_keyboard_interrupt(self, excinfo): if hasattr(self, '_capturing'): self.suspendcapture() diff --git a/_pytest/config.py b/_pytest/config.py index 6e0cfea1a3..da595fef4b 100644 --- a/_pytest/config.py +++ b/_pytest/config.py @@ -8,13 +8,15 @@ import pytest def pytest_cmdline_parse(pluginmanager, args): config = Config(pluginmanager) config.parse(args) - if config.option.debug: - config.trace.root.setwriter(sys.stderr.write) return config def pytest_unconfigure(config): - for func in config._cleanup: - func() + while 1: + try: + fin = config._cleanup.pop() + except IndexError: + break + fin() class Parser: """ Parser for command line arguments. """ @@ -81,6 +83,7 @@ class Parser: self._inidict[name] = (help, type, default) self._ininames.append(name) + class OptionGroup: def __init__(self, name, description="", parser=None): self.name = name @@ -256,11 +259,14 @@ class Config(object): self.hook = self.pluginmanager.hook self._inicache = {} self._cleanup = [] - + @classmethod def fromdictargs(cls, option_dict, args): """ constructor useable for subprocesses. """ config = cls() + # XXX slightly crude way to initialize capturing + import _pytest.capture + _pytest.capture.pytest_cmdline_parse(config.pluginmanager, args) config._preparse(args, addopts=False) config.option.__dict__.update(option_dict) for x in config.option.plugins: @@ -285,11 +291,10 @@ class Config(object): def _setinitialconftest(self, args): # capture output during conftest init (#issue93) - from _pytest.capture import CaptureManager - capman = CaptureManager() - self.pluginmanager.register(capman, 'capturemanager') - # will be unregistered in capture.py's unconfigure() - capman.resumecapture(capman._getmethod_preoptionparse(args)) + # XXX introduce load_conftest hook to avoid needing to know + # about capturing plugin here + capman = self.pluginmanager.getplugin("capturemanager") + capman.resumecapture() try: try: self._conftest.setinitial(args) @@ -334,6 +339,7 @@ class Config(object): # Note that this can only be called once per testing process. assert not hasattr(self, 'args'), ( "can only parse cmdline args at most once per Config object") + self._origargs = args self._preparse(args) self._parser.hints.extend(self.pluginmanager._hints) args = self._parser.parse_setoption(args, self.option) @@ -341,6 +347,14 @@ class Config(object): args.append(py.std.os.getcwd()) self.args = args + def addinivalue_line(self, name, line): + """ add a line to an ini-file option. The option must have been + declared but might not yet be set in which case the line becomes the + the first line in its value. """ + x = self.getini(name) + assert isinstance(x, list) + x.append(line) # modifies the cached list inline + def getini(self, name): """ return configuration value from an ini file. If the specified name hasn't been registered through a prior ``parse.addini`` @@ -422,7 +436,7 @@ class Config(object): def getcfg(args, inibasenames): - args = [x for x in args if str(x)[0] != "-"] + args = [x for x in args if not str(x).startswith("-")] if not args: args = [py.path.local()] for arg in args: diff --git a/_pytest/core.py b/_pytest/core.py index 2087a84015..eabade7ec4 100644 --- a/_pytest/core.py +++ b/_pytest/core.py @@ -16,11 +16,10 @@ default_plugins = ( "junitxml resultlog doctest").split() class TagTracer: - def __init__(self, prefix="[pytest] "): + def __init__(self): self._tag2proc = {} self.writer = None self.indent = 0 - self.prefix = prefix def get(self, name): return TagTracerSub(self, (name,)) @@ -30,7 +29,7 @@ class TagTracer: if args: indent = " " * self.indent content = " ".join(map(str, args)) - self.writer("%s%s%s\n" %(self.prefix, indent, content)) + self.writer("%s%s [%s]\n" %(indent, content, ":".join(tags))) try: self._tag2proc[tags](tags, args) except KeyError: @@ -212,6 +211,14 @@ class PluginManager(object): self.register(mod, modname) self.consider_module(mod) + def pytest_configure(self, config): + config.addinivalue_line("markers", + "tryfirst: mark a hook implementation function such that the " + "plugin machinery will try to call it first/as early as possible.") + config.addinivalue_line("markers", + "trylast: mark a hook implementation function such that the " + "plugin machinery will try to call it last/as late as possible.") + def pytest_plugin_registered(self, plugin): import pytest dic = self.call_plugin(plugin, "pytest_namespace", {}) or {} @@ -432,10 +439,7 @@ _preinit = [] def _preloadplugins(): _preinit.append(PluginManager(load=True)) -def main(args=None, plugins=None): - """ returned exit code integer, after an in-process testing run - with the given command line arguments, preloading an optional list - of passed in plugin objects. """ +def _prepareconfig(args=None, plugins=None): if args is None: args = sys.argv[1:] elif isinstance(args, py.path.local): @@ -449,13 +453,19 @@ def main(args=None, plugins=None): else: # subsequent calls to main will create a fresh instance _pluginmanager = PluginManager(load=True) hook = _pluginmanager.hook + if plugins: + for plugin in plugins: + _pluginmanager.register(plugin) + return hook.pytest_cmdline_parse( + pluginmanager=_pluginmanager, args=args) + +def main(args=None, plugins=None): + """ returned exit code integer, after an in-process testing run + with the given command line arguments, preloading an optional list + of passed in plugin objects. """ try: - if plugins: - for plugin in plugins: - _pluginmanager.register(plugin) - config = hook.pytest_cmdline_parse( - pluginmanager=_pluginmanager, args=args) - exitstatus = hook.pytest_cmdline_main(config=config) + config = _prepareconfig(args, plugins) + exitstatus = config.hook.pytest_cmdline_main(config=config) except UsageError: e = sys.exc_info()[1] sys.stderr.write("ERROR: %s\n" %(e.args[0],)) diff --git a/_pytest/helpconfig.py b/_pytest/helpconfig.py index fa81f87e9f..29a3f5b2e9 100644 --- a/_pytest/helpconfig.py +++ b/_pytest/helpconfig.py @@ -1,7 +1,7 @@ """ version info, help messages, tracing configuration. """ import py import pytest -import inspect, sys +import os, inspect, sys from _pytest.core import varnames def pytest_addoption(parser): @@ -18,7 +18,29 @@ def pytest_addoption(parser): help="trace considerations of conftest.py files."), group.addoption('--debug', action="store_true", dest="debug", default=False, - help="generate and show internal debugging information.") + help="store internal tracing debug information in 'pytestdebug.log'.") + + +def pytest_cmdline_parse(__multicall__): + config = __multicall__.execute() + if config.option.debug: + path = os.path.abspath("pytestdebug.log") + f = open(path, 'w') + config._debugfile = f + f.write("versions pytest-%s, py-%s, python-%s\ncwd=%s\nargs=%s\n\n" %( + pytest.__version__, py.__version__, ".".join(map(str, sys.version_info)), + os.getcwd(), config._origargs)) + config.trace.root.setwriter(f.write) + sys.stderr.write("writing pytestdebug information to %s\n" % path) + return config + +@pytest.mark.trylast +def pytest_unconfigure(config): + if hasattr(config, '_debugfile'): + config._debugfile.close() + sys.stderr.write("wrote pytestdebug information to %s\n" % + config._debugfile.name) + config.trace.root.setwriter(None) def pytest_cmdline_main(config): @@ -34,6 +56,7 @@ def pytest_cmdline_main(config): elif config.option.help: config.pluginmanager.do_configure(config) showhelp(config) + config.pluginmanager.do_unconfigure(config) return 0 def showhelp(config): @@ -91,7 +114,7 @@ def pytest_report_header(config): verinfo = getpluginversioninfo(config) if verinfo: lines.extend(verinfo) - + if config.option.traceconfig: lines.append("active plugins:") plugins = [] diff --git a/_pytest/hookspec.py b/_pytest/hookspec.py index 580ab27997..b05e33421f 100644 --- a/_pytest/hookspec.py +++ b/_pytest/hookspec.py @@ -121,16 +121,23 @@ def pytest_generate_tests(metafunc): def pytest_itemstart(item, node=None): """ (deprecated, use pytest_runtest_logstart). """ -def pytest_runtest_protocol(item): - """ implements the standard runtest_setup/call/teardown protocol including - capturing exceptions and calling reporting hooks on the results accordingly. +def pytest_runtest_protocol(item, nextitem): + """ implements the runtest_setup/call/teardown protocol for + the given test item, including capturing exceptions and calling + reporting hooks. + + :arg item: test item for which the runtest protocol is performed. + + :arg nexitem: the scheduled-to-be-next test item (or None if this + is the end my friend). This argument is passed on to + :py:func:`pytest_runtest_teardown`. :return boolean: True if no further hook implementations should be invoked. """ pytest_runtest_protocol.firstresult = True def pytest_runtest_logstart(nodeid, location): - """ signal the start of a test run. """ + """ signal the start of running a single test item. """ def pytest_runtest_setup(item): """ called before ``pytest_runtest_call(item)``. """ @@ -138,8 +145,14 @@ def pytest_runtest_setup(item): def pytest_runtest_call(item): """ called to execute the test ``item``. """ -def pytest_runtest_teardown(item): - """ called after ``pytest_runtest_call``. """ +def pytest_runtest_teardown(item, nextitem): + """ called after ``pytest_runtest_call``. + + :arg nexitem: the scheduled-to-be-next test item (None if no further + test item is scheduled). This argument can be used to + perform exact teardowns, i.e. calling just enough finalizers + so that nextitem only needs to call setup-functions. + """ def pytest_runtest_makereport(item, call): """ return a :py:class:`_pytest.runner.TestReport` object @@ -149,15 +162,8 @@ def pytest_runtest_makereport(item, call): pytest_runtest_makereport.firstresult = True def pytest_runtest_logreport(report): - """ process item test report. """ - -# special handling for final teardown - somewhat internal for now -def pytest__teardown_final(session): - """ called before test session finishes. """ -pytest__teardown_final.firstresult = True - -def pytest__teardown_final_logerror(report, session): - """ called if runtest_teardown_final failed. """ + """ process a test setup/call/teardown report relating to + the respective phase of executing a test. """ # ------------------------------------------------------------------------- # test session related hooks diff --git a/_pytest/junitxml.py b/_pytest/junitxml.py index c7a089cce5..1c4bbd82e7 100644 --- a/_pytest/junitxml.py +++ b/_pytest/junitxml.py @@ -25,6 +25,10 @@ except NameError: long = int +class Junit(py.xml.Namespace): + pass + + # We need to get the subset of the invalid unicode ranges according to # XML 1.0 which are valid in this python build. Hence we calculate # this dynamically instead of hardcoding it. The spec range of valid @@ -40,6 +44,14 @@ illegal_xml_re = re.compile(unicode('[%s]') % del _illegal_unichrs del _illegal_ranges +def bin_xml_escape(arg): + def repl(matchobj): + i = ord(matchobj.group()) + if i <= 0xFF: + return unicode('#x%02X') % i + else: + return unicode('#x%04X') % i + return illegal_xml_re.sub(repl, py.xml.escape(arg)) def pytest_addoption(parser): group = parser.getgroup("terminal reporting") @@ -68,117 +80,97 @@ class LogXML(object): logfile = os.path.expanduser(os.path.expandvars(logfile)) self.logfile = os.path.normpath(logfile) self.prefix = prefix - self.test_logs = [] + self.tests = [] self.passed = self.skipped = 0 self.failed = self.errors = 0 - self._durations = {} def _opentestcase(self, report): names = report.nodeid.split("::") names[0] = names[0].replace("/", '.') - names = tuple(names) - d = {'time': self._durations.pop(report.nodeid, "0")} names = [x.replace(".py", "") for x in names if x != "()"] classnames = names[:-1] if self.prefix: classnames.insert(0, self.prefix) - d['classname'] = ".".join(classnames) - d['name'] = py.xml.escape(names[-1]) - attrs = ['%s="%s"' % item for item in sorted(d.items())] - self.test_logs.append("\n<testcase %s>" % " ".join(attrs)) - - def _closetestcase(self): - self.test_logs.append("</testcase>") - - def appendlog(self, fmt, *args): - def repl(matchobj): - i = ord(matchobj.group()) - if i <= 0xFF: - return unicode('#x%02X') % i - else: - return unicode('#x%04X') % i - args = tuple([illegal_xml_re.sub(repl, py.xml.escape(arg)) - for arg in args]) - self.test_logs.append(fmt % args) + self.tests.append(Junit.testcase( + classname=".".join(classnames), + name=names[-1], + time=getattr(report, 'duration', 0) + )) + + def append(self, obj): + self.tests[-1].append(obj) def append_pass(self, report): self.passed += 1 - self._opentestcase(report) - self._closetestcase() def append_failure(self, report): - self._opentestcase(report) #msg = str(report.longrepr.reprtraceback.extraline) if "xfail" in report.keywords: - self.appendlog( - '<skipped message="xfail-marked test passes unexpectedly"/>') + self.append( + Junit.skipped(message="xfail-marked test passes unexpectedly")) self.skipped += 1 else: - self.appendlog('<failure message="test failure">%s</failure>', - report.longrepr) + sec = dict(report.sections) + fail = Junit.failure(message="test failure") + fail.append(str(report.longrepr)) + self.append(fail) + for name in ('out', 'err'): + content = sec.get("Captured std%s" % name) + if content: + tag = getattr(Junit, 'system-'+name) + self.append(tag(bin_xml_escape(content))) self.failed += 1 - self._closetestcase() def append_collect_failure(self, report): - self._opentestcase(report) #msg = str(report.longrepr.reprtraceback.extraline) - self.appendlog('<failure message="collection failure">%s</failure>', - report.longrepr) - self._closetestcase() + self.append(Junit.failure(str(report.longrepr), + message="collection failure")) self.errors += 1 def append_collect_skipped(self, report): - self._opentestcase(report) #msg = str(report.longrepr.reprtraceback.extraline) - self.appendlog('<skipped message="collection skipped">%s</skipped>', - report.longrepr) - self._closetestcase() + self.append(Junit.skipped(str(report.longrepr), + message="collection skipped")) self.skipped += 1 def append_error(self, report): - self._opentestcase(report) - self.appendlog('<error message="test setup failure">%s</error>', - report.longrepr) - self._closetestcase() + self.append(Junit.error(str(report.longrepr), + message="test setup failure")) self.errors += 1 def append_skipped(self, report): - self._opentestcase(report) if "xfail" in report.keywords: - self.appendlog( - '<skipped message="expected test failure">%s</skipped>', - report.keywords['xfail']) + self.append(Junit.skipped(str(report.keywords['xfail']), + message="expected test failure")) else: filename, lineno, skipreason = report.longrepr if skipreason.startswith("Skipped: "): skipreason = skipreason[9:] - self.appendlog('<skipped type="pytest.skip" ' - 'message="%s">%s</skipped>', - skipreason, "%s:%s: %s" % report.longrepr, - ) - self._closetestcase() + self.append( + Junit.skipped("%s:%s: %s" % report.longrepr, + type="pytest.skip", + message=skipreason + )) self.skipped += 1 def pytest_runtest_logreport(self, report): if report.passed: - self.append_pass(report) + if report.when == "call": # ignore setup/teardown + self._opentestcase(report) + self.append_pass(report) elif report.failed: + self._opentestcase(report) if report.when != "call": self.append_error(report) else: self.append_failure(report) elif report.skipped: + self._opentestcase(report) self.append_skipped(report) - def pytest_runtest_call(self, item, __multicall__): - start = time.time() - try: - return __multicall__.execute() - finally: - self._durations[item.nodeid] = time.time() - start - def pytest_collectreport(self, report): if not report.passed: + self._opentestcase(report) if report.failed: self.append_collect_failure(report) else: @@ -187,10 +179,11 @@ class LogXML(object): def pytest_internalerror(self, excrepr): self.errors += 1 data = py.xml.escape(excrepr) - self.test_logs.append( - '\n<testcase classname="pytest" name="internal">' - ' <error message="internal error">' - '%s</error></testcase>' % data) + self.tests.append( + Junit.testcase( + Junit.error(data, message="internal error"), + classname="pytest", + name="internal")) def pytest_sessionstart(self, session): self.suite_start_time = time.time() @@ -204,17 +197,17 @@ class LogXML(object): suite_stop_time = time.time() suite_time_delta = suite_stop_time - self.suite_start_time numtests = self.passed + self.failed + logfile.write('<?xml version="1.0" encoding="utf-8"?>') - logfile.write('<testsuite ') - logfile.write('name="" ') - logfile.write('errors="%i" ' % self.errors) - logfile.write('failures="%i" ' % self.failed) - logfile.write('skips="%i" ' % self.skipped) - logfile.write('tests="%i" ' % numtests) - logfile.write('time="%.3f"' % suite_time_delta) - logfile.write(' >') - logfile.writelines(self.test_logs) - logfile.write('</testsuite>') + logfile.write(Junit.testsuite( + self.tests, + name="", + errors=self.errors, + failures=self.failed, + skips=self.skipped, + tests=numtests, + time="%.3f" % suite_time_delta, + ).unicode(indent=0)) logfile.close() def pytest_terminal_summary(self, terminalreporter): diff --git a/_pytest/main.py b/_pytest/main.py index ac306a867a..f6354205b0 100644 --- a/_pytest/main.py +++ b/_pytest/main.py @@ -2,7 +2,7 @@ import py import pytest, _pytest -import os, sys +import os, sys, imp tracebackcutdir = py.path.local(_pytest.__file__).dirpath() # exitcodes for the command line @@ -11,6 +11,8 @@ EXIT_TESTSFAILED = 1 EXIT_INTERRUPTED = 2 EXIT_INTERNALERROR = 3 +name_re = py.std.re.compile("^[a-zA-Z_]\w*$") + def pytest_addoption(parser): parser.addini("norecursedirs", "directory patterns to avoid for recursion", type="args", default=('.*', 'CVS', '_darcs', '{arch}')) @@ -27,6 +29,9 @@ def pytest_addoption(parser): action="store", type="int", dest="maxfail", default=0, help="exit after first num failures or errors.") + group._addoption('--strict', action="store_true", + help="run pytest in strict mode, warnings become errors.") + group = parser.getgroup("collect", "collection") group.addoption('--collectonly', action="store_true", dest="collectonly", @@ -48,7 +53,7 @@ def pytest_addoption(parser): def pytest_namespace(): collect = dict(Item=Item, Collector=Collector, File=File, Session=Session) return dict(collect=collect) - + def pytest_configure(config): py.test.config = config # compatibiltiy if config.option.exitfirst: @@ -77,11 +82,11 @@ def wrap_session(config, doit): session.exitstatus = EXIT_INTERNALERROR if excinfo.errisinstance(SystemExit): sys.stderr.write("mainloop: caught Spurious SystemExit!\n") - if not session.exitstatus and session._testsfailed: - session.exitstatus = EXIT_TESTSFAILED if initstate >= 2: config.hook.pytest_sessionfinish(session=session, - exitstatus=session.exitstatus) + exitstatus=session.exitstatus or (session._testsfailed and 1)) + if not session.exitstatus and session._testsfailed: + session.exitstatus = EXIT_TESTSFAILED if initstate >= 1: config.pluginmanager.do_unconfigure(config) return session.exitstatus @@ -101,8 +106,12 @@ def pytest_collection(session): def pytest_runtestloop(session): if session.config.option.collectonly: return True - for item in session.session.items: - item.config.hook.pytest_runtest_protocol(item=item) + for i, item in enumerate(session.items): + try: + nextitem = session.items[i+1] + except IndexError: + nextitem = None + item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem) if session.shouldstop: raise session.Interrupted(session.shouldstop) return True @@ -132,7 +141,7 @@ def compatproperty(name): return getattr(pytest, name) return property(fget, None, None, "deprecated attribute %r, use pytest.%s" % (name,name)) - + class Node(object): """ base class for all Nodes in the collection tree. Collector subclasses have children, Items are terminal nodes.""" @@ -143,13 +152,13 @@ class Node(object): #: the parent collector node. self.parent = parent - + #: the test config object self.config = config or parent.config #: the collection this node is part of self.session = session or parent.session - + #: filesystem path where this node was collected from self.fspath = getattr(parent, 'fspath', None) self.ihook = self.session.gethookproxy(self.fspath) @@ -224,13 +233,13 @@ class Node(object): def listchain(self): """ return list of all parent collectors up to self, starting from root of collection tree. """ - l = [self] - while 1: - x = l[0] - if x.parent is not None: # and x.parent.parent is not None: - l.insert(0, x.parent) - else: - return l + chain = [] + item = self + while item is not None: + chain.append(item) + item = item.parent + chain.reverse() + return chain def listnames(self): return [x.name for x in self.listchain()] @@ -325,6 +334,8 @@ class Item(Node): """ a basic test invocation item. Note that for a single function there might be multiple test invocation items. """ + nextitem = None + def reportinfo(self): return self.fspath, None, "" @@ -469,16 +480,29 @@ class Session(FSCollector): return True def _tryconvertpyarg(self, x): - try: - mod = __import__(x, None, None, ['__doc__']) - except (ValueError, ImportError): - return x - p = py.path.local(mod.__file__) - if p.purebasename == "__init__": - p = p.dirpath() - else: - p = p.new(basename=p.purebasename+".py") - return str(p) + mod = None + path = [os.path.abspath('.')] + sys.path + for name in x.split('.'): + # ignore anything that's not a proper name here + # else something like --pyargs will mess up '.' + # since imp.find_module will actually sometimes work for it + # but it's supposed to be considered a filesystem path + # not a package + if name_re.match(name) is None: + return x + try: + fd, mod, type_ = imp.find_module(name, path) + except ImportError: + return x + else: + if fd is not None: + fd.close() + + if type_[2] != imp.PKG_DIRECTORY: + path = [os.path.dirname(mod)] + else: + path = [mod] + return mod def _parsearg(self, arg): """ return (fspath, names) tuple after checking the file exists. """ @@ -496,7 +520,7 @@ class Session(FSCollector): raise pytest.UsageError(msg + arg) parts[0] = path return parts - + def matchnodes(self, matching, names): self.trace("matchnodes", matching, names) self.trace.root.indent += 1 diff --git a/_pytest/mark.py b/_pytest/mark.py index 6cc8edcdba..9ab62caf71 100644 --- a/_pytest/mark.py +++ b/_pytest/mark.py @@ -14,12 +14,37 @@ def pytest_addoption(parser): "Terminate expression with ':' to make the first match match " "all subsequent tests (usually file-order). ") + group._addoption("-m", + action="store", dest="markexpr", default="", metavar="MARKEXPR", + help="only run tests matching given mark expression. " + "example: -m 'mark1 and not mark2'." + ) + + group.addoption("--markers", action="store_true", help= + "show markers (builtin, plugin and per-project ones).") + + parser.addini("markers", "markers for test functions", 'linelist') + +def pytest_cmdline_main(config): + if config.option.markers: + config.pluginmanager.do_configure(config) + tw = py.io.TerminalWriter() + for line in config.getini("markers"): + name, rest = line.split(":", 1) + tw.write("@pytest.mark.%s:" % name, bold=True) + tw.line(rest) + tw.line() + config.pluginmanager.do_unconfigure(config) + return 0 +pytest_cmdline_main.tryfirst = True + def pytest_collection_modifyitems(items, config): keywordexpr = config.option.keyword - if not keywordexpr: + matchexpr = config.option.markexpr + if not keywordexpr and not matchexpr: return selectuntil = False - if keywordexpr[-1] == ":": + if keywordexpr[-1:] == ":": selectuntil = True keywordexpr = keywordexpr[:-1] @@ -29,21 +54,38 @@ def pytest_collection_modifyitems(items, config): if keywordexpr and skipbykeyword(colitem, keywordexpr): deselected.append(colitem) else: - remaining.append(colitem) if selectuntil: keywordexpr = None + if matchexpr: + if not matchmark(colitem, matchexpr): + deselected.append(colitem) + continue + remaining.append(colitem) if deselected: config.hook.pytest_deselected(items=deselected) items[:] = remaining +class BoolDict: + def __init__(self, mydict): + self._mydict = mydict + def __getitem__(self, name): + return name in self._mydict + +def matchmark(colitem, matchexpr): + return eval(matchexpr, {}, BoolDict(colitem.obj.__dict__)) + +def pytest_configure(config): + if config.option.strict: + pytest.mark._config = config + def skipbykeyword(colitem, keywordexpr): """ return True if they given keyword expression means to skip this collector/item. """ if not keywordexpr: return - + itemkeywords = getkeywords(colitem) for key in filter(None, keywordexpr.split()): eor = key[:1] == '-' @@ -77,15 +119,31 @@ class MarkGenerator: @py.test.mark.slowtest def test_function(): pass - + will set a 'slowtest' :class:`MarkInfo` object on the ``test_function`` object. """ def __getattr__(self, name): if name[0] == "_": raise AttributeError(name) + if hasattr(self, '_config'): + self._check(name) return MarkDecorator(name) + def _check(self, name): + try: + if name in self._markers: + return + except AttributeError: + pass + self._markers = l = set() + for line in self._config.getini("markers"): + beginning = line.split(":", 1) + x = beginning[0].split("(", 1)[0] + l.add(x) + if name not in self._markers: + raise AttributeError("%r not a registered marker" % (name,)) + class MarkDecorator: """ A decorator for test functions and test classes. When applied it will create :class:`MarkInfo` objects which may be @@ -133,8 +191,7 @@ class MarkDecorator: holder = MarkInfo(self.markname, self.args, self.kwargs) setattr(func, self.markname, holder) else: - holder.kwargs.update(self.kwargs) - holder.args += self.args + holder.add(self.args, self.kwargs) return func kw = self.kwargs.copy() kw.update(kwargs) @@ -150,27 +207,20 @@ class MarkInfo: self.args = args #: keyword argument dictionary, empty if nothing specified self.kwargs = kwargs + self._arglist = [(args, kwargs.copy())] def __repr__(self): return "<MarkInfo %r args=%r kwargs=%r>" % ( self.name, self.args, self.kwargs) -def pytest_itemcollected(item): - if not isinstance(item, pytest.Function): - return - try: - func = item.obj.__func__ - except AttributeError: - func = getattr(item.obj, 'im_func', item.obj) - pyclasses = (pytest.Class, pytest.Module) - for node in item.listchain(): - if isinstance(node, pyclasses): - marker = getattr(node.obj, 'pytestmark', None) - if marker is not None: - if isinstance(marker, list): - for mark in marker: - mark(func) - else: - marker(func) - node = node.parent - item.keywords.update(py.builtin._getfuncdict(func)) + def add(self, args, kwargs): + """ add a MarkInfo with the given args and kwargs. """ + self._arglist.append((args, kwargs)) + self.args += args + self.kwargs.update(kwargs) + + def __iter__(self): + """ yield MarkInfo objects each relating to a marking-call. """ + for args, kwargs in self._arglist: + yield MarkInfo(self.name, args, kwargs) + diff --git a/_pytest/monkeypatch.py b/_pytest/monkeypatch.py index 0a530b717b..8629044313 100644 --- a/_pytest/monkeypatch.py +++ b/_pytest/monkeypatch.py @@ -13,6 +13,7 @@ def pytest_funcarg__monkeypatch(request): monkeypatch.setenv(name, value, prepend=False) monkeypatch.delenv(name, value, raising=True) monkeypatch.syspath_prepend(path) + monkeypatch.chdir(path) All modifications will be undone after the requesting test function has finished. The ``raising`` @@ -30,6 +31,7 @@ class monkeypatch: def __init__(self): self._setattr = [] self._setitem = [] + self._cwd = None def setattr(self, obj, name, value, raising=True): """ set attribute ``name`` on ``obj`` to ``value``, by default @@ -83,6 +85,17 @@ class monkeypatch: self._savesyspath = sys.path[:] sys.path.insert(0, str(path)) + def chdir(self, path): + """ change the current working directory to the specified path + path can be a string or a py.path.local object + """ + if self._cwd is None: + self._cwd = os.getcwd() + if hasattr(path, "chdir"): + path.chdir() + else: + os.chdir(path) + def undo(self): """ undo previous changes. This call consumes the undo stack. Calling it a second time has no effect unless @@ -95,9 +108,17 @@ class monkeypatch: self._setattr[:] = [] for dictionary, name, value in self._setitem: if value is notset: - del dictionary[name] + try: + del dictionary[name] + except KeyError: + pass # was already deleted, so we have the desired state else: dictionary[name] = value self._setitem[:] = [] if hasattr(self, '_savesyspath'): sys.path[:] = self._savesyspath + del self._savesyspath + + if self._cwd is not None: + os.chdir(self._cwd) + self._cwd = None diff --git a/_pytest/nose.py b/_pytest/nose.py index 2f90a93f8d..5560e473d6 100644 --- a/_pytest/nose.py +++ b/_pytest/nose.py @@ -13,6 +13,7 @@ def pytest_runtest_makereport(__multicall__, item, call): call.excinfo = call2.excinfo +@pytest.mark.trylast def pytest_runtest_setup(item): if isinstance(item, (pytest.Function)): if isinstance(item.parent, pytest.Generator): diff --git a/_pytest/pastebin.py b/_pytest/pastebin.py index 52d7a65e60..6369ee749a 100644 --- a/_pytest/pastebin.py +++ b/_pytest/pastebin.py @@ -38,7 +38,11 @@ def pytest_unconfigure(config): del tr._tw.__dict__['write'] def getproxy(): - return py.std.xmlrpclib.ServerProxy(url.xmlrpc).pastes + if sys.version_info < (3, 0): + from xmlrpclib import ServerProxy + else: + from xmlrpc.client import ServerProxy + return ServerProxy(url.xmlrpc).pastes def pytest_terminal_summary(terminalreporter): if terminalreporter.config.option.pastebin != "failed": diff --git a/_pytest/pdb.py b/_pytest/pdb.py index d9b4c2de87..7516b14fbb 100644 --- a/_pytest/pdb.py +++ b/_pytest/pdb.py @@ -19,11 +19,13 @@ def pytest_configure(config): class pytestPDB: """ Pseudo PDB that defers to the real pdb. """ item = None + collector = None def set_trace(self): """ invoke PDB set_trace debugging, dropping any IO capturing. """ frame = sys._getframe().f_back - item = getattr(self, 'item', None) + item = self.item or self.collector + if item is not None: capman = item.config.pluginmanager.getplugin("capturemanager") out, err = capman.suspendcapture() @@ -38,6 +40,14 @@ def pdbitem(item): pytestPDB.item = item pytest_runtest_setup = pytest_runtest_call = pytest_runtest_teardown = pdbitem +@pytest.mark.tryfirst +def pytest_make_collect_report(__multicall__, collector): + try: + pytestPDB.collector = collector + return __multicall__.execute() + finally: + pytestPDB.collector = None + def pytest_runtest_makereport(): pytestPDB.item = None @@ -60,7 +70,13 @@ class PdbInvoke: tw.sep(">", "traceback") rep.toterminal(tw) tw.sep(">", "entering PDB") - post_mortem(call.excinfo._excinfo[2]) + # A doctest.UnexpectedException is not useful for post_mortem. + # Use the underlying exception instead: + if isinstance(call.excinfo.value, py.std.doctest.UnexpectedException): + tb = call.excinfo.value.exc_info[2] + else: + tb = call.excinfo._excinfo[2] + post_mortem(tb) rep._pdbshown = True return rep diff --git a/_pytest/pytester.py b/_pytest/pytester.py index 8bfa3d37bd..a7aee998f3 100644 --- a/_pytest/pytester.py +++ b/_pytest/pytester.py @@ -25,6 +25,7 @@ def pytest_configure(config): _pytest_fullpath except NameError: _pytest_fullpath = os.path.abspath(pytest.__file__.rstrip("oc")) + _pytest_fullpath = _pytest_fullpath.replace("$py.class", ".py") def pytest_funcarg___pytest(request): return PytestArg(request) @@ -313,16 +314,6 @@ class TmpTestdir: result.extend(session.genitems(colitem)) return result - def inline_genitems(self, *args): - #config = self.parseconfig(*args) - config = self.parseconfigure(*args) - rec = self.getreportrecorder(config) - session = Session(config) - config.hook.pytest_sessionstart(session=session) - session.perform_collect() - config.hook.pytest_sessionfinish(session=session, exitstatus=EXIT_OK) - return session.items, rec - def runitem(self, source): # used from runner functional tests item = self.getitem(source) @@ -343,64 +334,57 @@ class TmpTestdir: l = list(args) + [p] reprec = self.inline_run(*l) reports = reprec.getreports("pytest_runtest_logreport") - assert len(reports) == 1, reports - return reports[0] + assert len(reports) == 3, reports # setup/call/teardown + return reports[1] - def inline_run(self, *args): - args = ("-s", ) + args # otherwise FD leakage - config = self.parseconfig(*args) - reprec = self.getreportrecorder(config) - #config.pluginmanager.do_configure(config) - config.hook.pytest_cmdline_main(config=config) - #config.pluginmanager.do_unconfigure(config) - return reprec + def inline_genitems(self, *args): + return self.inprocess_run(list(args) + ['--collectonly']) - def config_preparse(self): - config = self.Config() - for plugin in self.plugins: - if isinstance(plugin, str): - config.pluginmanager.import_plugin(plugin) - else: - if isinstance(plugin, dict): - plugin = PseudoPlugin(plugin) - if not config.pluginmanager.isregistered(plugin): - config.pluginmanager.register(plugin) - return config + def inline_run(self, *args): + items, rec = self.inprocess_run(args) + return rec + + def inprocess_run(self, args, plugins=None): + rec = [] + items = [] + class Collect: + def pytest_configure(x, config): + rec.append(self.getreportrecorder(config)) + def pytest_itemcollected(self, item): + items.append(item) + if not plugins: + plugins = [] + plugins.append(Collect()) + ret = self.pytestmain(list(args), plugins=[Collect()]) + reprec = rec[0] + reprec.ret = ret + assert len(rec) == 1 + return items, reprec def parseconfig(self, *args): - if not args: - args = (self.tmpdir,) - config = self.config_preparse() - args = list(args) + args = [str(x) for x in args] for x in args: if str(x).startswith('--basetemp'): break else: args.append("--basetemp=%s" % self.tmpdir.dirpath('basetemp')) - config.parse(args) + import _pytest.core + config = _pytest.core._prepareconfig(args, self.plugins) + # the in-process pytest invocation needs to avoid leaking FDs + # so we register a "reset_capturings" callmon the capturing manager + # and make sure it gets called + config._cleanup.append( + config.pluginmanager.getplugin("capturemanager").reset_capturings) + import _pytest.config + self.request.addfinalizer( + lambda: _pytest.config.pytest_unconfigure(config)) return config - def reparseconfig(self, args=None): - """ this is used from tests that want to re-invoke parse(). """ - if not args: - args = [self.tmpdir] - oldconfig = getattr(py.test, 'config', None) - try: - c = py.test.config = self.Config() - c.basetemp = py.path.local.make_numbered_dir(prefix="reparse", - keep=0, rootdir=self.tmpdir, lock_timeout=None) - c.parse(args) - c.pluginmanager.do_configure(c) - self.request.addfinalizer(lambda: c.pluginmanager.do_unconfigure(c)) - return c - finally: - py.test.config = oldconfig - def parseconfigure(self, *args): config = self.parseconfig(*args) config.pluginmanager.do_configure(config) self.request.addfinalizer(lambda: - config.pluginmanager.do_unconfigure(config)) + config.pluginmanager.do_unconfigure(config)) return config def getitem(self, source, funcname="test_func"): @@ -420,7 +404,6 @@ class TmpTestdir: self.makepyfile(__init__ = "#") self.config = config = self.parseconfigure(path, *configargs) node = self.getnode(config, path) - #config.pluginmanager.do_unconfigure(config) return node def collect_by_name(self, modcol, name): @@ -437,9 +420,16 @@ class TmpTestdir: return py.std.subprocess.Popen(cmdargs, stdout=stdout, stderr=stderr, **kw) def pytestmain(self, *args, **kwargs): - ret = pytest.main(*args, **kwargs) - if ret == 2: - raise KeyboardInterrupt() + class ResetCapturing: + @pytest.mark.trylast + def pytest_unconfigure(self, config): + capman = config.pluginmanager.getplugin("capturemanager") + capman.reset_capturings() + plugins = kwargs.setdefault("plugins", []) + rc = ResetCapturing() + plugins.append(rc) + return pytest.main(*args, **kwargs) + def run(self, *cmdargs): return self._run(*cmdargs) @@ -528,6 +518,8 @@ class TmpTestdir: pexpect = py.test.importorskip("pexpect", "2.4") if hasattr(sys, 'pypy_version_info') and '64' in py.std.platform.machine(): pytest.skip("pypy-64 bit not supported") + if sys.platform == "darwin": + pytest.xfail("pexpect does not work reliably on darwin?!") logfile = self.tmpdir.join("spawn.out") child = pexpect.spawn(cmd, logfile=logfile.open("w")) child.timeout = expect_timeout @@ -540,10 +532,6 @@ def getdecoded(out): return "INTERNAL not-utf8-decodeable, truncated string:\n%s" % ( py.io.saferepr(out),) -class PseudoPlugin: - def __init__(self, vars): - self.__dict__.update(vars) - class ReportRecorder(object): def __init__(self, hook): self.hook = hook @@ -565,10 +553,17 @@ class ReportRecorder(object): def getreports(self, names="pytest_runtest_logreport pytest_collectreport"): return [x.report for x in self.getcalls(names)] - def matchreport(self, inamepart="", names="pytest_runtest_logreport pytest_collectreport", when=None): + def matchreport(self, inamepart="", + names="pytest_runtest_logreport pytest_collectreport", when=None): """ return a testreport whose dotted import path matches """ l = [] for rep in self.getreports(names=names): + try: + if not when and rep.when != "call" and rep.passed: + # setup/teardown passing reports - let's ignore those + continue + except AttributeError: + pass if when and getattr(rep, 'when', None) != when: continue if not inamepart or inamepart in rep.nodeid.split("::"): diff --git a/_pytest/python.py b/_pytest/python.py index 35b85485c0..7705249846 100644 --- a/_pytest/python.py +++ b/_pytest/python.py @@ -4,6 +4,7 @@ import inspect import sys import pytest from py._code.code import TerminalRepr +from _pytest.monkeypatch import monkeypatch import _pytest cutdir = py.path.local(_pytest.__file__).dirpath() @@ -26,6 +27,24 @@ def pytest_cmdline_main(config): showfuncargs(config) return 0 + +def pytest_generate_tests(metafunc): + try: + param = metafunc.function.parametrize + except AttributeError: + return + for p in param: + metafunc.parametrize(*p.args, **p.kwargs) + +def pytest_configure(config): + config.addinivalue_line("markers", + "parametrize(argnames, argvalues): call a test function multiple " + "times passing in multiple different argument value sets. Example: " + "@parametrize('arg1', [1,2]) would lead to two calls of the decorated " + "test function, one with arg1=1 and another with arg1=2." + ) + + @pytest.mark.trylast def pytest_namespace(): raises.Exception = pytest.fail.Exception @@ -138,6 +157,7 @@ class PyobjMixin(object): obj = obj.place_as self._fslineno = py.code.getfslineno(obj) + assert isinstance(self._fslineno[1], int), obj return self._fslineno def reportinfo(self): @@ -155,6 +175,7 @@ class PyobjMixin(object): else: fspath, lineno = self._getfslineno() modpath = self.getmodpath() + assert isinstance(lineno, int) return fspath, lineno, modpath class PyCollectorMixin(PyobjMixin, pytest.Collector): @@ -200,6 +221,7 @@ class PyCollectorMixin(PyobjMixin, pytest.Collector): module = self.getparent(Module).obj clscol = self.getparent(Class) cls = clscol and clscol.obj or None + transfer_markers(funcobj, cls, module) metafunc = Metafunc(funcobj, config=self.config, cls=cls, module=module) gentesthook = self.config.hook.pytest_generate_tests @@ -219,6 +241,19 @@ class PyCollectorMixin(PyobjMixin, pytest.Collector): l.append(function) return l +def transfer_markers(funcobj, cls, mod): + # XXX this should rather be code in the mark plugin or the mark + # plugin should merge with the python plugin. + for holder in (cls, mod): + try: + pytestmark = holder.pytestmark + except AttributeError: + continue + if isinstance(pytestmark, list): + for mark in pytestmark: + mark(funcobj) + else: + pytestmark(funcobj) class Module(pytest.File, PyCollectorMixin): def _getobj(self): @@ -226,13 +261,8 @@ class Module(pytest.File, PyCollectorMixin): def _importtestmodule(self): # we assume we are only called once per module - from _pytest import assertion - assertion.before_module_import(self) try: - try: - mod = self.fspath.pyimport(ensuresyspath=True) - finally: - assertion.after_module_import(self) + mod = self.fspath.pyimport(ensuresyspath=True) except SyntaxError: excinfo = py.code.ExceptionInfo() raise self.CollectError(excinfo.getrepr(style="short")) @@ -244,7 +274,8 @@ class Module(pytest.File, PyCollectorMixin): " %s\n" "which is not the same as the test file we want to collect:\n" " %s\n" - "HINT: use a unique basename for your test file modules" + "HINT: remove __pycache__ / .pyc files and/or use a " + "unique basename for your test file modules" % e.args ) #print "imported test module", mod @@ -374,6 +405,7 @@ class FuncargLookupErrorRepr(TerminalRepr): tw.line() tw.line("%s:%d" % (self.filename, self.firstlineno+1)) + class Generator(FunctionMixin, PyCollectorMixin, pytest.Collector): def collect(self): # test generators are seen as collectors but they also @@ -430,6 +462,7 @@ class Function(FunctionMixin, pytest.Item): "yielded functions (deprecated) cannot have funcargs") else: if callspec is not None: + self.callspec = callspec self.funcargs = callspec.funcargs or {} self._genid = callspec.id if hasattr(callspec, "param"): @@ -506,15 +539,59 @@ def fillfuncargs(function): request._fillfuncargs() _notexists = object() -class CallSpec: - def __init__(self, funcargs, id, param): - self.funcargs = funcargs - self.id = id + +class CallSpec2(object): + def __init__(self, metafunc): + self.metafunc = metafunc + self.funcargs = {} + self._idlist = [] + self.params = {} + self._globalid = _notexists + self._globalid_args = set() + self._globalparam = _notexists + + def copy(self, metafunc): + cs = CallSpec2(self.metafunc) + cs.funcargs.update(self.funcargs) + cs.params.update(self.params) + cs._idlist = list(self._idlist) + cs._globalid = self._globalid + cs._globalid_args = self._globalid_args + cs._globalparam = self._globalparam + return cs + + def _checkargnotcontained(self, arg): + if arg in self.params or arg in self.funcargs: + raise ValueError("duplicate %r" %(arg,)) + + def getparam(self, name): + try: + return self.params[name] + except KeyError: + if self._globalparam is _notexists: + raise ValueError(name) + return self._globalparam + + @property + def id(self): + return "-".join(map(str, filter(None, self._idlist))) + + def setmulti(self, valtype, argnames, valset, id): + for arg,val in zip(argnames, valset): + self._checkargnotcontained(arg) + getattr(self, valtype)[arg] = val + self._idlist.append(id) + + def setall(self, funcargs, id, param): + for x in funcargs: + self._checkargnotcontained(x) + self.funcargs.update(funcargs) + if id is not _notexists: + self._idlist.append(id) if param is not _notexists: - self.param = param - def __repr__(self): - return "<CallSpec id=%r param=%r funcargs=%r>" %( - self.id, getattr(self, 'param', '?'), self.funcargs) + assert self._globalparam is _notexists + self._globalparam = param + class Metafunc: def __init__(self, function, config=None, cls=None, module=None): @@ -528,31 +605,69 @@ class Metafunc: self._calls = [] self._ids = py.builtin.set() + def parametrize(self, argnames, argvalues, indirect=False, ids=None): + """ Add new invocations to the underlying test function using the list + of argvalues for the given argnames. Parametrization is performed + during the collection phase. If you need to setup expensive resources + you may pass indirect=True and implement a funcarg factory which can + perform the expensive setup just before a test is actually run. + + :arg argnames: an argument name or a list of argument names + + :arg argvalues: a list of values for the argname or a list of tuples of + values for the list of argument names. + + :arg indirect: if True each argvalue corresponding to an argument will + be passed as request.param to its respective funcarg factory so + that it can perform more expensive setups during the setup phase of + a test rather than at collection time. + + :arg ids: list of string ids each corresponding to the argvalues so + that they are part of the test id. If no ids are provided they will + be generated automatically from the argvalues. + """ + if not isinstance(argnames, (tuple, list)): + argnames = (argnames,) + argvalues = [(val,) for val in argvalues] + for arg in argnames: + if arg not in self.funcargnames: + raise ValueError("%r has no argument %r" %(self.function, arg)) + valtype = indirect and "params" or "funcargs" + if not ids: + idmaker = IDMaker() + ids = list(map(idmaker, argvalues)) + newcalls = [] + for callspec in self._calls or [CallSpec2(self)]: + for i, valset in enumerate(argvalues): + assert len(valset) == len(argnames) + newcallspec = callspec.copy(self) + newcallspec.setmulti(valtype, argnames, valset, ids[i]) + newcalls.append(newcallspec) + self._calls = newcalls + def addcall(self, funcargs=None, id=_notexists, param=_notexists): - """ add a new call to the underlying test function during the - collection phase of a test run. Note that request.addcall() is - called during the test collection phase prior and independently - to actual test execution. Therefore you should perform setup - of resources in a funcarg factory which can be instrumented - with the ``param``. + """ (deprecated, use parametrize) Add a new call to the underlying + test function during the collection phase of a test run. Note that + request.addcall() is called during the test collection phase prior and + independently to actual test execution. You should only use addcall() + if you need to specify multiple arguments of a test function. :arg funcargs: argument keyword dictionary used when invoking the test function. :arg id: used for reporting and identification purposes. If you - don't supply an `id` the length of the currently - list of calls to the test function will be used. + don't supply an `id` an automatic unique id will be generated. - :arg param: will be exposed to a later funcarg factory invocation - through the ``request.param`` attribute. It allows to - defer test fixture setup activities to when an actual - test is run. + :arg param: a parameter which will be exposed to a later funcarg factory + invocation through the ``request.param`` attribute. """ assert funcargs is None or isinstance(funcargs, dict) if funcargs is not None: for name in funcargs: if name not in self.funcargnames: pytest.fail("funcarg %r not used in this function." % name) + else: + funcargs = {} if id is None: raise ValueError("id=None not allowed") if id is _notexists: @@ -561,11 +676,26 @@ class Metafunc: if id in self._ids: raise ValueError("duplicate id %r" % id) self._ids.add(id) - self._calls.append(CallSpec(funcargs, id, param)) + + cs = CallSpec2(self) + cs.setall(funcargs, id, param) + self._calls.append(cs) + +class IDMaker: + def __init__(self): + self.counter = 0 + def __call__(self, valset): + l = [] + for val in valset: + if not isinstance(val, (int, str)): + val = "."+str(self.counter) + self.counter += 1 + l.append(str(val)) + return "-".join(l) class FuncargRequest: """ A request for function arguments from a test function. - + Note that there is an optional ``param`` attribute in case there was an invocation to metafunc.addcall(param=...). If no such call was done in a ``pytest_generate_tests`` @@ -637,7 +767,7 @@ class FuncargRequest: def applymarker(self, marker): - """ apply a marker to a single test function invocation. + """ Apply a marker to a single test function invocation. This method is useful if you don't want to have a keyword/marker on all function invocations. @@ -649,7 +779,7 @@ class FuncargRequest: self._pyfuncitem.keywords[marker.markname] = marker def cached_setup(self, setup, teardown=None, scope="module", extrakey=None): - """ return a testing resource managed by ``setup`` & + """ Return a testing resource managed by ``setup`` & ``teardown`` calls. ``scope`` and ``extrakey`` determine when the ``teardown`` function will be called so that subsequent calls to ``setup`` would recreate the resource. @@ -698,11 +828,18 @@ class FuncargRequest: self._raiselookupfailed(argname) funcargfactory = self._name2factory[argname].pop() oldarg = self._currentarg - self._currentarg = argname + mp = monkeypatch() + mp.setattr(self, '_currentarg', argname) + try: + param = self._pyfuncitem.callspec.getparam(argname) + except (AttributeError, ValueError): + pass + else: + mp.setattr(self, 'param', param, raising=False) try: self._funcargs[argname] = res = funcargfactory(request=self) finally: - self._currentarg = oldarg + mp.undo() return res def _getscopeitem(self, scope): @@ -817,8 +954,7 @@ def raises(ExpectedException, *args, **kwargs): >>> raises(ZeroDivisionError, f, x=0) <ExceptionInfo ...> - A third possibility is to use a string which which will - be executed:: + A third possibility is to use a string to be executed:: >>> raises(ZeroDivisionError, "f(0)") <ExceptionInfo ...> diff --git a/_pytest/resultlog.py b/_pytest/resultlog.py index 7f879cce59..94ac67a7dc 100644 --- a/_pytest/resultlog.py +++ b/_pytest/resultlog.py @@ -63,6 +63,8 @@ class ResultLog(object): self.write_log_entry(testpath, lettercode, longrepr) def pytest_runtest_logreport(self, report): + if report.when != "call" and report.passed: + return res = self.config.hook.pytest_report_teststatus(report=report) code = res[1] if code == 'x': @@ -89,5 +91,8 @@ class ResultLog(object): self.log_outcome(report, code, longrepr) def pytest_internalerror(self, excrepr): - path = excrepr.reprcrash.path + reprcrash = getattr(excrepr, 'reprcrash', None) + path = getattr(reprcrash, "path", None) + if path is None: + path = "cwd:%s" % py.path.local() self.write_log_entry(path, '!', str(excrepr)) diff --git a/_pytest/runner.py b/_pytest/runner.py index c1b73e94c7..048ca1dd9b 100644 --- a/_pytest/runner.py +++ b/_pytest/runner.py @@ -1,6 +1,6 @@ """ basic collect and runtest protocol implementations """ -import py, sys +import py, sys, time from py._code.code import TerminalRepr def pytest_namespace(): @@ -14,33 +14,58 @@ def pytest_namespace(): # # pytest plugin hooks +def pytest_addoption(parser): + group = parser.getgroup("terminal reporting", "reporting", after="general") + group.addoption('--durations', + action="store", type="int", default=None, metavar="N", + help="show N slowest setup/test durations (N=0 for all)."), + +def pytest_terminal_summary(terminalreporter): + durations = terminalreporter.config.option.durations + if durations is None: + return + tr = terminalreporter + dlist = [] + for replist in tr.stats.values(): + for rep in replist: + if hasattr(rep, 'duration'): + dlist.append(rep) + if not dlist: + return + dlist.sort(key=lambda x: x.duration) + dlist.reverse() + if not durations: + tr.write_sep("=", "slowest test durations") + else: + tr.write_sep("=", "slowest %s test durations" % durations) + dlist = dlist[:durations] + + for rep in dlist: + nodeid = rep.nodeid.replace("::()::", "::") + tr.write_line("%02.2fs %-8s %s" % + (rep.duration, rep.when, nodeid)) + def pytest_sessionstart(session): session._setupstate = SetupState() -def pytest_sessionfinish(session, exitstatus): - hook = session.config.hook - rep = hook.pytest__teardown_final(session=session) - if rep: - hook.pytest__teardown_final_logerror(session=session, report=rep) - session.exitstatus = 1 - class NodeInfo: def __init__(self, location): self.location = location -def pytest_runtest_protocol(item): +def pytest_runtest_protocol(item, nextitem): item.ihook.pytest_runtest_logstart( nodeid=item.nodeid, location=item.location, ) - runtestprotocol(item) + runtestprotocol(item, nextitem=nextitem) return True -def runtestprotocol(item, log=True): +def runtestprotocol(item, log=True, nextitem=None): rep = call_and_report(item, "setup", log) reports = [rep] if rep.passed: reports.append(call_and_report(item, "call", log)) - reports.append(call_and_report(item, "teardown", log)) + reports.append(call_and_report(item, "teardown", log, + nextitem=nextitem)) return reports def pytest_runtest_setup(item): @@ -49,16 +74,8 @@ def pytest_runtest_setup(item): def pytest_runtest_call(item): item.runtest() -def pytest_runtest_teardown(item): - item.session._setupstate.teardown_exact(item) - -def pytest__teardown_final(session): - call = CallInfo(session._setupstate.teardown_all, when="teardown") - if call.excinfo: - ntraceback = call.excinfo.traceback .cut(excludepath=py._pydir) - call.excinfo.traceback = ntraceback.filter() - longrepr = call.excinfo.getrepr(funcargs=True) - return TeardownErrorReport(longrepr) +def pytest_runtest_teardown(item, nextitem): + item.session._setupstate.teardown_exact(item, nextitem) def pytest_report_teststatus(report): if report.when in ("setup", "teardown"): @@ -74,18 +91,18 @@ def pytest_report_teststatus(report): # # Implementation -def call_and_report(item, when, log=True): - call = call_runtest_hook(item, when) +def call_and_report(item, when, log=True, **kwds): + call = call_runtest_hook(item, when, **kwds) hook = item.ihook report = hook.pytest_runtest_makereport(item=item, call=call) - if log and (when == "call" or not report.passed): + if log: hook.pytest_runtest_logreport(report=report) return report -def call_runtest_hook(item, when): +def call_runtest_hook(item, when, **kwds): hookname = "pytest_runtest_" + when ihook = getattr(item.ihook, hookname) - return CallInfo(lambda: ihook(item=item), when=when) + return CallInfo(lambda: ihook(item=item, **kwds), when=when) class CallInfo: """ Result/Exception info a function invocation. """ @@ -95,12 +112,16 @@ class CallInfo: #: context of invocation: one of "setup", "call", #: "teardown", "memocollect" self.when = when + self.start = time.time() try: - self.result = func() - except KeyboardInterrupt: - raise - except: - self.excinfo = py.code.ExceptionInfo() + try: + self.result = func() + except KeyboardInterrupt: + raise + except: + self.excinfo = py.code.ExceptionInfo() + finally: + self.stop = time.time() def __repr__(self): if self.excinfo: @@ -120,6 +141,10 @@ def getslaveinfoline(node): return s class BaseReport(object): + + def __init__(self, **kw): + self.__dict__.update(kw) + def toterminal(self, out): longrepr = self.longrepr if hasattr(self, 'node'): @@ -139,6 +164,7 @@ class BaseReport(object): def pytest_runtest_makereport(item, call): when = call.when + duration = call.stop-call.start keywords = dict([(x,1) for x in item.keywords]) excinfo = call.excinfo if not call.excinfo: @@ -160,14 +186,15 @@ def pytest_runtest_makereport(item, call): else: # exception in setup or teardown longrepr = item._repr_failure_py(excinfo) return TestReport(item.nodeid, item.location, - keywords, outcome, longrepr, when) + keywords, outcome, longrepr, when, + duration=duration) class TestReport(BaseReport): """ Basic test report object (also used for setup and teardown calls if they fail). """ def __init__(self, nodeid, location, - keywords, outcome, longrepr, when): + keywords, outcome, longrepr, when, sections=(), duration=0, **extra): #: normalized collection node id self.nodeid = nodeid @@ -179,16 +206,25 @@ class TestReport(BaseReport): #: a name -> value dictionary containing all keywords and #: markers associated with a test invocation. self.keywords = keywords - + #: test outcome, always one of "passed", "failed", "skipped". self.outcome = outcome #: None or a failure representation. self.longrepr = longrepr - + #: one of 'setup', 'call', 'teardown' to indicate runtest phase. self.when = when + #: list of (secname, data) extra information which needs to + #: marshallable + self.sections = list(sections) + + #: time it took to run just the test + self.duration = duration + + self.__dict__.update(extra) + def __repr__(self): return "<TestReport %r when=%r outcome=%r>" % ( self.nodeid, self.when, self.outcome) @@ -196,8 +232,10 @@ class TestReport(BaseReport): class TeardownErrorReport(BaseReport): outcome = "failed" when = "teardown" - def __init__(self, longrepr): + def __init__(self, longrepr, **extra): self.longrepr = longrepr + self.sections = [] + self.__dict__.update(extra) def pytest_make_collect_report(collector): call = CallInfo(collector._memocollect, "memocollect") @@ -219,11 +257,13 @@ def pytest_make_collect_report(collector): getattr(call, 'result', None)) class CollectReport(BaseReport): - def __init__(self, nodeid, outcome, longrepr, result): + def __init__(self, nodeid, outcome, longrepr, result, sections=(), **extra): self.nodeid = nodeid self.outcome = outcome self.longrepr = longrepr self.result = result or [] + self.sections = list(sections) + self.__dict__.update(extra) @property def location(self): @@ -277,20 +317,22 @@ class SetupState(object): self._teardown_with_finalization(None) assert not self._finalizers - def teardown_exact(self, item): - if self.stack and item == self.stack[-1]: + def teardown_exact(self, item, nextitem): + needed_collectors = nextitem and nextitem.listchain() or [] + self._teardown_towards(needed_collectors) + + def _teardown_towards(self, needed_collectors): + while self.stack: + if self.stack == needed_collectors[:len(self.stack)]: + break self._pop_and_teardown() - else: - self._callfinalizers(item) def prepare(self, colitem): """ setup objects along the collector chain to the test-method and teardown previously setup objects.""" needed_collectors = colitem.listchain() - while self.stack: - if self.stack == needed_collectors[:len(self.stack)]: - break - self._pop_and_teardown() + self._teardown_towards(needed_collectors) + # check if the last collection node has raised an error for col in self.stack: if hasattr(col, '_prepare_exc'): diff --git a/_pytest/skipping.py b/_pytest/skipping.py index fe69f295e7..6098c81a4b 100644 --- a/_pytest/skipping.py +++ b/_pytest/skipping.py @@ -9,6 +9,21 @@ def pytest_addoption(parser): action="store_true", dest="runxfail", default=False, help="run tests even if they are marked xfail") +def pytest_configure(config): + config.addinivalue_line("markers", + "skipif(*conditions): skip the given test function if evaluation " + "of all conditions has a True value. Evaluation happens within the " + "module global context. Example: skipif('sys.platform == \"win32\"') " + "skips the test if we are on the win32 platform. " + ) + config.addinivalue_line("markers", + "xfail(*conditions, reason=None, run=True): mark the the test function " + "as an expected failure. Optionally specify a reason and run=False " + "if you don't even want to execute the test function. Any positional " + "condition strings will be evaluated (like with skipif) and if one is " + "False the marker will not be applied." + ) + def pytest_namespace(): return dict(xfail=xfail) @@ -169,21 +184,23 @@ def pytest_terminal_summary(terminalreporter): elif char == "X": show_xpassed(terminalreporter, lines) elif char in "fF": - show_failed(terminalreporter, lines) + show_simple(terminalreporter, lines, 'failed', "FAIL %s") elif char in "sS": show_skipped(terminalreporter, lines) + elif char == "E": + show_simple(terminalreporter, lines, 'error', "ERROR %s") if lines: tr._tw.sep("=", "short test summary info") for line in lines: tr._tw.line(line) -def show_failed(terminalreporter, lines): +def show_simple(terminalreporter, lines, stat, format): tw = terminalreporter._tw - failed = terminalreporter.stats.get("failed") + failed = terminalreporter.stats.get(stat) if failed: for rep in failed: pos = rep.nodeid - lines.append("FAIL %s" %(pos, )) + lines.append(format %(pos, )) def show_xfailed(terminalreporter, lines): xfailed = terminalreporter.stats.get("xfailed") diff --git a/_pytest/terminal.py b/_pytest/terminal.py index c96e635199..5245f339bb 100644 --- a/_pytest/terminal.py +++ b/_pytest/terminal.py @@ -15,7 +15,7 @@ def pytest_addoption(parser): group._addoption('-r', action="store", dest="reportchars", default=None, metavar="chars", help="show extra test summary info as specified by chars (f)ailed, " - "(s)skipped, (x)failed, (X)passed.") + "(E)error, (s)skipped, (x)failed, (X)passed.") group._addoption('-l', '--showlocals', action="store_true", dest="showlocals", default=False, help="show locals in tracebacks (disabled by default).") @@ -43,7 +43,8 @@ def pytest_configure(config): pass else: stdout = os.fdopen(newfd, stdout.mode, 1) - config._toclose = stdout + config._cleanup.append(lambda: stdout.close()) + reporter = TerminalReporter(config, stdout) config.pluginmanager.register(reporter, 'terminalreporter') if config.option.debug or config.option.traceconfig: @@ -52,11 +53,6 @@ def pytest_configure(config): reporter.write_line("[traceconfig] " + msg) config.trace.root.setprocessor("pytest:config", mywriter) -def pytest_unconfigure(config): - if hasattr(config, '_toclose'): - #print "closing", config._toclose, config._toclose.fileno() - config._toclose.close() - def getreportopt(config): reportopts = "" optvalue = config.option.report @@ -165,9 +161,6 @@ class TerminalReporter: def pytest_deselected(self, items): self.stats.setdefault('deselected', []).extend(items) - def pytest__teardown_final_logerror(self, report): - self.stats.setdefault("error", []).append(report) - def pytest_runtest_logstart(self, nodeid, location): # ensure that the path is printed before the # 1st test of a module starts running @@ -259,7 +252,7 @@ class TerminalReporter: msg = "platform %s -- Python %s" % (sys.platform, verinfo) if hasattr(sys, 'pypy_version_info'): verinfo = ".".join(map(str, sys.pypy_version_info[:3])) - msg += "[pypy-%s]" % verinfo + msg += "[pypy-%s-%s]" % (verinfo, sys.pypy_version_info[3]) msg += " -- pytest-%s" % (py.test.__version__) if self.verbosity > 0 or self.config.option.debug or \ getattr(self.config.option, 'pastebin', None): @@ -318,12 +311,17 @@ class TerminalReporter: self.config.hook.pytest_terminal_summary(terminalreporter=self) if exitstatus == 2: self._report_keyboardinterrupt() + del self._keyboardinterrupt_memo self.summary_deselected() self.summary_stats() def pytest_keyboard_interrupt(self, excinfo): self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True) + def pytest_unconfigure(self): + if hasattr(self, '_keyboardinterrupt_memo'): + self._report_keyboardinterrupt() + def _report_keyboardinterrupt(self): excrepr = self._keyboardinterrupt_memo msg = excrepr.reprcrash.message @@ -388,7 +386,7 @@ class TerminalReporter: else: msg = self._getfailureheadline(rep) self.write_sep("_", msg) - rep.toterminal(self._tw) + self._outrep_summary(rep) def summary_errors(self): if self.config.option.tbstyle != "no": @@ -406,7 +404,15 @@ class TerminalReporter: elif rep.when == "teardown": msg = "ERROR at teardown of " + msg self.write_sep("_", msg) - rep.toterminal(self._tw) + self._outrep_summary(rep) + + def _outrep_summary(self, rep): + rep.toterminal(self._tw) + for secname, content in rep.sections: + self._tw.sep("-", secname) + if content[-1:] == "\n": + content = content[:-1] + self._tw.line(content) def summary_stats(self): session_duration = py.std.time.time() - self._sessionstarttime @@ -417,9 +423,10 @@ class TerminalReporter: keys.append(key) parts = [] for key in keys: - val = self.stats.get(key, None) - if val: - parts.append("%d %s" %(len(val), key)) + if key: # setup/teardown reports have an empty key, ignore them + val = self.stats.get(key, None) + if val: + parts.append("%d %s" %(len(val), key)) line = ", ".join(parts) # XXX coloring msg = "%s in %.2f seconds" %(line, session_duration) @@ -430,8 +437,15 @@ class TerminalReporter: def summary_deselected(self): if 'deselected' in self.stats: + l = [] + k = self.config.option.keyword + if k: + l.append("-k%s" % k) + m = self.config.option.markexpr + if m: + l.append("-m %r" % m) self.write_sep("=", "%d tests deselected by %r" %( - len(self.stats['deselected']), self.config.option.keyword), bold=True) + len(self.stats['deselected']), " ".join(l)), bold=True) def repr_pythonversion(v=None): if v is None: diff --git a/_pytest/tmpdir.py b/_pytest/tmpdir.py index e46c713eb0..c67f4389eb 100644 --- a/_pytest/tmpdir.py +++ b/_pytest/tmpdir.py @@ -46,7 +46,7 @@ class TempdirHandler: def finish(self): self.trace("finish") - + def pytest_configure(config): mp = monkeypatch() t = TempdirHandler(config) @@ -64,5 +64,5 @@ def pytest_funcarg__tmpdir(request): name = request._pyfuncitem.name name = py.std.re.sub("[\W]", "_", name) x = request.config._tmpdirhandler.mktemp(name, numbered=True) - return x.realpath() + return x diff --git a/_pytest/unittest.py b/_pytest/unittest.py index 3bcf91e901..024c2998d8 100644 --- a/_pytest/unittest.py +++ b/_pytest/unittest.py @@ -2,6 +2,9 @@ import pytest, py import sys, pdb +# for transfering markers +from _pytest.python import transfer_markers + def pytest_pycollect_makeitem(collector, name, obj): unittest = sys.modules.get('unittest') if unittest is None: @@ -19,7 +22,14 @@ def pytest_pycollect_makeitem(collector, name, obj): class UnitTestCase(pytest.Class): def collect(self): loader = py.std.unittest.TestLoader() + module = self.getparent(pytest.Module).obj + cls = self.obj for name in loader.getTestCaseNames(self.obj): + x = getattr(self.obj, name) + funcobj = getattr(x, 'im_func', x) + transfer_markers(funcobj, cls, module) + if hasattr(funcobj, 'todo'): + pytest.mark.xfail(reason=str(funcobj.todo))(funcobj) yield TestCaseFunction(name, parent=self) def setup(self): @@ -37,15 +47,13 @@ class UnitTestCase(pytest.Class): class TestCaseFunction(pytest.Function): _excinfo = None - def __init__(self, name, parent): - super(TestCaseFunction, self).__init__(name, parent) - if hasattr(self._obj, 'todo'): - getattr(self._obj, 'im_func', self._obj).xfail = \ - pytest.mark.xfail(reason=str(self._obj.todo)) - def setup(self): self._testcase = self.parent.obj(self.name) self._obj = getattr(self._testcase, self.name) + if hasattr(self._testcase, 'skip'): + pytest.skip(self._testcase.skip) + if hasattr(self._obj, 'skip'): + pytest.skip(self._obj.skip) if hasattr(self._testcase, 'setup_method'): self._testcase.setup_method(self._obj) @@ -104,7 +112,10 @@ class TestCaseFunction(pytest.Function): def _prunetraceback(self, excinfo): pytest.Function._prunetraceback(self, excinfo) - excinfo.traceback = excinfo.traceback.filter(lambda x:not x.frame.f_globals.get('__unittest')) + traceback = excinfo.traceback.filter( + lambda x:not x.frame.f_globals.get('__unittest')) + if traceback: + excinfo.traceback = traceback @pytest.mark.tryfirst def pytest_runtest_makereport(item, call): @@ -120,14 +131,19 @@ def pytest_runtest_protocol(item, __multicall__): ut = sys.modules['twisted.python.failure'] Failure__init__ = ut.Failure.__init__.im_func check_testcase_implements_trial_reporter() - def excstore(self, exc_value=None, exc_type=None, exc_tb=None): + def excstore(self, exc_value=None, exc_type=None, exc_tb=None, + captureVars=None): if exc_value is None: self._rawexcinfo = sys.exc_info() else: if exc_type is None: exc_type = type(exc_value) self._rawexcinfo = (exc_type, exc_value, exc_tb) - Failure__init__(self, exc_value, exc_type, exc_tb) + try: + Failure__init__(self, exc_value, exc_type, exc_tb, + captureVars=captureVars) + except TypeError: + Failure__init__(self, exc_value, exc_type, exc_tb) ut.Failure.__init__ = excstore try: return __multicall__.execute() |