1382 lines
		
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			1382 lines
		
	
	
		
			52 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| # Copyright 2002-2005 Vladimir Prus.
 | |
| # Copyright 2002-2003 Dave Abrahams.
 | |
| # Copyright 2006 Rene Ferdinand Rivera Morell.
 | |
| # Distributed under the Boost Software License, Version 1.0.
 | |
| # (See accompanying file LICENSE.txt or copy at
 | |
| # https://www.bfgroup.xyz/b2/LICENSE.txt)
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| import TestCmd
 | |
| 
 | |
| import copy
 | |
| import fnmatch
 | |
| import glob
 | |
| import math
 | |
| import os
 | |
| import os.path
 | |
| import re
 | |
| import shutil
 | |
| try:
 | |
|     from StringIO import StringIO
 | |
| except:
 | |
|     from io import StringIO
 | |
| import subprocess
 | |
| import sys
 | |
| import tempfile
 | |
| import time
 | |
| import traceback
 | |
| import tree
 | |
| import types
 | |
| 
 | |
| from xml.sax.saxutils import escape
 | |
| 
 | |
| try:
 | |
|     from functools import reduce
 | |
| except:
 | |
|     pass
 | |
| 
 | |
| 
 | |
| def isstr(data):
 | |
|     return isinstance(data, (type(''), type(u'')))
 | |
| 
 | |
| 
 | |
| class TestEnvironmentError(Exception):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| annotations = []
 | |
| 
 | |
| 
 | |
| def print_annotation(name, value, xml):
 | |
|     """Writes some named bits of information about the current test run."""
 | |
|     if xml:
 | |
|         print(escape(name) + " {{{")
 | |
|         print(escape(value))
 | |
|         print("}}}")
 | |
|     else:
 | |
|         print(name + " {{{")
 | |
|         print(value)
 | |
|         print("}}}")
 | |
| 
 | |
| 
 | |
| def flush_annotations(xml=0):
 | |
|     global annotations
 | |
|     for ann in annotations:
 | |
|         print_annotation(ann[0], ann[1], xml)
 | |
|     annotations = []
 | |
| 
 | |
| 
 | |
| def clear_annotations():
 | |
|     global annotations
 | |
|     annotations = []
 | |
| 
 | |
| 
 | |
| defer_annotations = 0
 | |
| 
 | |
| def set_defer_annotations(n):
 | |
|     global defer_annotations
 | |
|     defer_annotations = n
 | |
| 
 | |
| 
 | |
| def annotate_stack_trace(tb=None):
 | |
|     if tb:
 | |
|         trace = TestCmd.caller(traceback.extract_tb(tb), 0)
 | |
|     else:
 | |
|         trace = TestCmd.caller(traceback.extract_stack(), 1)
 | |
|     annotation("stacktrace", trace)
 | |
| 
 | |
| 
 | |
| def annotation(name, value):
 | |
|     """Records an annotation about the test run."""
 | |
|     annotations.append((name, value))
 | |
|     if not defer_annotations:
 | |
|         flush_annotations()
 | |
| 
 | |
| 
 | |
| def get_toolset():
 | |
|     toolset = None
 | |
|     for arg in sys.argv[1:]:
 | |
|         if not arg.startswith("-"):
 | |
|             toolset = arg
 | |
|     return toolset or "gcc"
 | |
| 
 | |
| 
 | |
| # Detect the host OS.
 | |
| cygwin = hasattr(os, "uname") and os.uname()[0].lower().startswith("cygwin")
 | |
| windows = cygwin or os.environ.get("OS", "").lower().startswith("windows")
 | |
| 
 | |
| if cygwin:
 | |
|     default_os = "cygwin"
 | |
| elif windows:
 | |
|     default_os = "windows"
 | |
| elif hasattr(os, "uname"):
 | |
|     default_os = os.uname()[0].lower()
 | |
| 
 | |
| 
 | |
| def expand_toolset(toolset, target_os=default_os):
 | |
|     match = re.match(r'^(clang|intel)(-[\d\.]+|)$', toolset)
 | |
|     if match:
 | |
|         if match.group(1) == "intel" and target_os == "windows":
 | |
|             return match.expand(r'\1-win\2')
 | |
|         elif target_os == "darwin":
 | |
|             return match.expand(r'\1-darwin\2')
 | |
|         else:
 | |
|             return match.expand(r'\1-linux\2')
 | |
| 
 | |
|     return toolset
 | |
| 
 | |
| 
 | |
| def prepare_prefixes_and_suffixes(toolset, target_os=default_os):
 | |
|     ind = toolset.find('-')
 | |
|     if ind == -1:
 | |
|         rtoolset = toolset
 | |
|     else:
 | |
|         rtoolset = toolset[:ind]
 | |
|     prepare_suffix_map(rtoolset, target_os)
 | |
|     prepare_library_prefix(rtoolset, target_os)
 | |
| 
 | |
| 
 | |
| def prepare_suffix_map(toolset, target_os=default_os):
 | |
|     """
 | |
|       Set up suffix translation performed by the Boost Build testing framework
 | |
|     to accommodate different toolsets generating targets of the same type using
 | |
|     different filename extensions (suffixes).
 | |
| 
 | |
|     """
 | |
|     global suffixes
 | |
|     suffixes = {}
 | |
|     if target_os == "cygwin":
 | |
|         suffixes[".lib"] = ".a"
 | |
|         suffixes[".obj"] = ".o"
 | |
|         suffixes[".implib"] = ".lib.a"
 | |
|     elif target_os == "windows":
 | |
|         if toolset == "gcc":
 | |
|             # MinGW
 | |
|             suffixes[".lib"] = ".a"
 | |
|             suffixes[".obj"] = ".o"
 | |
|             suffixes[".implib"] = ".dll.a"
 | |
|         else:
 | |
|             # Everything else Windows
 | |
|             suffixes[".implib"] = ".lib"
 | |
|     else:
 | |
|         suffixes[".exe"] = ""
 | |
|         suffixes[".dll"] = ".so"
 | |
|         suffixes[".lib"] = ".a"
 | |
|         suffixes[".obj"] = ".o"
 | |
|         suffixes[".implib"] = ".no_implib_files_on_this_platform"
 | |
| 
 | |
|         if target_os == "darwin":
 | |
|             suffixes[".dll"] = ".dylib"
 | |
| 
 | |
| 
 | |
| def prepare_library_prefix(toolset, target_os=default_os):
 | |
|     """
 | |
|       Setup whether Boost Build is expected to automatically prepend prefixes
 | |
|     to its built library targets.
 | |
| 
 | |
|     """
 | |
|     global lib_prefix
 | |
|     lib_prefix = "lib"
 | |
| 
 | |
|     global dll_prefix
 | |
|     if target_os == "cygwin":
 | |
|         dll_prefix = "cyg"
 | |
|     elif target_os == "windows" and toolset != "gcc":
 | |
|         dll_prefix = None
 | |
|     else:
 | |
|         dll_prefix = "lib"
 | |
| 
 | |
| 
 | |
| def re_remove(sequence, regex):
 | |
|     me = re.compile(regex)
 | |
|     result = list(filter(lambda x: me.match(x), sequence))
 | |
|     if not result:
 | |
|         raise ValueError()
 | |
|     for r in result:
 | |
|         sequence.remove(r)
 | |
| 
 | |
| 
 | |
| def glob_remove(sequence, pattern):
 | |
|     result = list(fnmatch.filter(sequence, pattern))
 | |
|     if not result:
 | |
|         raise ValueError()
 | |
|     for r in result:
 | |
|         sequence.remove(r)
 | |
| 
 | |
| 
 | |
| class Tester(TestCmd.TestCmd):
 | |
|     """Main tester class for Boost Build.
 | |
| 
 | |
|     Optional arguments:
 | |
| 
 | |
|     `arguments`                   - Arguments passed to the run executable.
 | |
|     `executable`                  - Name of the executable to invoke.
 | |
|     `match`                       - Function to use for compating actual and
 | |
|                                     expected file contents.
 | |
|     `boost_build_path`            - Boost build path to be passed to the run
 | |
|                                     executable.
 | |
|     `translate_suffixes`          - Whether to update suffixes on the the file
 | |
|                                     names passed from the test script so they
 | |
|                                     match those actually created by the current
 | |
|                                     toolset. For example, static library files
 | |
|                                     are specified by using the .lib suffix but
 | |
|                                     when the "gcc" toolset is used it actually
 | |
|                                     creates them using the .a suffix.
 | |
|     `pass_toolset`                - Whether the test system should pass the
 | |
|                                     specified toolset to the run executable.
 | |
|     `use_test_config`             - Whether the test system should tell the run
 | |
|                                     executable to read in the test_config.jam
 | |
|                                     configuration file.
 | |
|     `ignore_toolset_requirements` - Whether the test system should tell the run
 | |
|                                     executable to ignore toolset requirements.
 | |
|     `workdir`                     - Absolute directory where the test will be
 | |
|                                     run from.
 | |
|     `pass_d0`                     - If set, when tests are not explicitly run
 | |
|                                     in verbose mode, they are run as silent
 | |
|                                     (-d0 & --quiet Boost Jam options).
 | |
| 
 | |
|     Optional arguments inherited from the base class:
 | |
| 
 | |
|     `description`                 - Test description string displayed in case
 | |
|                                     of a failed test.
 | |
|     `subdir`                      - List of subdirectories to automatically
 | |
|                                     create under the working directory. Each
 | |
|                                     subdirectory needs to be specified
 | |
|                                     separately, parent coming before its child.
 | |
|     `verbose`                     - Flag that may be used to enable more
 | |
|                                     verbose test system output. Note that it
 | |
|                                     does not also enable more verbose build
 | |
|                                     system output like the --verbose command
 | |
|                                     line option does.
 | |
|     """
 | |
|     def __init__(self, arguments=None, executable=None,
 | |
|         match=TestCmd.match_exact, boost_build_path=None,
 | |
|         translate_suffixes=True, pass_toolset=True, use_test_config=True,
 | |
|         ignore_toolset_requirements=False, workdir="", pass_d0=False,
 | |
|         **keywords):
 | |
| 
 | |
|         if not executable:
 | |
|             executable = os.getenv('B2')
 | |
|         if not executable:
 | |
|             executable = 'b2'
 | |
| 
 | |
|         assert arguments.__class__ is not str
 | |
|         self.original_workdir = os.path.dirname(__file__)
 | |
|         if workdir and not os.path.isabs(workdir):
 | |
|             raise ("Parameter workdir <%s> must point to an absolute "
 | |
|                 "directory: " % workdir)
 | |
| 
 | |
|         self.last_build_timestamp = 0
 | |
|         self.translate_suffixes = translate_suffixes
 | |
|         self.use_test_config = use_test_config
 | |
| 
 | |
|         self.toolset = get_toolset()
 | |
|         self.expanded_toolset = expand_toolset(self.toolset)
 | |
|         self.pass_toolset = pass_toolset
 | |
|         self.ignore_toolset_requirements = ignore_toolset_requirements
 | |
| 
 | |
|         prepare_prefixes_and_suffixes(pass_toolset and self.toolset or "gcc")
 | |
| 
 | |
|         use_default_bjam = "--default-bjam" in sys.argv
 | |
| 
 | |
|         if not use_default_bjam:
 | |
|             jam_build_dir = ""
 | |
| 
 | |
|             # Find where jam_src is located. Try for the debug version if it is
 | |
|             # lying around.
 | |
|             srcdir = os.path.join(os.path.dirname(__file__), "..", "src")
 | |
|             dirs = [os.path.join(srcdir, "engine", jam_build_dir + ".debug"),
 | |
|                     os.path.join(srcdir, "engine", jam_build_dir)]
 | |
|             for d in dirs:
 | |
|                 if os.path.exists(d):
 | |
|                     jam_build_dir = d
 | |
|                     break
 | |
|             else:
 | |
|                 print("Cannot find built Boost.Jam")
 | |
|                 sys.exit(1)
 | |
| 
 | |
|         verbosity = ["-d0", "--quiet"]
 | |
|         if not pass_d0:
 | |
|             verbosity = []
 | |
|         if "--verbose" in sys.argv:
 | |
|             keywords["verbose"] = True
 | |
|             verbosity = ["-d2"]
 | |
|         self.verbosity = verbosity
 | |
| 
 | |
|         if boost_build_path is None:
 | |
|             boost_build_path = self.original_workdir + "/.."
 | |
| 
 | |
|         program_list = []
 | |
|         if use_default_bjam:
 | |
|             program_list.append(executable)
 | |
|         else:
 | |
|             program_list.append(os.path.join(jam_build_dir, executable))
 | |
|         program_list.append('-sBOOST_BUILD_PATH="' + boost_build_path + '"')
 | |
|         if arguments:
 | |
|             program_list += arguments
 | |
| 
 | |
|         TestCmd.TestCmd.__init__(self, program=program_list, match=match,
 | |
|             workdir=workdir, inpath=use_default_bjam, **keywords)
 | |
| 
 | |
|         os.chdir(self.workdir)
 | |
| 
 | |
|     def cleanup(self):
 | |
|         try:
 | |
|             TestCmd.TestCmd.cleanup(self)
 | |
|             os.chdir(self.original_workdir)
 | |
|         except AttributeError:
 | |
|             # When this is called during TestCmd.TestCmd.__del__ we can have
 | |
|             # both 'TestCmd' and 'os' unavailable in our scope. Do nothing in
 | |
|             # this case.
 | |
|             pass
 | |
| 
 | |
|     def set_toolset(self, toolset, target_os=default_os):
 | |
|         self.toolset = toolset
 | |
|         self.expanded_toolset = expand_toolset(toolset, target_os)
 | |
|         self.pass_toolset = True
 | |
|         prepare_prefixes_and_suffixes(toolset, target_os)
 | |
| 
 | |
| 
 | |
|     #
 | |
|     # Methods that change the working directory's content.
 | |
|     #
 | |
|     def set_tree(self, tree_location):
 | |
|         # It is not possible to remove the current directory.
 | |
|         d = os.getcwd()
 | |
|         os.chdir(os.path.dirname(self.workdir))
 | |
|         shutil.rmtree(self.workdir, ignore_errors=False)
 | |
| 
 | |
|         if not os.path.isabs(tree_location):
 | |
|             tree_location = os.path.join(self.original_workdir, tree_location)
 | |
|         shutil.copytree(tree_location, self.workdir)
 | |
| 
 | |
|         os.chdir(d)
 | |
|         def make_writable(unused, dir, entries):
 | |
|             for e in entries:
 | |
|                 name = os.path.join(dir, e)
 | |
|                 os.chmod(name, os.stat(name).st_mode | 0o222)
 | |
|         for root, _, files in os.walk("."):
 | |
|             make_writable(None, root, files)
 | |
| 
 | |
|     def write(self, file, content, wait=True):
 | |
|         nfile = self.native_file_name(file)
 | |
|         self.__makedirs(os.path.dirname(nfile), wait)
 | |
|         if not type(content) == bytes:
 | |
|             content = content.encode()
 | |
|         f = open(nfile, "wb")
 | |
|         try:
 | |
|             f.write(content)
 | |
|         finally:
 | |
|             f.close()
 | |
|         self.__ensure_newer_than_last_build(nfile)
 | |
| 
 | |
|     def rename(self, src, dst):
 | |
|         src_name = self.native_file_name(src)
 | |
|         dst_name = self.native_file_name(dst)
 | |
|         os.rename(src_name, dst_name)
 | |
| 
 | |
|     def copy(self, src, dst):
 | |
|         try:
 | |
|             self.write(dst, self.read(src, binary=True))
 | |
|         except:
 | |
|             self.fail_test(1)
 | |
| 
 | |
|     def copy_timestamp(self, src, dst):
 | |
|         src_name = self.native_file_name(src)
 | |
|         dst_name = self.native_file_name(dst)
 | |
|         shutil.copystat(src_name, dst_name)
 | |
| 
 | |
|     def copy_preserving_timestamp(self, src, dst):
 | |
|         src_name = self.native_file_name(src)
 | |
|         dst_name = self.native_file_name(dst)
 | |
|         shutil.copy2(src_name, dst_name)
 | |
| 
 | |
|     def touch(self, names, wait=True):
 | |
|         if isstr(names):
 | |
|             names = [names]
 | |
|         for name in names:
 | |
|             path = self.native_file_name(name)
 | |
|             if wait:
 | |
|                 self.__ensure_newer_than_last_build(path)
 | |
|             else:
 | |
|                 os.utime(path, None)
 | |
| 
 | |
|     def rm(self, names):
 | |
|         if not type(names) == list:
 | |
|             names = [names]
 | |
| 
 | |
|         if names == ["."]:
 | |
|             # If we are deleting the entire workspace, there is no need to wait
 | |
|             # for a clock tick.
 | |
|             self.last_build_timestamp = 0
 | |
| 
 | |
|         # Avoid attempts to remove the current directory.
 | |
|         os.chdir(self.original_workdir)
 | |
|         for name in names:
 | |
|             n = glob.glob(self.native_file_name(name))
 | |
|             if n: n = n[0]
 | |
|             if not n:
 | |
|                 n = self.glob_file(name.replace("$toolset", self.expanded_toolset + "*")
 | |
|                     )
 | |
|             if n:
 | |
|                 if os.path.isdir(n):
 | |
|                     shutil.rmtree(n, ignore_errors=False)
 | |
|                 else:
 | |
|                     os.unlink(n)
 | |
| 
 | |
|         # Create working dir root again in case we removed it.
 | |
|         if not os.path.exists(self.workdir):
 | |
|             os.mkdir(self.workdir)
 | |
|         os.chdir(self.workdir)
 | |
| 
 | |
|     def expand_toolset(self, name):
 | |
|         """
 | |
|           Expands $toolset placeholder in the given file to the name of the
 | |
|         toolset currently being tested.
 | |
| 
 | |
|         """
 | |
|         self.write(name, self.read(name).replace("$toolset", self.expanded_toolset))
 | |
| 
 | |
|     def dump_stdio(self):
 | |
|         annotation("STDOUT", self.stdout())
 | |
|         annotation("STDERR", self.stderr())
 | |
| 
 | |
|     def run_build_system(self, extra_args=None, subdir="", stdout=None,
 | |
|         stderr="", status=0, match=None, pass_toolset=None,
 | |
|         use_test_config=None, ignore_toolset_requirements=None,
 | |
|         expected_duration=None, **kw):
 | |
| 
 | |
|         assert extra_args.__class__ is not str
 | |
| 
 | |
|         if os.path.isabs(subdir):
 | |
|             raise ValueError(
 | |
|                 "You must pass a relative directory to subdir <%s>." % subdir)
 | |
| 
 | |
|         self.previous_tree, dummy = tree.build_tree(self.workdir)
 | |
|         self.wait_for_time_change_since_last_build()
 | |
| 
 | |
|         if match is None:
 | |
|             match = self.match
 | |
| 
 | |
|         if pass_toolset is None:
 | |
|             pass_toolset = self.pass_toolset
 | |
| 
 | |
|         if use_test_config is None:
 | |
|             use_test_config = self.use_test_config
 | |
| 
 | |
|         if ignore_toolset_requirements is None:
 | |
|             ignore_toolset_requirements = self.ignore_toolset_requirements
 | |
| 
 | |
|         try:
 | |
|             kw["program"] = []
 | |
|             kw["program"] += self.program
 | |
|             if extra_args:
 | |
|                 kw["program"] += extra_args
 | |
|             if not extra_args or not any(a.startswith("-j") for a in extra_args):
 | |
|                 kw["program"] += ["-j1"]
 | |
|             if stdout is None and not any(a.startswith("-d") for a in kw["program"]):
 | |
|                 kw["program"] += self.verbosity
 | |
|             if pass_toolset:
 | |
|                 kw["program"].append("toolset=" + self.toolset)
 | |
|             if use_test_config:
 | |
|                 kw["program"].append('--test-config="%s"' % os.path.join(
 | |
|                     self.original_workdir, "test-config.jam"))
 | |
|             if ignore_toolset_requirements:
 | |
|                 kw["program"].append("--ignore-toolset-requirements")
 | |
|             if "--python" in sys.argv:
 | |
|                 # -z disables Python optimization mode.
 | |
|                 # this enables type checking (all assert
 | |
|                 # and if __debug__ statements).
 | |
|                 kw["program"].extend(["--python", "-z"])
 | |
|             if "--stacktrace" in sys.argv:
 | |
|                 kw["program"].append("--stacktrace")
 | |
|             kw["chdir"] = subdir
 | |
|             self.last_program_invocation = kw["program"]
 | |
|             build_time_start = time.time()
 | |
|             TestCmd.TestCmd.run(self, **kw)
 | |
|             build_time_finish = time.time()
 | |
|         except:
 | |
|             self.dump_stdio()
 | |
|             raise
 | |
| 
 | |
|         old_last_build_timestamp = self.last_build_timestamp
 | |
|         self.tree, self.last_build_timestamp = tree.build_tree(self.workdir)
 | |
|         self.difference = tree.tree_difference(self.previous_tree, self.tree)
 | |
|         if self.difference.empty():
 | |
|             # If nothing has been changed by this build and sufficient time has
 | |
|             # passed since the last build that actually changed something,
 | |
|             # there is no need to wait for touched or newly created files to
 | |
|             # start getting newer timestamps than the currently existing ones.
 | |
|             self.last_build_timestamp = old_last_build_timestamp
 | |
| 
 | |
|         self.difference.ignore_directories()
 | |
|         self.unexpected_difference = copy.deepcopy(self.difference)
 | |
| 
 | |
|         if (status and self.status) is not None and self.status != status:
 | |
|             expect = ""
 | |
|             if status != 0:
 | |
|                 expect = " (expected %d)" % status
 | |
| 
 | |
|             annotation("failure", '"%s" returned %d%s' % (kw["program"],
 | |
|                 self.status, expect))
 | |
| 
 | |
|             annotation("reason", "unexpected status returned by bjam")
 | |
|             self.fail_test(1)
 | |
| 
 | |
|         if stdout is not None and not match(self.stdout(), stdout):
 | |
|             stdout_test = match(self.stdout(), stdout)
 | |
|             annotation("failure", "Unexpected stdout")
 | |
|             annotation("Expected STDOUT", stdout)
 | |
|             annotation("Actual STDOUT", self.stdout())
 | |
|             stderr = self.stderr()
 | |
|             if stderr:
 | |
|                 annotation("STDERR", stderr)
 | |
|             self.maybe_do_diff(self.stdout(), stdout, stdout_test)
 | |
|             self.fail_test(1, dump_stdio=False)
 | |
| 
 | |
|         # Intel tends to produce some messages to stderr which make tests fail.
 | |
|         intel_workaround = re.compile("^xi(link|lib): executing.*\n", re.M)
 | |
|         actual_stderr = re.sub(intel_workaround, "", self.stderr())
 | |
| 
 | |
|         if stderr is not None and not match(actual_stderr, stderr):
 | |
|             stderr_test = match(actual_stderr, stderr)
 | |
|             annotation("failure", "Unexpected stderr")
 | |
|             annotation("Expected STDERR", stderr)
 | |
|             annotation("Actual STDERR", self.stderr())
 | |
|             annotation("STDOUT", self.stdout())
 | |
|             self.maybe_do_diff(actual_stderr, stderr, stderr_test)
 | |
|             self.fail_test(1, dump_stdio=False)
 | |
| 
 | |
|         if expected_duration is not None:
 | |
|             actual_duration = build_time_finish - build_time_start
 | |
|             if actual_duration > expected_duration:
 | |
|                 print("Test run lasted %f seconds while it was expected to "
 | |
|                     "finish in under %f seconds." % (actual_duration,
 | |
|                     expected_duration))
 | |
|                 self.fail_test(1, dump_stdio=False)
 | |
| 
 | |
|         self.__ignore_junk()
 | |
| 
 | |
|     def glob_file(self, name):
 | |
|         name = self.adjust_name(name)
 | |
|         result = None
 | |
|         if hasattr(self, "difference"):
 | |
|             for f in (self.difference.added_files +
 | |
|                 self.difference.modified_files +
 | |
|                 self.difference.touched_files):
 | |
|                 if fnmatch.fnmatch(f, name):
 | |
|                     result = self.__native_file_name(f)
 | |
|                     break
 | |
|         if not result:
 | |
|             result = glob.glob(self.__native_file_name(name))
 | |
|             if result:
 | |
|                 result = result[0]
 | |
|         return result
 | |
| 
 | |
|     def __read(self, name, binary=False):
 | |
|         try:
 | |
|             openMode = "r"
 | |
|             if binary:
 | |
|                 openMode += "b"
 | |
|             else:
 | |
|                 openMode += "U"
 | |
|             f = open(name, openMode)
 | |
|             result = f.read()
 | |
|             f.close()
 | |
|             return result
 | |
|         except:
 | |
|             annotation("failure", "Could not open '%s'" % name)
 | |
|             self.fail_test(1)
 | |
|             return ""
 | |
| 
 | |
|     def read(self, name, binary=False):
 | |
|         name = self.glob_file(name)
 | |
|         return self.__read(name, binary=binary)
 | |
| 
 | |
|     def read_and_strip(self, name):
 | |
|         if not self.glob_file(name):
 | |
|             return ""
 | |
|         f = open(self.glob_file(name), "rb")
 | |
|         lines = f.readlines()
 | |
|         f.close()
 | |
|         result = "\n".join(x.decode().rstrip() for x in lines)
 | |
|         if lines and lines[-1][-1] != "\n":
 | |
|             return result + "\n"
 | |
|         return result
 | |
| 
 | |
|     def fail_test(self, condition, dump_difference=True, dump_stdio=True,
 | |
|         dump_stack=True):
 | |
|         if not condition:
 | |
|             return
 | |
| 
 | |
|         if dump_difference and hasattr(self, "difference"):
 | |
|             f = StringIO()
 | |
|             self.difference.pprint(f)
 | |
|             annotation("changes caused by the last build command",
 | |
|                 f.getvalue())
 | |
| 
 | |
|         if dump_stdio:
 | |
|             self.dump_stdio()
 | |
| 
 | |
|         if "--preserve" in sys.argv:
 | |
|             print()
 | |
|             print("*** Copying the state of working dir into 'failed_test' ***")
 | |
|             print()
 | |
|             path = os.path.join(self.original_workdir, "failed_test")
 | |
|             if os.path.isdir(path):
 | |
|                 shutil.rmtree(path, ignore_errors=False)
 | |
|             elif os.path.exists(path):
 | |
|                 raise "Path " + path + " already exists and is not a directory"
 | |
|             shutil.copytree(self.workdir, path)
 | |
|             print("The failed command was:")
 | |
|             print(" ".join(self.last_program_invocation))
 | |
| 
 | |
|         if dump_stack:
 | |
|             annotate_stack_trace()
 | |
|         sys.exit(1)
 | |
| 
 | |
|     # A number of methods below check expectations with actual difference
 | |
|     # between directory trees before and after a build. All the 'expect*'
 | |
|     # methods require exact names to be passed. All the 'ignore*' methods allow
 | |
|     # wildcards.
 | |
| 
 | |
|     # All names can be either a string or a list of strings.
 | |
|     def expect_addition(self, names):
 | |
|         for name in self.adjust_names(names):
 | |
|             try:
 | |
|                 glob_remove(self.unexpected_difference.added_files, name)
 | |
|             except:
 | |
|                 annotation("failure", "File %s not added as expected" % name)
 | |
|                 self.fail_test(1)
 | |
| 
 | |
|     def ignore_addition(self, wildcard):
 | |
|         self.__ignore_elements(self.unexpected_difference.added_files,
 | |
|             wildcard)
 | |
| 
 | |
|     def expect_removal(self, names):
 | |
|         for name in self.adjust_names(names):
 | |
|             try:
 | |
|                 glob_remove(self.unexpected_difference.removed_files, name)
 | |
|             except:
 | |
|                 annotation("failure", "File %s not removed as expected" % name)
 | |
|                 self.fail_test(1)
 | |
| 
 | |
|     def ignore_removal(self, wildcard):
 | |
|         self.__ignore_elements(self.unexpected_difference.removed_files,
 | |
|             wildcard)
 | |
| 
 | |
|     def expect_modification(self, names):
 | |
|         for name in self.adjust_names(names):
 | |
|             try:
 | |
|                 glob_remove(self.unexpected_difference.modified_files, name)
 | |
|             except:
 | |
|                 annotation("failure", "File %s not modified as expected" %
 | |
|                     name)
 | |
|                 self.fail_test(1)
 | |
| 
 | |
|     def ignore_modification(self, wildcard):
 | |
|         self.__ignore_elements(self.unexpected_difference.modified_files,
 | |
|             wildcard)
 | |
| 
 | |
|     def expect_touch(self, names):
 | |
|         d = self.unexpected_difference
 | |
|         for name in self.adjust_names(names):
 | |
|             # We need to check both touched and modified files. The reason is
 | |
|             # that:
 | |
|             #   (1) Windows binaries such as obj, exe or dll files have slight
 | |
|             #       differences even with identical inputs due to Windows PE
 | |
|             #       format headers containing an internal timestamp.
 | |
|             #   (2) Intel's compiler for Linux has the same behaviour.
 | |
|             filesets = [d.modified_files, d.touched_files]
 | |
| 
 | |
|             while filesets:
 | |
|                 try:
 | |
|                     glob_remove(filesets[-1], name)
 | |
|                     break
 | |
|                 except ValueError:
 | |
|                     filesets.pop()
 | |
| 
 | |
|             if not filesets:
 | |
|                 annotation("failure", "File %s not touched as expected" % name)
 | |
|                 self.fail_test(1)
 | |
| 
 | |
|     def ignore_touch(self, wildcard):
 | |
|         self.__ignore_elements(self.unexpected_difference.touched_files,
 | |
|             wildcard)
 | |
| 
 | |
|     def ignore(self, wildcard):
 | |
|         self.ignore_addition(wildcard)
 | |
|         self.ignore_removal(wildcard)
 | |
|         self.ignore_modification(wildcard)
 | |
|         self.ignore_touch(wildcard)
 | |
| 
 | |
|     def expect_nothing(self, names):
 | |
|         for name in self.adjust_names(names):
 | |
|             if name in self.difference.added_files:
 | |
|                 annotation("failure",
 | |
|                     "File %s added, but no action was expected" % name)
 | |
|                 self.fail_test(1)
 | |
|             if name in self.difference.removed_files:
 | |
|                 annotation("failure",
 | |
|                     "File %s removed, but no action was expected" % name)
 | |
|                 self.fail_test(1)
 | |
|                 pass
 | |
|             if name in self.difference.modified_files:
 | |
|                 annotation("failure",
 | |
|                     "File %s modified, but no action was expected" % name)
 | |
|                 self.fail_test(1)
 | |
|             if name in self.difference.touched_files:
 | |
|                 annotation("failure",
 | |
|                     "File %s touched, but no action was expected" % name)
 | |
|                 self.fail_test(1)
 | |
| 
 | |
|     def __ignore_junk(self):
 | |
|         # Not totally sure about this change, but I do not see a good
 | |
|         # alternative.
 | |
|         if windows:
 | |
|             self.ignore("*.ilk")       # MSVC incremental linking files.
 | |
|             self.ignore("*.pdb")       # MSVC program database files.
 | |
|             self.ignore("*.rsp")       # Response files.
 | |
|             self.ignore("*.tds")       # Borland debug symbols.
 | |
|             self.ignore("*.manifest")  # MSVC DLL manifests.
 | |
|             self.ignore("bin/standalone/msvc/*/msvc-setup.bat")
 | |
| 
 | |
|         # Debug builds of bjam built with gcc produce this profiling data.
 | |
|         self.ignore("gmon.out")
 | |
|         self.ignore("*/gmon.out")
 | |
| 
 | |
|         # Boost Build's 'configure' functionality (unfinished at the time)
 | |
|         # produces this file.
 | |
|         self.ignore("bin/config.log")
 | |
|         self.ignore("bin/project-cache.jam")
 | |
| 
 | |
|         # Compiled Python files created when running Python based Boost Build.
 | |
|         self.ignore("*.pyc")
 | |
| 
 | |
|         # OSX/Darwin files and dirs.
 | |
|         self.ignore("*.dSYM/*")
 | |
| 
 | |
|     def expect_nothing_more(self):
 | |
|         if not self.unexpected_difference.empty():
 | |
|             annotation("failure", "Unexpected changes found")
 | |
|             output = StringIO()
 | |
|             self.unexpected_difference.pprint(output)
 | |
|             annotation("unexpected changes", output.getvalue())
 | |
|             self.fail_test(1)
 | |
| 
 | |
|     def expect_output_lines(self, lines, expected=True):
 | |
|         self.__expect_lines(self.stdout(), lines, expected)
 | |
| 
 | |
|     def expect_content_lines(self, filename, line, expected=True):
 | |
|         self.__expect_lines(self.read_and_strip(filename), line, expected)
 | |
| 
 | |
|     def expect_content(self, name, content, exact=False):
 | |
|         actual = self.read(name)
 | |
|         content = content.replace("$toolset", self.expanded_toolset + "*")
 | |
| 
 | |
|         matched = False
 | |
|         if exact:
 | |
|             matched = fnmatch.fnmatch(actual, content)
 | |
|         else:
 | |
|             def sorted_(z):
 | |
|                 z.sort(key=lambda x: x.lower().replace("\\", "/"))
 | |
|                 return z
 | |
|             actual_ = list(map(lambda x: sorted_(x.split()), actual.splitlines()))
 | |
|             content_ = list(map(lambda x: sorted_(x.split()), content.splitlines()))
 | |
|             if len(actual_) == len(content_):
 | |
|                 matched = map(
 | |
|                     lambda x, y: map(lambda n, p: fnmatch.fnmatch(n, p), x, y),
 | |
|                     actual_, content_)
 | |
|                 matched = reduce(
 | |
|                     lambda x, y: x and reduce(
 | |
|                         lambda a, b: a and b,
 | |
|                         y, True),
 | |
|                     matched, True)
 | |
| 
 | |
|         if not matched:
 | |
|             print("Expected:\n")
 | |
|             print(content)
 | |
|             print("Got:\n")
 | |
|             print(actual)
 | |
|             self.fail_test(1)
 | |
| 
 | |
|     def maybe_do_diff(self, actual, expected, result=None):
 | |
|         if os.environ.get("DO_DIFF"):
 | |
|             e = tempfile.mktemp("expected")
 | |
|             a = tempfile.mktemp("actual")
 | |
|             f = open(e, "w")
 | |
|             f.write(expected)
 | |
|             f.close()
 | |
|             f = open(a, "w")
 | |
|             f.write(actual)
 | |
|             f.close()
 | |
|             print("DIFFERENCE")
 | |
|             # Current diff should return 1 to indicate 'different input files'
 | |
|             # but some older diff versions may return 0 and depending on the
 | |
|             # exact Python/OS platform version, os.system() call may gobble up
 | |
|             # the external process's return code and return 0 itself.
 | |
|             if os.system('diff -u "%s" "%s"' % (e, a)) not in [0, 1]:
 | |
|                 print('Unable to compute difference: diff -u "%s" "%s"' % (e, a
 | |
|                     ))
 | |
|             os.unlink(e)
 | |
|             os.unlink(a)
 | |
|         elif type(result) is TestCmd.MatchError:
 | |
|             print(result.message)
 | |
|         else:
 | |
|             print("Set environmental variable 'DO_DIFF' to examine the "
 | |
|                 "difference.")
 | |
| 
 | |
|     # Internal methods.
 | |
|     def adjust_lib_name(self, name):
 | |
|         global lib_prefix
 | |
|         global dll_prefix
 | |
|         result = name
 | |
| 
 | |
|         pos = name.rfind(".")
 | |
|         if pos != -1:
 | |
|             suffix = name[pos:]
 | |
|             if suffix == ".lib":
 | |
|                 (head, tail) = os.path.split(name)
 | |
|                 if lib_prefix:
 | |
|                     tail = lib_prefix + tail
 | |
|                     result = os.path.join(head, tail)
 | |
|             elif suffix == ".dll" or suffix == ".implib":
 | |
|                 (head, tail) = os.path.split(name)
 | |
|                 if dll_prefix:
 | |
|                     tail = dll_prefix + tail
 | |
|                     result = os.path.join(head, tail)
 | |
|         # If we want to use this name in a Jamfile, we better convert \ to /,
 | |
|         # as otherwise we would have to quote \.
 | |
|         result = result.replace("\\", "/")
 | |
|         return result
 | |
| 
 | |
|     def adjust_suffix(self, name):
 | |
|         if not self.translate_suffixes:
 | |
|             return name
 | |
|         pos = name.rfind(".")
 | |
|         if pos == -1:
 | |
|             return name
 | |
|         suffix = name[pos:]
 | |
|         return name[:pos] + suffixes.get(suffix, suffix)
 | |
| 
 | |
|     # Acceps either a string or a list of strings and returns a list of
 | |
|     # strings. Adjusts suffixes on all names.
 | |
|     def adjust_names(self, names):
 | |
|         if isstr(names):
 | |
|             names = [names]
 | |
|         r = map(self.adjust_lib_name, names)
 | |
|         r = map(self.adjust_suffix, r)
 | |
|         r = map(lambda x, t=self.expanded_toolset: x.replace("$toolset", t + "*"), r)
 | |
|         return list(r)
 | |
| 
 | |
|     def adjust_name(self, name):
 | |
|         return self.adjust_names(name)[0]
 | |
| 
 | |
|     def __native_file_name(self, name):
 | |
|         return os.path.normpath(os.path.join(self.workdir, *name.split("/")))
 | |
| 
 | |
|     def native_file_name(self, name):
 | |
|         return self.__native_file_name(self.adjust_name(name))
 | |
| 
 | |
|     def wait_for_time_change(self, path, touch):
 | |
|         """
 | |
|           Wait for newly assigned file system modification timestamps for the
 | |
|         given path to become large enough for the timestamp difference to be
 | |
|         correctly recognized by both this Python based testing framework and
 | |
|         the Boost Jam executable being tested. May optionally touch the given
 | |
|         path to set its modification timestamp to the new value.
 | |
| 
 | |
|         """
 | |
|         self.__wait_for_time_change(path, touch, last_build_time=False)
 | |
| 
 | |
|     def wait_for_time_change_since_last_build(self):
 | |
|         """
 | |
|           Wait for newly assigned file system modification timestamps to
 | |
|         become large enough for the timestamp difference to be
 | |
|         correctly recognized by the Python based testing framework.
 | |
|         Does not care about Jam's timestamp resolution, since we
 | |
|         only need this to detect touched files.
 | |
|         """
 | |
|         if self.last_build_timestamp:
 | |
|             timestamp_file = "timestamp-3df2f2317e15e4a9"
 | |
|             open(timestamp_file, "wb").close()
 | |
|             self.__wait_for_time_change_impl(timestamp_file,
 | |
|                 self.last_build_timestamp,
 | |
|                 self.__python_timestamp_resolution(timestamp_file, 0), 0)
 | |
|             os.unlink(timestamp_file)
 | |
| 
 | |
|     def __build_timestamp_resolution(self):
 | |
|         """
 | |
|           Returns the minimum path modification timestamp resolution supported
 | |
|         by the used Boost Jam executable.
 | |
| 
 | |
|         """
 | |
|         dir = tempfile.mkdtemp("bjam_version_info")
 | |
|         try:
 | |
|             jam_script = "timestamp_resolution.jam"
 | |
|             f = open(os.path.join(dir, jam_script), "w")
 | |
|             try:
 | |
|                 f.write("EXIT $(JAM_TIMESTAMP_RESOLUTION) : 0 ;")
 | |
|             finally:
 | |
|                 f.close()
 | |
|             p = subprocess.Popen([self.program[0], "-d0", "-f%s" % jam_script],
 | |
|                 stdout=subprocess.PIPE, cwd=dir, universal_newlines=True)
 | |
|             out, err = p.communicate()
 | |
|         finally:
 | |
|             shutil.rmtree(dir, ignore_errors=False)
 | |
| 
 | |
|         if p.returncode != 0:
 | |
|             raise TestEnvironmentError("Unexpected return code (%s) when "
 | |
|                 "detecting Boost Jam's minimum supported path modification "
 | |
|                 "timestamp resolution version information." % p.returncode)
 | |
|         if err:
 | |
|             raise TestEnvironmentError("Unexpected error output (%s) when "
 | |
|                 "detecting Boost Jam's minimum supported path modification "
 | |
|                 "timestamp resolution version information." % err)
 | |
| 
 | |
|         r = re.match("([0-9]{2}):([0-9]{2}):([0-9]{2}\\.[0-9]{9})$", out)
 | |
|         if not r:
 | |
|             # Older Boost Jam versions did not report their minimum supported
 | |
|             # path modification timestamp resolution and did not actually
 | |
|             # support path modification timestamp resolutions finer than 1
 | |
|             # second.
 | |
|             # TODO: Phase this support out to avoid such fallback code from
 | |
|             # possibly covering up other problems.
 | |
|             return 1
 | |
|         if r.group(1) != "00" or r.group(2) != "00":  # hours, minutes
 | |
|             raise TestEnvironmentError("Boost Jam with too coarse minimum "
 | |
|                 "supported path modification timestamp resolution (%s:%s:%s)."
 | |
|                 % (r.group(1), r.group(2), r.group(3)))
 | |
|         return float(r.group(3))  # seconds.nanoseconds
 | |
| 
 | |
|     def __ensure_newer_than_last_build(self, path):
 | |
|         """
 | |
|           Updates the given path's modification timestamp after waiting for the
 | |
|         newly assigned file system modification timestamp to become large
 | |
|         enough for the timestamp difference between it and the last build
 | |
|         timestamp to be correctly recognized by both this Python based testing
 | |
|         framework and the Boost Jam executable being tested. Does nothing if
 | |
|         there is no 'last build' information available.
 | |
| 
 | |
|         """
 | |
|         if self.last_build_timestamp:
 | |
|             self.__wait_for_time_change(path, touch=True, last_build_time=True)
 | |
| 
 | |
|     def __expect_lines(self, data, lines, expected):
 | |
|         """
 | |
|           Checks whether the given data contains the given lines.
 | |
| 
 | |
|           Data may be specified as a single string containing text lines
 | |
|         separated by newline characters.
 | |
| 
 | |
|           Lines may be specified in any of the following forms:
 | |
|             * Single string containing text lines separated by newlines - the
 | |
|               given lines are searched for in the given data without any extra
 | |
|               data lines between them.
 | |
|             * Container of strings containing text lines separated by newlines
 | |
|               - the given lines are searched for in the given data with extra
 | |
|               data lines allowed between lines belonging to different strings.
 | |
|             * Container of strings containing text lines separated by newlines
 | |
|               and containers containing strings - the same as above with the
 | |
|               internal containers containing strings being interpreted as if
 | |
|               all their content was joined together into a single string
 | |
|               separated by newlines.
 | |
| 
 | |
|           A newline at the end of any multi-line lines string is interpreted as
 | |
|         an expected extra trailig empty line.
 | |
|         """
 | |
|         # str.splitlines() trims at most one trailing newline while we want the
 | |
|         # trailing newline to indicate that there should be an extra empty line
 | |
|         # at the end.
 | |
|         def splitlines(x):
 | |
|             return (x + "\n").splitlines()
 | |
| 
 | |
|         if data is None:
 | |
|             data = []
 | |
|         elif isstr(data):
 | |
|             data = splitlines(data)
 | |
| 
 | |
|         if isstr(lines):
 | |
|             lines = [splitlines(lines)]
 | |
|         else:
 | |
|             expanded = []
 | |
|             for x in lines:
 | |
|                 if isstr(x):
 | |
|                     x = splitlines(x)
 | |
|                 expanded.append(x)
 | |
|             lines = expanded
 | |
| 
 | |
|         if _contains_lines(data, lines) != bool(expected):
 | |
|             output = []
 | |
|             if expected:
 | |
|                 output = ["Did not find expected lines:"]
 | |
|             else:
 | |
|                 output = ["Found unexpected lines:"]
 | |
|             first = True
 | |
|             for line_sequence in lines:
 | |
|                 if line_sequence:
 | |
|                     if first:
 | |
|                         first = False
 | |
|                     else:
 | |
|                         output.append("...")
 | |
|                     output.extend("  > " + line for line in line_sequence)
 | |
|             output.append("in output:")
 | |
|             output.extend("  > " + line for line in data)
 | |
|             annotation("failure", "\n".join(output))
 | |
|             self.fail_test(1)
 | |
| 
 | |
|     def __ignore_elements(self, things, wildcard):
 | |
|         """Removes in-place 'things' elements matching the given 'wildcard'."""
 | |
|         things[:] = list(filter(lambda x: not fnmatch.fnmatch(x, wildcard), things))
 | |
| 
 | |
|     def __makedirs(self, path, wait):
 | |
|         """
 | |
|           Creates a folder with the given path, together with any missing
 | |
|         parent folders. If WAIT is set, makes sure any newly created folders
 | |
|         have modification timestamps newer than the ones left behind by the
 | |
|         last build run.
 | |
| 
 | |
|         """
 | |
|         try:
 | |
|             if wait:
 | |
|                 stack = []
 | |
|                 while path and path not in stack and not os.path.isdir(path):
 | |
|                     stack.append(path)
 | |
|                     path = os.path.dirname(path)
 | |
|                 while stack:
 | |
|                     path = stack.pop()
 | |
|                     os.mkdir(path)
 | |
|                     self.__ensure_newer_than_last_build(path)
 | |
|             else:
 | |
|                 os.makedirs(path)
 | |
|         except Exception:
 | |
|             pass
 | |
| 
 | |
|     def __python_timestamp_resolution(self, path, minimum_resolution):
 | |
|         """
 | |
|           Returns the modification timestamp resolution for the given path
 | |
|         supported by the used Python interpreter/OS/filesystem combination.
 | |
|         Will not check for resolutions less than the given minimum value. Will
 | |
|         change the path's modification timestamp in the process.
 | |
| 
 | |
|           Return values:
 | |
|             0                - nanosecond resolution supported
 | |
|             positive decimal - timestamp resolution in seconds
 | |
| 
 | |
|         """
 | |
|         # Note on Python's floating point timestamp support:
 | |
|         #   Python interpreter versions prior to Python 2.3 did not support
 | |
|         # floating point timestamps. Versions 2.3 through 3.3 may or may not
 | |
|         # support it depending on the configuration (may be toggled by calling
 | |
|         # os.stat_float_times(True/False) at program startup, disabled by
 | |
|         # default prior to Python 2.5 and enabled by default since). Python 3.3
 | |
|         # deprecated this configuration and 3.4 removed support for it after
 | |
|         # which floating point timestamps are always supported.
 | |
|         ver = sys.version_info[0:2]
 | |
|         python_nanosecond_support = ver >= (3, 4) or (ver >= (2, 3) and
 | |
|             os.stat_float_times())
 | |
| 
 | |
|         # Minimal expected floating point difference used to account for
 | |
|         # possible imprecise floating point number representations. We want
 | |
|         # this number to be small (at least smaller than 0.0001) but still
 | |
|         # large enough that we can be sure that increasing a floating point
 | |
|         # value by 2 * eta guarantees the value read back will be increased by
 | |
|         # at least eta.
 | |
|         eta = 0.00005
 | |
| 
 | |
|         stats_orig = os.stat(path)
 | |
|         def test_time(diff):
 | |
|             """Returns whether a timestamp difference is detectable."""
 | |
|             os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime + diff))
 | |
|             return os.stat(path).st_mtime > stats_orig.st_mtime + eta
 | |
| 
 | |
|         # Test for nanosecond timestamp resolution support.
 | |
|         if not minimum_resolution and python_nanosecond_support:
 | |
|             if test_time(2 * eta):
 | |
|                 return 0
 | |
| 
 | |
|         # Detect the filesystem timestamp resolution. Note that there is no
 | |
|         # need to make this code 'as fast as possible' as, this function gets
 | |
|         # called before having to sleep until the next detectable modification
 | |
|         # timestamp value and that, since we already know nanosecond resolution
 | |
|         # is not supported, will surely take longer than whatever we do here to
 | |
|         # detect this minimal detectable modification timestamp resolution.
 | |
|         step = 0.1
 | |
|         if not python_nanosecond_support:
 | |
|             # If Python does not support nanosecond timestamp resolution we
 | |
|             # know the minimum possible supported timestamp resolution is 1
 | |
|             # second.
 | |
|             minimum_resolution = max(1, minimum_resolution)
 | |
|         index = max(1, int(minimum_resolution / step))
 | |
|         while step * index < minimum_resolution:
 | |
|             # Floating point number representation errors may cause our
 | |
|             # initially calculated start index to be too small if calculated
 | |
|             # directly.
 | |
|             index += 1
 | |
|         while True:
 | |
|             # Do not simply add up the steps to avoid cumulative floating point
 | |
|             # number representation errors.
 | |
|             next = step * index
 | |
|             if next > 10:
 | |
|                 raise TestEnvironmentError("File systems with too coarse "
 | |
|                     "modification timestamp resolutions not supported.")
 | |
|             if test_time(next):
 | |
|                 return next
 | |
|             index += 1
 | |
| 
 | |
|     def __wait_for_time_change(self, path, touch, last_build_time):
 | |
|         """
 | |
|           Wait until a newly assigned file system modification timestamp for
 | |
|         the given path is large enough for the timestamp difference between it
 | |
|         and the last build timestamp or the path's original file system
 | |
|         modification timestamp (depending on the last_build_time flag) to be
 | |
|         correctly recognized by both this Python based testing framework and
 | |
|         the Boost Jam executable being tested. May optionally touch the given
 | |
|         path to set its modification timestamp to the new value.
 | |
| 
 | |
|         """
 | |
|         assert self.last_build_timestamp or not last_build_time
 | |
|         stats_orig = os.stat(path)
 | |
| 
 | |
|         if last_build_time:
 | |
|             start_time = self.last_build_timestamp
 | |
|         else:
 | |
|             start_time = stats_orig.st_mtime
 | |
| 
 | |
|         build_resolution = self.__build_timestamp_resolution()
 | |
|         assert build_resolution >= 0
 | |
| 
 | |
|         # Check whether the current timestamp is already new enough.
 | |
|         if stats_orig.st_mtime > start_time and (not build_resolution or
 | |
|             stats_orig.st_mtime >= start_time + build_resolution):
 | |
|             return
 | |
| 
 | |
|         resolution = self.__python_timestamp_resolution(path, build_resolution)
 | |
|         assert resolution >= build_resolution
 | |
|         self.__wait_for_time_change_impl(path, start_time, resolution, build_resolution)
 | |
| 
 | |
|         if not touch:
 | |
|             os.utime(path, (stats_orig.st_atime, stats_orig.st_mtime))
 | |
| 
 | |
|     def __wait_for_time_change_impl(self, path, start_time, resolution, build_resolution):
 | |
|         # Implementation notes:
 | |
|         #  * Theoretically time.sleep() API might get interrupted too soon
 | |
|         #    (never actually encountered).
 | |
|         #  * We encountered cases where we sleep just long enough for the
 | |
|         #    filesystem's modifiction timestamp to change to the desired value,
 | |
|         #    but after waking up, the read timestamp is still just a tiny bit
 | |
|         #    too small (encountered on Windows). This is most likely caused by
 | |
|         #    imprecise floating point timestamp & sleep interval representation
 | |
|         #    used by Python. Note though that we never encountered a case where
 | |
|         #    more than one additional tiny sleep() call was needed to remedy
 | |
|         #    the situation.
 | |
|         #  * We try to wait long enough for the timestamp to change, but do not
 | |
|         #    want to waste processing time by waiting too long. The main
 | |
|         #    problem is that when we have a coarse resolution, the actual times
 | |
|         #    get rounded and we do not know the exact sleep time needed for the
 | |
|         #    difference between two such times to pass. E.g. if we have a 1
 | |
|         #    second resolution and the original and the current file timestamps
 | |
|         #    are both 10 seconds then it could be that the current time is
 | |
|         #    10.99 seconds and that we can wait for just one hundredth of a
 | |
|         #    second for the current file timestamp to reach its next value, and
 | |
|         #    using a longer sleep interval than that would just be wasting
 | |
|         #    time.
 | |
|         while True:
 | |
|             os.utime(path, None)
 | |
|             c = os.stat(path).st_mtime
 | |
|             if resolution:
 | |
|                 if c > start_time and (not build_resolution or c >= start_time
 | |
|                     + build_resolution):
 | |
|                     break
 | |
|                 if c <= start_time - resolution:
 | |
|                     # Move close to the desired timestamp in one sleep, but not
 | |
|                     # close enough for timestamp rounding to potentially cause
 | |
|                     # us to wait too long.
 | |
|                     if start_time - c > 5:
 | |
|                         if last_build_time:
 | |
|                             error_message = ("Last build time recorded as "
 | |
|                                 "being a future event, causing a too long "
 | |
|                                 "wait period. Something must have played "
 | |
|                                 "around with the system clock.")
 | |
|                         else:
 | |
|                             error_message = ("Original path modification "
 | |
|                                 "timestamp set to far into the future or "
 | |
|                                 "something must have played around with the "
 | |
|                                 "system clock, causing a too long wait "
 | |
|                                 "period.\nPath: '%s'" % path)
 | |
|                         raise TestEnvironmentError(message)
 | |
|                     _sleep(start_time - c)
 | |
|                 else:
 | |
|                     # We are close to the desired timestamp so take baby sleeps
 | |
|                     # to avoid sleeping too long.
 | |
|                     _sleep(max(0.01, resolution / 10))
 | |
|             else:
 | |
|                 if c > start_time:
 | |
|                     break
 | |
|                 _sleep(max(0.01, start_time - c))
 | |
| 
 | |
| 
 | |
| class List:
 | |
|     def __init__(self, s=""):
 | |
|         elements = []
 | |
|         if isstr(s):
 | |
|             # Have to handle escaped spaces correctly.
 | |
|             elements = s.replace("\ ", "\001").split()
 | |
|         else:
 | |
|             elements = s
 | |
|         self.l = [e.replace("\001", " ") for e in elements]
 | |
| 
 | |
|     def __len__(self):
 | |
|         return len(self.l)
 | |
| 
 | |
|     def __getitem__(self, key):
 | |
|         return self.l[key]
 | |
| 
 | |
|     def __setitem__(self, key, value):
 | |
|         self.l[key] = value
 | |
| 
 | |
|     def __delitem__(self, key):
 | |
|         del self.l[key]
 | |
| 
 | |
|     def __str__(self):
 | |
|         return str(self.l)
 | |
| 
 | |
|     def __repr__(self):
 | |
|         return "%s.List(%r)" % (self.__module__, " ".join(self.l))
 | |
| 
 | |
|     def __mul__(self, other):
 | |
|         result = List()
 | |
|         if not isinstance(other, List):
 | |
|             other = List(other)
 | |
|         for f in self:
 | |
|             for s in other:
 | |
|                 result.l.append(f + s)
 | |
|         return result
 | |
| 
 | |
|     def __rmul__(self, other):
 | |
|         if not isinstance(other, List):
 | |
|             other = List(other)
 | |
|         return List.__mul__(other, self)
 | |
| 
 | |
|     def __add__(self, other):
 | |
|         result = List()
 | |
|         result.l = self.l[:] + other.l[:]
 | |
|         return result
 | |
| 
 | |
| 
 | |
| def _contains_lines(data, lines):
 | |
|     data_line_count = len(data)
 | |
|     expected_line_count = reduce(lambda x, y: x + len(y), lines, 0)
 | |
|     index = 0
 | |
|     for expected in lines:
 | |
|         if expected_line_count > data_line_count - index:
 | |
|             return False
 | |
|         expected_line_count -= len(expected)
 | |
|         index = _match_line_sequence(data, index, data_line_count -
 | |
|             expected_line_count, expected)
 | |
|         if index < 0:
 | |
|             return False
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def _match_line_sequence(data, start, end, lines):
 | |
|     if not lines:
 | |
|         return start
 | |
|     for index in range(start, end - len(lines) + 1):
 | |
|         data_index = index
 | |
|         for expected in lines:
 | |
|             if not fnmatch.fnmatch(data[data_index], expected):
 | |
|                 break
 | |
|             data_index += 1
 | |
|         else:
 | |
|             return data_index
 | |
|     return -1
 | |
| 
 | |
| 
 | |
| def _sleep(delay):
 | |
|     if delay > 5:
 | |
|         raise TestEnvironmentError("Test environment error: sleep period of "
 | |
|             "more than 5 seconds requested. Most likely caused by a file with "
 | |
|             "its modification timestamp set to sometime in the future.")
 | |
|     time.sleep(delay)
 | |
| 
 | |
| 
 | |
| ###############################################################################
 | |
| #
 | |
| # Initialization.
 | |
| #
 | |
| ###############################################################################
 | |
| 
 | |
| # Make os.stat() return file modification times as floats instead of integers
 | |
| # to get the best possible file timestamp resolution available. The exact
 | |
| # resolution depends on the underlying file system and the Python os.stat()
 | |
| # implementation. The better the resolution we achieve, the shorter we need to
 | |
| # wait for files we create to start getting new timestamps.
 | |
| #
 | |
| # Additional notes:
 | |
| #  * os.stat_float_times() function first introduced in Python 2.3. and
 | |
| #    suggested for deprecation in Python 3.3.
 | |
| #  * On Python versions 2.5+ we do not need to do this as there os.stat()
 | |
| #    returns floating point file modification times by default.
 | |
| #  * Windows CPython implementations prior to version 2.5 do not support file
 | |
| #    modification timestamp resolutions of less than 1 second no matter whether
 | |
| #    these timestamps are returned as integer or floating point values.
 | |
| #  * Python documentation states that this should be set in a program's
 | |
| #    __main__ module to avoid affecting other libraries that might not be ready
 | |
| #    to support floating point timestamps. Since we use no such external
 | |
| #    libraries, we ignore this warning to make it easier to enable this feature
 | |
| #    in both our single & multiple-test scripts.
 | |
| if (2, 3) <= sys.version_info < (2, 5) and not os.stat_float_times():
 | |
|     os.stat_float_times(True)
 | |
| 
 | |
| 
 | |
| # Quickie tests. Should use doctest instead.
 | |
| if __name__ == "__main__":
 | |
|     assert str(List("foo bar") * "/baz") == "['foo/baz', 'bar/baz']"
 | |
|     assert repr("foo/" * List("bar baz")) == "__main__.List('foo/bar foo/baz')"
 | |
| 
 | |
|     assert _contains_lines([], [])
 | |
|     assert _contains_lines([], [[]])
 | |
|     assert _contains_lines([], [[], []])
 | |
|     assert _contains_lines([], [[], [], []])
 | |
|     assert not _contains_lines([], [[""]])
 | |
|     assert not _contains_lines([], [["a"]])
 | |
| 
 | |
|     assert _contains_lines([""], [])
 | |
|     assert _contains_lines(["a"], [])
 | |
|     assert _contains_lines(["a", "b"], [])
 | |
|     assert _contains_lines(["a", "b"], [[], [], []])
 | |
| 
 | |
|     assert _contains_lines([""], [[""]])
 | |
|     assert not _contains_lines([""], [["a"]])
 | |
|     assert not _contains_lines(["a"], [[""]])
 | |
|     assert _contains_lines(["a", "", "b", ""], [["a"]])
 | |
|     assert _contains_lines(["a", "", "b", ""], [[""]])
 | |
|     assert _contains_lines(["a", "", "b"], [["b"]])
 | |
|     assert not _contains_lines(["a", "b"], [[""]])
 | |
|     assert not _contains_lines(["a", "", "b", ""], [["c"]])
 | |
|     assert _contains_lines(["a", "", "b", "x"], [["x"]])
 | |
| 
 | |
|     data = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]
 | |
|     assert _contains_lines(data, [["1", "2"]])
 | |
|     assert not _contains_lines(data, [["2", "1"]])
 | |
|     assert not _contains_lines(data, [["1", "3"]])
 | |
|     assert not _contains_lines(data, [["1", "3"]])
 | |
|     assert _contains_lines(data, [["1"], ["2"]])
 | |
|     assert _contains_lines(data, [["1"], [], [], [], ["2"]])
 | |
|     assert _contains_lines(data, [["1"], ["3"]])
 | |
|     assert not _contains_lines(data, [["3"], ["1"]])
 | |
|     assert _contains_lines(data, [["3"], ["7"], ["8"]])
 | |
|     assert not _contains_lines(data, [["1"], ["3", "5"]])
 | |
|     assert not _contains_lines(data, [["1"], [""], ["5"]])
 | |
|     assert not _contains_lines(data, [["1"], ["5"], ["3"]])
 | |
|     assert not _contains_lines(data, [["1"], ["5", "3"]])
 | |
| 
 | |
|     assert not _contains_lines(data, [[" 3"]])
 | |
|     assert not _contains_lines(data, [["3 "]])
 | |
|     assert not _contains_lines(data, [["3", ""]])
 | |
|     assert not _contains_lines(data, [["", "3"]])
 | |
| 
 | |
|     print("tests passed")
 |