1382 lines
52 KiB
Python
1382 lines
52 KiB
Python
|
# Copyright 2002-2005 Vladimir Prus.
|
||
|
# Copyright 2002-2003 Dave Abrahams.
|
||
|
# Copyright 2006 Rene Ferdinand Rivera Morell.
|
||
|
# Distributed under the Boost Software License, Version 1.0.
|
||
|
# (See accompanying file LICENSE.txt or copy at
|
||
|
# https://www.bfgroup.xyz/b2/LICENSE.txt)
|
||
|
|
||
|
from __future__ import print_function
|
||
|
|
||
|
import TestCmd
|
||
|
|
||
|
import copy
|
||
|
import fnmatch
|
||
|
import glob
|
||
|
import math
|
||
|
import os
|
||
|
import os.path
|
||
|
import re
|
||
|
import shutil
|
||
|
try:
|
||
|
from StringIO import StringIO
|
||
|
except:
|
||
|
from io import StringIO
|
||
|
import subprocess
|
||
|
import sys
|
||
|
import tempfile
|
||
|
import time
|
||
|
import traceback
|
||
|
import tree
|
||
|
import types
|
||
|
|
||
|
from xml.sax.saxutils import escape
|
||
|
|
||
|
try:
|
||
|
from functools import reduce
|
||
|
except:
|
||
|
pass
|
||
|
|
||
|
|
||
|
def isstr(data):
|
||
|
return isinstance(data, (type(''), type(u'')))
|
||
|
|
||
|
|
||
|
class TestEnvironmentError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
annotations = []
|
||
|
|
||
|
|
||
|
def print_annotation(name, value, xml):
|
||
|
"""Writes some named bits of information about the current test run."""
|
||
|
if xml:
|
||
|
print(escape(name) + " {{{")
|
||
|
print(escape(value))
|
||
|
print("}}}")
|
||
|
else:
|
||
|
print(name + " {{{")
|
||
|
print(value)
|
||
|
print("}}}")
|
||
|
|
||
|
|
||
|
def flush_annotations(xml=0):
|
||
|
global annotations
|
||
|
for ann in annotations:
|
||
|
print_annotation(ann[0], ann[1], xml)
|
||
|
annotations = []
|
||
|
|
||
|
|
||
|
def clear_annotations():
|
||
|
global annotations
|
||
|
annotations = []
|
||
|
|
||
|
|
||
|
defer_annotations = 0
|
||
|
|
||
|
def set_defer_annotations(n):
|
||
|
global defer_annotations
|
||
|
defer_annotations = n
|
||
|
|
||
|
|
||
|
def annotate_stack_trace(tb=None):
|
||
|
if tb:
|
||
|
trace = TestCmd.caller(traceback.extract_tb(tb), 0)
|
||
|
else:
|
||
|
trace = TestCmd.caller(traceback.extract_stack(), 1)
|
||
|
annotation("stacktrace", trace)
|
||
|
|
||
|
|
||
|
def annotation(name, value):
|
||
|
"""Records an annotation about the test run."""
|
||
|
annotations.append((name, value))
|
||
|
if not defer_annotations:
|
||
|
flush_annotations()
|
||
|
|
||
|
|
||
|
def get_toolset():
|
||
|
toolset = None
|
||
|
for arg in sys.argv[1:]:
|
||
|
if not arg.startswith("-"):
|
||
|
toolset = arg
|
||
|
return toolset or "gcc"
|
||
|
|
||
|
|
||
|
# Detect the host OS.
|
||
|
cygwin = hasattr(os, "uname") and os.uname()[0].lower().startswith("cygwin")
|
||
|
windows = cygwin or os.environ.get("OS", "").lower().startswith("windows")
|
||
|
|
||
|
if cygwin:
|
||
|
default_os = "cygwin"
|
||
|
elif windows:
|
||
|
default_os = "windows"
|
||
|
elif hasattr(os, "uname"):
|
||
|
default_os = os.uname()[0].lower()
|
||
|
|
||
|
|
||
|
def expand_toolset(toolset, target_os=default_os):
|
||
|
match = re.match(r'^(clang|intel)(-[\d\.]+|)$', toolset)
|
||
|
if match:
|
||
|
if match.group(1) == "intel" and target_os == "windows":
|
||
|
return match.expand(r'\1-win\2')
|
||
|
elif target_os == "darwin":
|
||
|
return match.expand(r'\1-darwin\2')
|
||
|
else:
|
||
|
return match.expand(r'\1-linux\2')
|
||
|
|
||
|
return toolset
|
||
|
|
||
|
|
||
|
def prepare_prefixes_and_suffixes(toolset, target_os=default_os):
|
||
|
ind = toolset.find('-')
|
||
|
if ind == -1:
|
||
|
rtoolset = toolset
|
||
|
else:
|
||
|
rtoolset = toolset[:ind]
|
||
|
prepare_suffix_map(rtoolset, target_os)
|
||
|
prepare_library_prefix(rtoolset, target_os)
|
||
|
|
||
|
|
||
|
def prepare_suffix_map(toolset, target_os=default_os):
|
||
|
"""
|
||
|
Set up suffix translation performed by the Boost Build testing framework
|
||
|
to accommodate different toolsets generating targets of the same type using
|
||
|
different filename extensions (suffixes).
|
||
|
|
||
|
"""
|
||
|
global suffixes
|
||
|
suffixes = {}
|
||
|
if target_os == "cygwin":
|
||
|
suffixes[".lib"] = ".a"
|
||
|
suffixes[".obj"] = ".o"
|
||
|
suffixes[".implib"] = ".lib.a"
|
||
|
elif target_os == "windows":
|
||
|
if toolset == "gcc":
|
||
|
# MinGW
|
||
|
suffixes[".lib"] = ".a"
|
||
|
suffixes[".obj"] = ".o"
|
||
|
suffixes[".implib"] = ".dll.a"
|
||
|
else:
|
||
|
# Everything else Windows
|
||
|
suffixes[".implib"] = ".lib"
|
||
|
else:
|
||
|
suffixes[".exe"] = ""
|
||
|
suffixes[".dll"] = ".so"
|
||
|
suffixes[".lib"] = ".a"
|
||
|
suffixes[".obj"] = ".o"
|
||
|
suffixes[".implib"] = ".no_implib_files_on_this_platform"
|
||
|
|
||
|
if target_os == "darwin":
|
||
|
suffixes[".dll"] = ".dylib"
|
||
|
|
||
|
|
||
|
def prepare_library_prefix(toolset, target_os=default_os):
|
||
|
"""
|
||
|
Setup whether Boost Build is expected to automatically prepend prefixes
|
||
|
to its built library targets.
|
||
|
|
||
|
"""
|
||
|
global lib_prefix
|
||
|
lib_prefix = "lib"
|
||
|
|
||
|
global dll_prefix
|
||
|
if target_os == "cygwin":
|
||
|
dll_prefix = "cyg"
|
||
|
elif target_os == "windows" and toolset != "gcc":
|
||
|
dll_prefix = None
|
||
|
else:
|
||
|
dll_prefix = "lib"
|
||
|
|
||
|
|
||
|
def re_remove(sequence, regex):
|
||
|
me = re.compile(regex)
|
||
|
result = list(filter(lambda x: me.match(x), sequence))
|
||
|
if not result:
|
||
|
raise ValueError()
|
||
|
for r in result:
|
||
|
sequence.remove(r)
|
||
|
|
||
|
|
||
|
def glob_remove(sequence, pattern):
|
||
|
result = list(fnmatch.filter(sequence, pattern))
|
||
|
if not result:
|
||
|
raise ValueError()
|
||
|
for r in result:
|
||
|
sequence.remove(r)
|
||
|
|
||
|
|
||
|
class Tester(TestCmd.TestCmd):
|
||
|
"""Main tester class for Boost Build.
|
||
|
|
||
|
Optional arguments:
|
||
|
|
||
|
`arguments` - Arguments passed to the run executable.
|
||
|
`executable` - Name of the executable to invoke.
|
||
|
`match` - Function to use for compating actual and
|
||
|
expected file contents.
|
||
|
`boost_build_path` - Boost build path to be passed to the run
|
||
|
executable.
|
||
|
`translate_suffixes` - Whether to update suffixes on the the file
|
||
|
names passed from the test script so they
|
||
|
match those actually created by the current
|
||
|
toolset. For example, static library files
|
||
|
are specified by using the .lib suffix but
|
||
|
when the "gcc" toolset is used it actually
|
||
|
creates them using the .a suffix.
|
||
|
`pass_toolset` - Whether the test system should pass the
|
||
|
specified toolset to the run executable.
|
||
|
`use_test_config` - Whether the test system should tell the run
|
||
|
executable to read in the test_config.jam
|
||
|
configuration file.
|
||
|
`ignore_toolset_requirements` - Whether the test system should tell the run
|
||
|
executable to ignore toolset requirements.
|
||
|
`workdir` - Absolute directory where the test will be
|
||
|
run from.
|
||
|
`pass_d0` - If set, when tests are not explicitly run
|
||
|
in verbose mode, they are run as silent
|
||
|
(-d0 & --quiet Boost Jam options).
|
||
|
|
||
|
Optional arguments inherited from the base class:
|
||
|
|
||
|
`description` - Test description string displayed in case
|
||
|
of a failed test.
|
||
|
`subdir` - List of subdirectories to automatically
|
||
|
create under the working directory. Each
|
||
|
subdirectory needs to be specified
|
||
|
separately, parent coming before its child.
|
||
|
`verbose` - Flag that may be used to enable more
|
||
|
verbose test system output. Note that it
|
||
|
does not also enable more verbose build
|
||
|
system output like the --verbose command
|
||
|
line option does.
|
||
|
"""
|
||
|
def __init__(self, arguments=None, executable=None,
|
||
|
match=TestCmd.match_exact, boost_build_path=None,
|
||
|
translate_suffixes=True, pass_toolset=True, use_test_config=True,
|
||
|
ignore_toolset_requirements=False, workdir="", pass_d0=False,
|
||
|
**keywords):
|
||
|
|
||
|
if not executable:
|
||
|
executable = os.getenv('B2')
|
||
|
if not executable:
|
||
|
executable = 'b2'
|
||
|
|
||
|
assert arguments.__class__ is not str
|
||
|
self.original_workdir = os.path.dirname(__file__)
|
||
|
if workdir and not os.path.isabs(workdir):
|
||
|
raise ("Parameter workdir <%s> must point to an absolute "
|
||
|
"directory: " % workdir)
|
||
|
|
||
|
self.last_build_timestamp = 0
|
||
|
self.translate_suffixes = translate_suffixes
|
||
|
self.use_test_config = use_test_config
|
||
|
|
||
|
self.toolset = get_toolset()
|
||
|
self.expanded_toolset = expand_toolset(self.toolset)
|
||
|
self.pass_toolset = pass_toolset
|
||
|
self.ignore_toolset_requirements = ignore_toolset_requirements
|
||
|
|
||
|
prepare_prefixes_and_suffixes(pass_toolset and self.toolset or "gcc")
|
||
|
|
||
|
use_default_bjam = "--default-bjam" in sys.argv
|
||
|
|
||
|
if not use_default_bjam:
|
||
|
jam_build_dir = ""
|
||
|
|
||
|
# Find where jam_src is located. Try for the debug version if it is
|
||
|
# lying around.
|
||
|
srcdir = os.path.join(os.path.dirname(__file__), "..", "src")
|
||
|
dirs = [os.path.join(srcdir, "engine", jam_build_dir + ".debug"),
|
||
|
os.path.join(srcdir, "engine", jam_build_dir)]
|
||
|
for d in dirs:
|
||
|
if os.path.exists(d):
|
||
|
jam_build_dir = d
|
||
|
break
|
||
|
else:
|
||
|
print("Cannot find built Boost.Jam")
|
||
|
sys.exit(1)
|
||
|
|
||
|
verbosity = ["-d0", "--quiet"]
|
||
|
if not pass_d0:
|
||
|
verbosity = []
|
||
|
if "--verbose" in sys.argv:
|
||
|
keywords["verbose"] = True
|
||
|
verbosity = ["-d2"]
|
||
|
self.verbosity = verbosity
|
||
|
|
||
|
if boost_build_path is None:
|
||
|
boost_build_path = self.original_workdir + "/.."
|
||
|
|
||
|
program_list = []
|
||
|
if use_default_bjam:
|
||
|
program_list.append(executable)
|
||
|
else:
|
||
|
program_list.append(os.path.join(jam_build_dir, executable))
|
||
|
program_list.append('-sBOOST_BUILD_PATH="' + boost_build_path + '"')
|
||
|
if arguments:
|
||
|
program_list += arguments
|
||
|
|
||
|
TestCmd.TestCmd.__init__(self, program=program_list, match=match,
|
||
|
workdir=workdir, inpath=use_default_bjam, **keywords)
|
||
|
|
||
|
os.chdir(self.workdir)
|
||
|
|
||
|
def cleanup(self):
|
||
|
try:
|
||
|
TestCmd.TestCmd.cleanup(self)
|
||
|
os.chdir(self.original_workdir)
|
||
|
except AttributeError:
|
||
|
# When this is called during TestCmd.TestCmd.__del__ we can have
|
||
|
# both 'TestCmd' and 'os' unavailable in our scope. Do nothing in
|
||
|
# this case.
|
||
|
pass
|
||
|
|
||
|
def set_toolset(self, toolset, target_os=default_os):
|
||
|
self.toolset = toolset
|
||
|
self.expanded_toolset = expand_toolset(toolset, target_os)
|
||
|
self.pass_toolset = True
|
||
|
prepare_prefixes_and_suffixes(toolset, target_os)
|
||
|
|
||
|
|
||
|
#
|
||
|
# Methods that change the working directory's content.
|
||
|
#
|
||
|
def set_tree(self, tree_location):
|
||
|
# It is not possible to remove the current directory.
|
||
|
d = os.getcwd()
|
||
|
os.chdir(os.path.dirname(self.workdir))
|
||
|
shutil.rmtree(self.workdir, ignore_errors=False)
|
||
|
|
||
|
if not os.path.isabs(tree_location):
|
||
|
tree_location = os.path.join(self.original_workdir, tree_location)
|
||
|
shutil.copytree(tree_location, self.workdir)
|
||
|
|
||
|
os.chdir(d)
|
||
|
def make_writable(unused, dir, entries):
|
||
|
for e in entries:
|
||
|
name = os.path.join(dir, e)
|
||
|
os.chmod(name, os.stat(name).st_mode | 0o222)
|
||
|
for root, _, files in os.walk("."):
|
||
|
make_writable(None, root, files)
|
||
|
|
||
|
def write(self, file, content, wait=True):
|
||
|
nfile = self.native_file_name(file)
|
||
|
self.__makedirs(os.path.dirname(nfile), wait)
|
||
|
if not type(content) == bytes:
|
||
|
content = content.encode()
|
||
|
f = open(nfile, "wb")
|
||
|
try:
|
||
|
f.write(content)
|
||
|
finally:
|
||
|
f.close()
|
||
|
self.__ensure_newer_than_last_build(nfile)
|
||
|
|
||
|
def rename(self, src, dst):
|
||
|
src_name = self.native_file_name(src)
|
||
|
dst_name = self.native_file_name(dst)
|
||
|
os.rename(src_name, dst_name)
|
||
|
|
||
|
def copy(self, src, dst):
|
||
|
try:
|
||
|
self.write(dst, self.read(src, binary=True))
|
||
|
except:
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def copy_timestamp(self, src, dst):
|
||
|
src_name = self.native_file_name(src)
|
||
|
dst_name = self.native_file_name(dst)
|
||
|
shutil.copystat(src_name, dst_name)
|
||
|
|
||
|
def copy_preserving_timestamp(self, src, dst):
|
||
|
src_name = self.native_file_name(src)
|
||
|
dst_name = self.native_file_name(dst)
|
||
|
shutil.copy2(src_name, dst_name)
|
||
|
|
||
|
def touch(self, names, wait=True):
|
||
|
if isstr(names):
|
||
|
names = [names]
|
||
|
for name in names:
|
||
|
path = self.native_file_name(name)
|
||
|
if wait:
|
||
|
self.__ensure_newer_than_last_build(path)
|
||
|
else:
|
||
|
os.utime(path, None)
|
||
|
|
||
|
def rm(self, names):
|
||
|
if not type(names) == list:
|
||
|
names = [names]
|
||
|
|
||
|
if names == ["."]:
|
||
|
# If we are deleting the entire workspace, there is no need to wait
|
||
|
# for a clock tick.
|
||
|
self.last_build_timestamp = 0
|
||
|
|
||
|
# Avoid attempts to remove the current directory.
|
||
|
os.chdir(self.original_workdir)
|
||
|
for name in names:
|
||
|
n = glob.glob(self.native_file_name(name))
|
||
|
if n: n = n[0]
|
||
|
if not n:
|
||
|
n = self.glob_file(name.replace("$toolset", self.expanded_toolset + "*")
|
||
|
)
|
||
|
if n:
|
||
|
if os.path.isdir(n):
|
||
|
shutil.rmtree(n, ignore_errors=False)
|
||
|
else:
|
||
|
os.unlink(n)
|
||
|
|
||
|
# Create working dir root again in case we removed it.
|
||
|
if not os.path.exists(self.workdir):
|
||
|
os.mkdir(self.workdir)
|
||
|
os.chdir(self.workdir)
|
||
|
|
||
|
def expand_toolset(self, name):
|
||
|
"""
|
||
|
Expands $toolset placeholder in the given file to the name of the
|
||
|
toolset currently being tested.
|
||
|
|
||
|
"""
|
||
|
self.write(name, self.read(name).replace("$toolset", self.expanded_toolset))
|
||
|
|
||
|
def dump_stdio(self):
|
||
|
annotation("STDOUT", self.stdout())
|
||
|
annotation("STDERR", self.stderr())
|
||
|
|
||
|
def run_build_system(self, extra_args=None, subdir="", stdout=None,
|
||
|
stderr="", status=0, match=None, pass_toolset=None,
|
||
|
use_test_config=None, ignore_toolset_requirements=None,
|
||
|
expected_duration=None, **kw):
|
||
|
|
||
|
assert extra_args.__class__ is not str
|
||
|
|
||
|
if os.path.isabs(subdir):
|
||
|
raise ValueError(
|
||
|
"You must pass a relative directory to subdir <%s>." % subdir)
|
||
|
|
||
|
self.previous_tree, dummy = tree.build_tree(self.workdir)
|
||
|
self.wait_for_time_change_since_last_build()
|
||
|
|
||
|
if match is None:
|
||
|
match = self.match
|
||
|
|
||
|
if pass_toolset is None:
|
||
|
pass_toolset = self.pass_toolset
|
||
|
|
||
|
if use_test_config is None:
|
||
|
use_test_config = self.use_test_config
|
||
|
|
||
|
if ignore_toolset_requirements is None:
|
||
|
ignore_toolset_requirements = self.ignore_toolset_requirements
|
||
|
|
||
|
try:
|
||
|
kw["program"] = []
|
||
|
kw["program"] += self.program
|
||
|
if extra_args:
|
||
|
kw["program"] += extra_args
|
||
|
if not extra_args or not any(a.startswith("-j") for a in extra_args):
|
||
|
kw["program"] += ["-j1"]
|
||
|
if stdout is None and not any(a.startswith("-d") for a in kw["program"]):
|
||
|
kw["program"] += self.verbosity
|
||
|
if pass_toolset:
|
||
|
kw["program"].append("toolset=" + self.toolset)
|
||
|
if use_test_config:
|
||
|
kw["program"].append('--test-config="%s"' % os.path.join(
|
||
|
self.original_workdir, "test-config.jam"))
|
||
|
if ignore_toolset_requirements:
|
||
|
kw["program"].append("--ignore-toolset-requirements")
|
||
|
if "--python" in sys.argv:
|
||
|
# -z disables Python optimization mode.
|
||
|
# this enables type checking (all assert
|
||
|
# and if __debug__ statements).
|
||
|
kw["program"].extend(["--python", "-z"])
|
||
|
if "--stacktrace" in sys.argv:
|
||
|
kw["program"].append("--stacktrace")
|
||
|
kw["chdir"] = subdir
|
||
|
self.last_program_invocation = kw["program"]
|
||
|
build_time_start = time.time()
|
||
|
TestCmd.TestCmd.run(self, **kw)
|
||
|
build_time_finish = time.time()
|
||
|
except:
|
||
|
self.dump_stdio()
|
||
|
raise
|
||
|
|
||
|
old_last_build_timestamp = self.last_build_timestamp
|
||
|
self.tree, self.last_build_timestamp = tree.build_tree(self.workdir)
|
||
|
self.difference = tree.tree_difference(self.previous_tree, self.tree)
|
||
|
if self.difference.empty():
|
||
|
# If nothing has been changed by this build and sufficient time has
|
||
|
# passed since the last build that actually changed something,
|
||
|
# there is no need to wait for touched or newly created files to
|
||
|
# start getting newer timestamps than the currently existing ones.
|
||
|
self.last_build_timestamp = old_last_build_timestamp
|
||
|
|
||
|
self.difference.ignore_directories()
|
||
|
self.unexpected_difference = copy.deepcopy(self.difference)
|
||
|
|
||
|
if (status and self.status) is not None and self.status != status:
|
||
|
expect = ""
|
||
|
if status != 0:
|
||
|
expect = " (expected %d)" % status
|
||
|
|
||
|
annotation("failure", '"%s" returned %d%s' % (kw["program"],
|
||
|
self.status, expect))
|
||
|
|
||
|
annotation("reason", "unexpected status returned by bjam")
|
||
|
self.fail_test(1)
|
||
|
|
||
|
if stdout is not None and not match(self.stdout(), stdout):
|
||
|
stdout_test = match(self.stdout(), stdout)
|
||
|
annotation("failure", "Unexpected stdout")
|
||
|
annotation("Expected STDOUT", stdout)
|
||
|
annotation("Actual STDOUT", self.stdout())
|
||
|
stderr = self.stderr()
|
||
|
if stderr:
|
||
|
annotation("STDERR", stderr)
|
||
|
self.maybe_do_diff(self.stdout(), stdout, stdout_test)
|
||
|
self.fail_test(1, dump_stdio=False)
|
||
|
|
||
|
# Intel tends to produce some messages to stderr which make tests fail.
|
||
|
intel_workaround = re.compile("^xi(link|lib): executing.*\n", re.M)
|
||
|
actual_stderr = re.sub(intel_workaround, "", self.stderr())
|
||
|
|
||
|
if stderr is not None and not match(actual_stderr, stderr):
|
||
|
stderr_test = match(actual_stderr, stderr)
|
||
|
annotation("failure", "Unexpected stderr")
|
||
|
annotation("Expected STDERR", stderr)
|
||
|
annotation("Actual STDERR", self.stderr())
|
||
|
annotation("STDOUT", self.stdout())
|
||
|
self.maybe_do_diff(actual_stderr, stderr, stderr_test)
|
||
|
self.fail_test(1, dump_stdio=False)
|
||
|
|
||
|
if expected_duration is not None:
|
||
|
actual_duration = build_time_finish - build_time_start
|
||
|
if actual_duration > expected_duration:
|
||
|
print("Test run lasted %f seconds while it was expected to "
|
||
|
"finish in under %f seconds." % (actual_duration,
|
||
|
expected_duration))
|
||
|
self.fail_test(1, dump_stdio=False)
|
||
|
|
||
|
self.__ignore_junk()
|
||
|
|
||
|
def glob_file(self, name):
|
||
|
name = self.adjust_name(name)
|
||
|
result = None
|
||
|
if hasattr(self, "difference"):
|
||
|
for f in (self.difference.added_files +
|
||
|
self.difference.modified_files +
|
||
|
self.difference.touched_files):
|
||
|
if fnmatch.fnmatch(f, name):
|
||
|
result = self.__native_file_name(f)
|
||
|
break
|
||
|
if not result:
|
||
|
result = glob.glob(self.__native_file_name(name))
|
||
|
if result:
|
||
|
result = result[0]
|
||
|
return result
|
||
|
|
||
|
def __read(self, name, binary=False):
|
||
|
try:
|
||
|
openMode = "r"
|
||
|
if binary:
|
||
|
openMode += "b"
|
||
|
else:
|
||
|
openMode += "U"
|
||
|
f = open(name, openMode)
|
||
|
result = f.read()
|
||
|
f.close()
|
||
|
return result
|
||
|
except:
|
||
|
annotation("failure", "Could not open '%s'" % name)
|
||
|
self.fail_test(1)
|
||
|
return ""
|
||
|
|
||
|
def read(self, name, binary=False):
|
||
|
name = self.glob_file(name)
|
||
|
return self.__read(name, binary=binary)
|
||
|
|
||
|
def read_and_strip(self, name):
|
||
|
if not self.glob_file(name):
|
||
|
return ""
|
||
|
f = open(self.glob_file(name), "rb")
|
||
|
lines = f.readlines()
|
||
|
f.close()
|
||
|
result = "\n".join(x.decode().rstrip() for x in lines)
|
||
|
if lines and lines[-1][-1] != "\n":
|
||
|
return result + "\n"
|
||
|
return result
|
||
|
|
||
|
def fail_test(self, condition, dump_difference=True, dump_stdio=True,
|
||
|
dump_stack=True):
|
||
|
if not condition:
|
||
|
return
|
||
|
|
||
|
if dump_difference and hasattr(self, "difference"):
|
||
|
f = StringIO()
|
||
|
self.difference.pprint(f)
|
||
|
annotation("changes caused by the last build command",
|
||
|
f.getvalue())
|
||
|
|
||
|
if dump_stdio:
|
||
|
self.dump_stdio()
|
||
|
|
||
|
if "--preserve" in sys.argv:
|
||
|
print()
|
||
|
print("*** Copying the state of working dir into 'failed_test' ***")
|
||
|
print()
|
||
|
path = os.path.join(self.original_workdir, "failed_test")
|
||
|
if os.path.isdir(path):
|
||
|
shutil.rmtree(path, ignore_errors=False)
|
||
|
elif os.path.exists(path):
|
||
|
raise "Path " + path + " already exists and is not a directory"
|
||
|
shutil.copytree(self.workdir, path)
|
||
|
print("The failed command was:")
|
||
|
print(" ".join(self.last_program_invocation))
|
||
|
|
||
|
if dump_stack:
|
||
|
annotate_stack_trace()
|
||
|
sys.exit(1)
|
||
|
|
||
|
# A number of methods below check expectations with actual difference
|
||
|
# between directory trees before and after a build. All the 'expect*'
|
||
|
# methods require exact names to be passed. All the 'ignore*' methods allow
|
||
|
# wildcards.
|
||
|
|
||
|
# All names can be either a string or a list of strings.
|
||
|
def expect_addition(self, names):
|
||
|
for name in self.adjust_names(names):
|
||
|
try:
|
||
|
glob_remove(self.unexpected_difference.added_files, name)
|
||
|
except:
|
||
|
annotation("failure", "File %s not added as expected" % name)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def ignore_addition(self, wildcard):
|
||
|
self.__ignore_elements(self.unexpected_difference.added_files,
|
||
|
wildcard)
|
||
|
|
||
|
def expect_removal(self, names):
|
||
|
for name in self.adjust_names(names):
|
||
|
try:
|
||
|
glob_remove(self.unexpected_difference.removed_files, name)
|
||
|
except:
|
||
|
annotation("failure", "File %s not removed as expected" % name)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def ignore_removal(self, wildcard):
|
||
|
self.__ignore_elements(self.unexpected_difference.removed_files,
|
||
|
wildcard)
|
||
|
|
||
|
def expect_modification(self, names):
|
||
|
for name in self.adjust_names(names):
|
||
|
try:
|
||
|
glob_remove(self.unexpected_difference.modified_files, name)
|
||
|
except:
|
||
|
annotation("failure", "File %s not modified as expected" %
|
||
|
name)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def ignore_modification(self, wildcard):
|
||
|
self.__ignore_elements(self.unexpected_difference.modified_files,
|
||
|
wildcard)
|
||
|
|
||
|
def expect_touch(self, names):
|
||
|
d = self.unexpected_difference
|
||
|
for name in self.adjust_names(names):
|
||
|
# We need to check both touched and modified files. The reason is
|
||
|
# that:
|
||
|
# (1) Windows binaries such as obj, exe or dll files have slight
|
||
|
# differences even with identical inputs due to Windows PE
|
||
|
# format headers containing an internal timestamp.
|
||
|
# (2) Intel's compiler for Linux has the same behaviour.
|
||
|
filesets = [d.modified_files, d.touched_files]
|
||
|
|
||
|
while filesets:
|
||
|
try:
|
||
|
glob_remove(filesets[-1], name)
|
||
|
break
|
||
|
except ValueError:
|
||
|
filesets.pop()
|
||
|
|
||
|
if not filesets:
|
||
|
annotation("failure", "File %s not touched as expected" % name)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def ignore_touch(self, wildcard):
|
||
|
self.__ignore_elements(self.unexpected_difference.touched_files,
|
||
|
wildcard)
|
||
|
|
||
|
def ignore(self, wildcard):
|
||
|
self.ignore_addition(wildcard)
|
||
|
self.ignore_removal(wildcard)
|
||
|
self.ignore_modification(wildcard)
|
||
|
self.ignore_touch(wildcard)
|
||
|
|
||
|
def expect_nothing(self, names):
|
||
|
for name in self.adjust_names(names):
|
||
|
if name in self.difference.added_files:
|
||
|
annotation("failure",
|
||
|
"File %s added, but no action was expected" % name)
|
||
|
self.fail_test(1)
|
||
|
if name in self.difference.removed_files:
|
||
|
annotation("failure",
|
||
|
"File %s removed, but no action was expected" % name)
|
||
|
self.fail_test(1)
|
||
|
pass
|
||
|
if name in self.difference.modified_files:
|
||
|
annotation("failure",
|
||
|
"File %s modified, but no action was expected" % name)
|
||
|
self.fail_test(1)
|
||
|
if name in self.difference.touched_files:
|
||
|
annotation("failure",
|
||
|
"File %s touched, but no action was expected" % name)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def __ignore_junk(self):
|
||
|
# Not totally sure about this change, but I do not see a good
|
||
|
# alternative.
|
||
|
if windows:
|
||
|
self.ignore("*.ilk") # MSVC incremental linking files.
|
||
|
self.ignore("*.pdb") # MSVC program database files.
|
||
|
self.ignore("*.rsp") # Response files.
|
||
|
self.ignore("*.tds") # Borland debug symbols.
|
||
|
self.ignore("*.manifest") # MSVC DLL manifests.
|
||
|
self.ignore("bin/standalone/msvc/*/msvc-setup.bat")
|
||
|
|
||
|
# Debug builds of bjam built with gcc produce this profiling data.
|
||
|
self.ignore("gmon.out")
|
||
|
self.ignore("*/gmon.out")
|
||
|
|
||
|
# Boost Build's 'configure' functionality (unfinished at the time)
|
||
|
# produces this file.
|
||
|
self.ignore("bin/config.log")
|
||
|
self.ignore("bin/project-cache.jam")
|
||
|
|
||
|
# Compiled Python files created when running Python based Boost Build.
|
||
|
self.ignore("*.pyc")
|
||
|
|
||
|
# OSX/Darwin files and dirs.
|
||
|
self.ignore("*.dSYM/*")
|
||
|
|
||
|
def expect_nothing_more(self):
|
||
|
if not self.unexpected_difference.empty():
|
||
|
annotation("failure", "Unexpected changes found")
|
||
|
output = StringIO()
|
||
|
self.unexpected_difference.pprint(output)
|
||
|
annotation("unexpected changes", output.getvalue())
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def expect_output_lines(self, lines, expected=True):
|
||
|
self.__expect_lines(self.stdout(), lines, expected)
|
||
|
|
||
|
def expect_content_lines(self, filename, line, expected=True):
|
||
|
self.__expect_lines(self.read_and_strip(filename), line, expected)
|
||
|
|
||
|
def expect_content(self, name, content, exact=False):
|
||
|
actual = self.read(name)
|
||
|
content = content.replace("$toolset", self.expanded_toolset + "*")
|
||
|
|
||
|
matched = False
|
||
|
if exact:
|
||
|
matched = fnmatch.fnmatch(actual, content)
|
||
|
else:
|
||
|
def sorted_(z):
|
||
|
z.sort(key=lambda x: x.lower().replace("\\", "/"))
|
||
|
return z
|
||
|
actual_ = list(map(lambda x: sorted_(x.split()), actual.splitlines()))
|
||
|
content_ = list(map(lambda x: sorted_(x.split()), content.splitlines()))
|
||
|
if len(actual_) == len(content_):
|
||
|
matched = map(
|
||
|
lambda x, y: map(lambda n, p: fnmatch.fnmatch(n, p), x, y),
|
||
|
actual_, content_)
|
||
|
matched = reduce(
|
||
|
lambda x, y: x and reduce(
|
||
|
lambda a, b: a and b,
|
||
|
y, True),
|
||
|
matched, True)
|
||
|
|
||
|
if not matched:
|
||
|
print("Expected:\n")
|
||
|
print(content)
|
||
|
print("Got:\n")
|
||
|
print(actual)
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def maybe_do_diff(self, actual, expected, result=None):
|
||
|
if os.environ.get("DO_DIFF"):
|
||
|
e = tempfile.mktemp("expected")
|
||
|
a = tempfile.mktemp("actual")
|
||
|
f = open(e, "w")
|
||
|
f.write(expected)
|
||
|
f.close()
|
||
|
f = open(a, "w")
|
||
|
f.write(actual)
|
||
|
f.close()
|
||
|
print("DIFFERENCE")
|
||
|
# Current diff should return 1 to indicate 'different input files'
|
||
|
# but some older diff versions may return 0 and depending on the
|
||
|
# exact Python/OS platform version, os.system() call may gobble up
|
||
|
# the external process's return code and return 0 itself.
|
||
|
if os.system('diff -u "%s" "%s"' % (e, a)) not in [0, 1]:
|
||
|
print('Unable to compute difference: diff -u "%s" "%s"' % (e, a
|
||
|
))
|
||
|
os.unlink(e)
|
||
|
os.unlink(a)
|
||
|
elif type(result) is TestCmd.MatchError:
|
||
|
print(result.message)
|
||
|
else:
|
||
|
print("Set environmental variable 'DO_DIFF' to examine the "
|
||
|
"difference.")
|
||
|
|
||
|
# Internal methods.
|
||
|
def adjust_lib_name(self, name):
|
||
|
global lib_prefix
|
||
|
global dll_prefix
|
||
|
result = name
|
||
|
|
||
|
pos = name.rfind(".")
|
||
|
if pos != -1:
|
||
|
suffix = name[pos:]
|
||
|
if suffix == ".lib":
|
||
|
(head, tail) = os.path.split(name)
|
||
|
if lib_prefix:
|
||
|
tail = lib_prefix + tail
|
||
|
result = os.path.join(head, tail)
|
||
|
elif suffix == ".dll" or suffix == ".implib":
|
||
|
(head, tail) = os.path.split(name)
|
||
|
if dll_prefix:
|
||
|
tail = dll_prefix + tail
|
||
|
result = os.path.join(head, tail)
|
||
|
# If we want to use this name in a Jamfile, we better convert \ to /,
|
||
|
# as otherwise we would have to quote \.
|
||
|
result = result.replace("\\", "/")
|
||
|
return result
|
||
|
|
||
|
def adjust_suffix(self, name):
|
||
|
if not self.translate_suffixes:
|
||
|
return name
|
||
|
pos = name.rfind(".")
|
||
|
if pos == -1:
|
||
|
return name
|
||
|
suffix = name[pos:]
|
||
|
return name[:pos] + suffixes.get(suffix, suffix)
|
||
|
|
||
|
# Acceps either a string or a list of strings and returns a list of
|
||
|
# strings. Adjusts suffixes on all names.
|
||
|
def adjust_names(self, names):
|
||
|
if isstr(names):
|
||
|
names = [names]
|
||
|
r = map(self.adjust_lib_name, names)
|
||
|
r = map(self.adjust_suffix, r)
|
||
|
r = map(lambda x, t=self.expanded_toolset: x.replace("$toolset", t + "*"), r)
|
||
|
return list(r)
|
||
|
|
||
|
def adjust_name(self, name):
|
||
|
return self.adjust_names(name)[0]
|
||
|
|
||
|
def __native_file_name(self, name):
|
||
|
return os.path.normpath(os.path.join(self.workdir, *name.split("/")))
|
||
|
|
||
|
def native_file_name(self, name):
|
||
|
return self.__native_file_name(self.adjust_name(name))
|
||
|
|
||
|
def wait_for_time_change(self, path, touch):
|
||
|
"""
|
||
|
Wait for newly assigned file system modification timestamps for the
|
||
|
given path to become large enough for the timestamp difference to be
|
||
|
correctly recognized by both this Python based testing framework and
|
||
|
the Boost Jam executable being tested. May optionally touch the given
|
||
|
path to set its modification timestamp to the new value.
|
||
|
|
||
|
"""
|
||
|
self.__wait_for_time_change(path, touch, last_build_time=False)
|
||
|
|
||
|
def wait_for_time_change_since_last_build(self):
|
||
|
"""
|
||
|
Wait for newly assigned file system modification timestamps to
|
||
|
become large enough for the timestamp difference to be
|
||
|
correctly recognized by the Python based testing framework.
|
||
|
Does not care about Jam's timestamp resolution, since we
|
||
|
only need this to detect touched files.
|
||
|
"""
|
||
|
if self.last_build_timestamp:
|
||
|
timestamp_file = "timestamp-3df2f2317e15e4a9"
|
||
|
open(timestamp_file, "wb").close()
|
||
|
self.__wait_for_time_change_impl(timestamp_file,
|
||
|
self.last_build_timestamp,
|
||
|
self.__python_timestamp_resolution(timestamp_file, 0), 0)
|
||
|
os.unlink(timestamp_file)
|
||
|
|
||
|
def __build_timestamp_resolution(self):
|
||
|
"""
|
||
|
Returns the minimum path modification timestamp resolution supported
|
||
|
by the used Boost Jam executable.
|
||
|
|
||
|
"""
|
||
|
dir = tempfile.mkdtemp("bjam_version_info")
|
||
|
try:
|
||
|
jam_script = "timestamp_resolution.jam"
|
||
|
f = open(os.path.join(dir, jam_script), "w")
|
||
|
try:
|
||
|
f.write("EXIT $(JAM_TIMESTAMP_RESOLUTION) : 0 ;")
|
||
|
finally:
|
||
|
f.close()
|
||
|
p = subprocess.Popen([self.program[0], "-d0", "-f%s" % jam_script],
|
||
|
stdout=subprocess.PIPE, cwd=dir, universal_newlines=True)
|
||
|
out, err = p.communicate()
|
||
|
finally:
|
||
|
shutil.rmtree(dir, ignore_errors=False)
|
||
|
|
||
|
if p.returncode != 0:
|
||
|
raise TestEnvironmentError("Unexpected return code (%s) when "
|
||
|
"detecting Boost Jam's minimum supported path modification "
|
||
|
"timestamp resolution version information." % p.returncode)
|
||
|
if err:
|
||
|
raise TestEnvironmentError("Unexpected error output (%s) when "
|
||
|
"detecting Boost Jam's minimum supported path modification "
|
||
|
"timestamp resolution version information." % err)
|
||
|
|
||
|
r = re.match("([0-9]{2}):([0-9]{2}):([0-9]{2}\\.[0-9]{9})$", out)
|
||
|
if not r:
|
||
|
# Older Boost Jam versions did not report their minimum supported
|
||
|
# path modification timestamp resolution and did not actually
|
||
|
# support path modification timestamp resolutions finer than 1
|
||
|
# second.
|
||
|
# TODO: Phase this support out to avoid such fallback code from
|
||
|
# possibly covering up other problems.
|
||
|
return 1
|
||
|
if r.group(1) != "00" or r.group(2) != "00": # hours, minutes
|
||
|
raise TestEnvironmentError("Boost Jam with too coarse minimum "
|
||
|
"supported path modification timestamp resolution (%s:%s:%s)."
|
||
|
% (r.group(1), r.group(2), r.group(3)))
|
||
|
return float(r.group(3)) # seconds.nanoseconds
|
||
|
|
||
|
def __ensure_newer_than_last_build(self, path):
|
||
|
"""
|
||
|
Updates the given path's modification timestamp after waiting for the
|
||
|
newly assigned file system modification timestamp to become large
|
||
|
enough for the timestamp difference between it and the last build
|
||
|
timestamp to be correctly recognized by both this Python based testing
|
||
|
framework and the Boost Jam executable being tested. Does nothing if
|
||
|
there is no 'last build' information available.
|
||
|
|
||
|
"""
|
||
|
if self.last_build_timestamp:
|
||
|
self.__wait_for_time_change(path, touch=True, last_build_time=True)
|
||
|
|
||
|
def __expect_lines(self, data, lines, expected):
|
||
|
"""
|
||
|
Checks whether the given data contains the given lines.
|
||
|
|
||
|
Data may be specified as a single string containing text lines
|
||
|
separated by newline characters.
|
||
|
|
||
|
Lines may be specified in any of the following forms:
|
||
|
* Single string containing text lines separated by newlines - the
|
||
|
given lines are searched for in the given data without any extra
|
||
|
data lines between them.
|
||
|
* Container of strings containing text lines separated by newlines
|
||
|
- the given lines are searched for in the given data with extra
|
||
|
data lines allowed between lines belonging to different strings.
|
||
|
* Container of strings containing text lines separated by newlines
|
||
|
and containers containing strings - the same as above with the
|
||
|
internal containers containing strings being interpreted as if
|
||
|
all their content was joined together into a single string
|
||
|
separated by newlines.
|
||
|
|
||
|
A newline at the end of any multi-line lines string is interpreted as
|
||
|
an expected extra trailig empty line.
|
||
|
"""
|
||
|
# str.splitlines() trims at most one trailing newline while we want the
|
||
|
# trailing newline to indicate that there should be an extra empty line
|
||
|
# at the end.
|
||
|
def splitlines(x):
|
||
|
return (x + "\n").splitlines()
|
||
|
|
||
|
if data is None:
|
||
|
data = []
|
||
|
elif isstr(data):
|
||
|
data = splitlines(data)
|
||
|
|
||
|
if isstr(lines):
|
||
|
lines = [splitlines(lines)]
|
||
|
else:
|
||
|
expanded = []
|
||
|
for x in lines:
|
||
|
if isstr(x):
|
||
|
x = splitlines(x)
|
||
|
expanded.append(x)
|
||
|
lines = expanded
|
||
|
|
||
|
if _contains_lines(data, lines) != bool(expected):
|
||
|
output = []
|
||
|
if expected:
|
||
|
output = ["Did not find expected lines:"]
|
||
|
else:
|
||
|
output = ["Found unexpected lines:"]
|
||
|
first = True
|
||
|
for line_sequence in lines:
|
||
|
if line_sequence:
|
||
|
if first:
|
||
|
first = False
|
||
|
else:
|
||
|
output.append("...")
|
||
|
output.extend(" > " + line for line in line_sequence)
|
||
|
output.append("in output:")
|
||
|
output.extend(" > " + line for line in data)
|
||
|
annotation("failure", "\n".join(output))
|
||
|
self.fail_test(1)
|
||
|
|
||
|
def __ignore_elements(self, things, wildcard):
|
||
|
"""Removes in-place 'things' elements matching the given 'wildcard'."""
|
||
|
things[:] = list(filter(lambda x: not fnmatch.fnmatch(x, wildcard), things))
|
||
|
|
||
|
def __makedirs(self, path, wait):
|
||
|
"""
|
||
|
Creates a folder with the given path, together with any missing
|
||
|
parent folders. If WAIT is set, makes sure any newly created folders
|
||
|
have modification timestamps newer than the ones left behind by the
|
||
|
last build run.
|
||
|
|
||
|
"""
|
||
|
try:
|
||
|
if wait:
|
||
|
stack = []
|
||
|
while path and path not in stack and not os.path.isdir(path):
|
||
|
stack.append(path)
|
||
|
path = os.path.dirname(path)
|
||
|
while stack:
|
||
|
path = stack.pop()
|
||
|
os.mkdir(path)
|
||
|
self.__ensure_newer_than_last_build(path)
|
||
|
else:
|
||
|
os.makedirs(path)
|
||
|
except Exception:
|
||
|
pass
|
||
|
|
||
|
def __python_timestamp_resolution(self, path, minimum_resolution):
|
||
|
"""
|
||
|
Returns the modification timestamp resolution for the given path
|
||
|
supported by the used Python interpreter/OS/filesystem combination.
|
||
|
Will not check for resolutions less than the given minimum value. Will
|
||
|
change the path's modification timestamp in the process.
|
||
|
|
||
|
Return values:
|
||
|
0 - nanosecond resolution supported
|
||
|
positive decimal - timestamp resolution in seconds
|
||
|
|
||
|
"""
|
||
|
# Note on Python's floating point timestamp support:
|
||
|
# Python interpreter versions prior to Python 2.3 did not support
|
||
|
# floating point timestamps. Versions 2.3 through 3.3 may or may not
|
||
|
# support it depending on the configuration (may be toggled by calling
|
||
|
# os.stat_float_times(True/False) at program startup, disabled by
|
||
|
# default prior to Python 2.5 and enabled by default since). Python 3.3
|
||
|
# deprecated this configuration and 3.4 removed support for it after
|
||
|
# which floating point timestamps are always supported.
|
||
|
ver = sys.version_info[0:2]
|
||
|
python_nanosecond_support = ver >= (3, 4) or (ver >= (2, 3) and
|
||
|
os.stat_float_times())
|
||
|
|
||
|
# Minimal expected floating point difference used to account for
|
||
|
# possible imprecise floating point number representations. We want
|
||
|
# this number to be small (at least smaller than 0.0001) but still
|
||
|
# large enough that we can be sure that increasing a floating point
|
||
|
# value by 2 * eta guarantees the value read back will be increased by
|
||
|
# at least eta.
|
||
|
eta = 0.00005
|
||
|
|
||
|
stats_orig = os.stat(path)
|
||
|
def test_time(diff):
|
||
|
"""Returns whether a timestamp difference is detectable."""
|
||
|
os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime + diff))
|
||
|
return os.stat(path).st_mtime > stats_orig.st_mtime + eta
|
||
|
|
||
|
# Test for nanosecond timestamp resolution support.
|
||
|
if not minimum_resolution and python_nanosecond_support:
|
||
|
if test_time(2 * eta):
|
||
|
return 0
|
||
|
|
||
|
# Detect the filesystem timestamp resolution. Note that there is no
|
||
|
# need to make this code 'as fast as possible' as, this function gets
|
||
|
# called before having to sleep until the next detectable modification
|
||
|
# timestamp value and that, since we already know nanosecond resolution
|
||
|
# is not supported, will surely take longer than whatever we do here to
|
||
|
# detect this minimal detectable modification timestamp resolution.
|
||
|
step = 0.1
|
||
|
if not python_nanosecond_support:
|
||
|
# If Python does not support nanosecond timestamp resolution we
|
||
|
# know the minimum possible supported timestamp resolution is 1
|
||
|
# second.
|
||
|
minimum_resolution = max(1, minimum_resolution)
|
||
|
index = max(1, int(minimum_resolution / step))
|
||
|
while step * index < minimum_resolution:
|
||
|
# Floating point number representation errors may cause our
|
||
|
# initially calculated start index to be too small if calculated
|
||
|
# directly.
|
||
|
index += 1
|
||
|
while True:
|
||
|
# Do not simply add up the steps to avoid cumulative floating point
|
||
|
# number representation errors.
|
||
|
next = step * index
|
||
|
if next > 10:
|
||
|
raise TestEnvironmentError("File systems with too coarse "
|
||
|
"modification timestamp resolutions not supported.")
|
||
|
if test_time(next):
|
||
|
return next
|
||
|
index += 1
|
||
|
|
||
|
def __wait_for_time_change(self, path, touch, last_build_time):
|
||
|
"""
|
||
|
Wait until a newly assigned file system modification timestamp for
|
||
|
the given path is large enough for the timestamp difference between it
|
||
|
and the last build timestamp or the path's original file system
|
||
|
modification timestamp (depending on the last_build_time flag) to be
|
||
|
correctly recognized by both this Python based testing framework and
|
||
|
the Boost Jam executable being tested. May optionally touch the given
|
||
|
path to set its modification timestamp to the new value.
|
||
|
|
||
|
"""
|
||
|
assert self.last_build_timestamp or not last_build_time
|
||
|
stats_orig = os.stat(path)
|
||
|
|
||
|
if last_build_time:
|
||
|
start_time = self.last_build_timestamp
|
||
|
else:
|
||
|
start_time = stats_orig.st_mtime
|
||
|
|
||
|
build_resolution = self.__build_timestamp_resolution()
|
||
|
assert build_resolution >= 0
|
||
|
|
||
|
# Check whether the current timestamp is already new enough.
|
||
|
if stats_orig.st_mtime > start_time and (not build_resolution or
|
||
|
stats_orig.st_mtime >= start_time + build_resolution):
|
||
|
return
|
||
|
|
||
|
resolution = self.__python_timestamp_resolution(path, build_resolution)
|
||
|
assert resolution >= build_resolution
|
||
|
self.__wait_for_time_change_impl(path, start_time, resolution, build_resolution)
|
||
|
|
||
|
if not touch:
|
||
|
os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime))
|
||
|
|
||
|
def __wait_for_time_change_impl(self, path, start_time, resolution, build_resolution):
|
||
|
# Implementation notes:
|
||
|
# * Theoretically time.sleep() API might get interrupted too soon
|
||
|
# (never actually encountered).
|
||
|
# * We encountered cases where we sleep just long enough for the
|
||
|
# filesystem's modifiction timestamp to change to the desired value,
|
||
|
# but after waking up, the read timestamp is still just a tiny bit
|
||
|
# too small (encountered on Windows). This is most likely caused by
|
||
|
# imprecise floating point timestamp & sleep interval representation
|
||
|
# used by Python. Note though that we never encountered a case where
|
||
|
# more than one additional tiny sleep() call was needed to remedy
|
||
|
# the situation.
|
||
|
# * We try to wait long enough for the timestamp to change, but do not
|
||
|
# want to waste processing time by waiting too long. The main
|
||
|
# problem is that when we have a coarse resolution, the actual times
|
||
|
# get rounded and we do not know the exact sleep time needed for the
|
||
|
# difference between two such times to pass. E.g. if we have a 1
|
||
|
# second resolution and the original and the current file timestamps
|
||
|
# are both 10 seconds then it could be that the current time is
|
||
|
# 10.99 seconds and that we can wait for just one hundredth of a
|
||
|
# second for the current file timestamp to reach its next value, and
|
||
|
# using a longer sleep interval than that would just be wasting
|
||
|
# time.
|
||
|
while True:
|
||
|
os.utime(path, None)
|
||
|
c = os.stat(path).st_mtime
|
||
|
if resolution:
|
||
|
if c > start_time and (not build_resolution or c >= start_time
|
||
|
+ build_resolution):
|
||
|
break
|
||
|
if c <= start_time - resolution:
|
||
|
# Move close to the desired timestamp in one sleep, but not
|
||
|
# close enough for timestamp rounding to potentially cause
|
||
|
# us to wait too long.
|
||
|
if start_time - c > 5:
|
||
|
if last_build_time:
|
||
|
error_message = ("Last build time recorded as "
|
||
|
"being a future event, causing a too long "
|
||
|
"wait period. Something must have played "
|
||
|
"around with the system clock.")
|
||
|
else:
|
||
|
error_message = ("Original path modification "
|
||
|
"timestamp set to far into the future or "
|
||
|
"something must have played around with the "
|
||
|
"system clock, causing a too long wait "
|
||
|
"period.\nPath: '%s'" % path)
|
||
|
raise TestEnvironmentError(message)
|
||
|
_sleep(start_time - c)
|
||
|
else:
|
||
|
# We are close to the desired timestamp so take baby sleeps
|
||
|
# to avoid sleeping too long.
|
||
|
_sleep(max(0.01, resolution / 10))
|
||
|
else:
|
||
|
if c > start_time:
|
||
|
break
|
||
|
_sleep(max(0.01, start_time - c))
|
||
|
|
||
|
|
||
|
class List:
|
||
|
def __init__(self, s=""):
|
||
|
elements = []
|
||
|
if isstr(s):
|
||
|
# Have to handle escaped spaces correctly.
|
||
|
elements = s.replace("\ ", "\001").split()
|
||
|
else:
|
||
|
elements = s
|
||
|
self.l = [e.replace("\001", " ") for e in elements]
|
||
|
|
||
|
def __len__(self):
|
||
|
return len(self.l)
|
||
|
|
||
|
def __getitem__(self, key):
|
||
|
return self.l[key]
|
||
|
|
||
|
def __setitem__(self, key, value):
|
||
|
self.l[key] = value
|
||
|
|
||
|
def __delitem__(self, key):
|
||
|
del self.l[key]
|
||
|
|
||
|
def __str__(self):
|
||
|
return str(self.l)
|
||
|
|
||
|
def __repr__(self):
|
||
|
return "%s.List(%r)" % (self.__module__, " ".join(self.l))
|
||
|
|
||
|
def __mul__(self, other):
|
||
|
result = List()
|
||
|
if not isinstance(other, List):
|
||
|
other = List(other)
|
||
|
for f in self:
|
||
|
for s in other:
|
||
|
result.l.append(f + s)
|
||
|
return result
|
||
|
|
||
|
def __rmul__(self, other):
|
||
|
if not isinstance(other, List):
|
||
|
other = List(other)
|
||
|
return List.__mul__(other, self)
|
||
|
|
||
|
def __add__(self, other):
|
||
|
result = List()
|
||
|
result.l = self.l[:] + other.l[:]
|
||
|
return result
|
||
|
|
||
|
|
||
|
def _contains_lines(data, lines):
|
||
|
data_line_count = len(data)
|
||
|
expected_line_count = reduce(lambda x, y: x + len(y), lines, 0)
|
||
|
index = 0
|
||
|
for expected in lines:
|
||
|
if expected_line_count > data_line_count - index:
|
||
|
return False
|
||
|
expected_line_count -= len(expected)
|
||
|
index = _match_line_sequence(data, index, data_line_count -
|
||
|
expected_line_count, expected)
|
||
|
if index < 0:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
|
||
|
def _match_line_sequence(data, start, end, lines):
|
||
|
if not lines:
|
||
|
return start
|
||
|
for index in range(start, end - len(lines) + 1):
|
||
|
data_index = index
|
||
|
for expected in lines:
|
||
|
if not fnmatch.fnmatch(data[data_index], expected):
|
||
|
break
|
||
|
data_index += 1
|
||
|
else:
|
||
|
return data_index
|
||
|
return -1
|
||
|
|
||
|
|
||
|
def _sleep(delay):
|
||
|
if delay > 5:
|
||
|
raise TestEnvironmentError("Test environment error: sleep period of "
|
||
|
"more than 5 seconds requested. Most likely caused by a file with "
|
||
|
"its modification timestamp set to sometime in the future.")
|
||
|
time.sleep(delay)
|
||
|
|
||
|
|
||
|
###############################################################################
|
||
|
#
|
||
|
# Initialization.
|
||
|
#
|
||
|
###############################################################################
|
||
|
|
||
|
# Make os.stat() return file modification times as floats instead of integers
|
||
|
# to get the best possible file timestamp resolution available. The exact
|
||
|
# resolution depends on the underlying file system and the Python os.stat()
|
||
|
# implementation. The better the resolution we achieve, the shorter we need to
|
||
|
# wait for files we create to start getting new timestamps.
|
||
|
#
|
||
|
# Additional notes:
|
||
|
# * os.stat_float_times() function first introduced in Python 2.3. and
|
||
|
# suggested for deprecation in Python 3.3.
|
||
|
# * On Python versions 2.5+ we do not need to do this as there os.stat()
|
||
|
# returns floating point file modification times by default.
|
||
|
# * Windows CPython implementations prior to version 2.5 do not support file
|
||
|
# modification timestamp resolutions of less than 1 second no matter whether
|
||
|
# these timestamps are returned as integer or floating point values.
|
||
|
# * Python documentation states that this should be set in a program's
|
||
|
# __main__ module to avoid affecting other libraries that might not be ready
|
||
|
# to support floating point timestamps. Since we use no such external
|
||
|
# libraries, we ignore this warning to make it easier to enable this feature
|
||
|
# in both our single & multiple-test scripts.
|
||
|
if (2, 3) <= sys.version_info < (2, 5) and not os.stat_float_times():
|
||
|
os.stat_float_times(True)
|
||
|
|
||
|
|
||
|
# Quickie tests. Should use doctest instead.
|
||
|
if __name__ == "__main__":
|
||
|
assert str(List("foo bar") * "/baz") == "['foo/baz', 'bar/baz']"
|
||
|
assert repr("foo/" * List("bar baz")) == "__main__.List('foo/bar foo/baz')"
|
||
|
|
||
|
assert _contains_lines([], [])
|
||
|
assert _contains_lines([], [[]])
|
||
|
assert _contains_lines([], [[], []])
|
||
|
assert _contains_lines([], [[], [], []])
|
||
|
assert not _contains_lines([], [[""]])
|
||
|
assert not _contains_lines([], [["a"]])
|
||
|
|
||
|
assert _contains_lines([""], [])
|
||
|
assert _contains_lines(["a"], [])
|
||
|
assert _contains_lines(["a", "b"], [])
|
||
|
assert _contains_lines(["a", "b"], [[], [], []])
|
||
|
|
||
|
assert _contains_lines([""], [[""]])
|
||
|
assert not _contains_lines([""], [["a"]])
|
||
|
assert not _contains_lines(["a"], [[""]])
|
||
|
assert _contains_lines(["a", "", "b", ""], [["a"]])
|
||
|
assert _contains_lines(["a", "", "b", ""], [[""]])
|
||
|
assert _contains_lines(["a", "", "b"], [["b"]])
|
||
|
assert not _contains_lines(["a", "b"], [[""]])
|
||
|
assert not _contains_lines(["a", "", "b", ""], [["c"]])
|
||
|
assert _contains_lines(["a", "", "b", "x"], [["x"]])
|
||
|
|
||
|
data = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
|
||
|
assert _contains_lines(data, [["1", "2"]])
|
||
|
assert not _contains_lines(data, [["2", "1"]])
|
||
|
assert not _contains_lines(data, [["1", "3"]])
|
||
|
assert not _contains_lines(data, [["1", "3"]])
|
||
|
assert _contains_lines(data, [["1"], ["2"]])
|
||
|
assert _contains_lines(data, [["1"], [], [], [], ["2"]])
|
||
|
assert _contains_lines(data, [["1"], ["3"]])
|
||
|
assert not _contains_lines(data, [["3"], ["1"]])
|
||
|
assert _contains_lines(data, [["3"], ["7"], ["8"]])
|
||
|
assert not _contains_lines(data, [["1"], ["3", "5"]])
|
||
|
assert not _contains_lines(data, [["1"], [""], ["5"]])
|
||
|
assert not _contains_lines(data, [["1"], ["5"], ["3"]])
|
||
|
assert not _contains_lines(data, [["1"], ["5", "3"]])
|
||
|
|
||
|
assert not _contains_lines(data, [[" 3"]])
|
||
|
assert not _contains_lines(data, [["3 "]])
|
||
|
assert not _contains_lines(data, [["3", ""]])
|
||
|
assert not _contains_lines(data, [["", "3"]])
|
||
|
|
||
|
print("tests passed")
|