610 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			610 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | """
 | ||
|  | TestCmd.py:  a testing framework for commands and scripts. | ||
|  | 
 | ||
|  | The TestCmd module provides a framework for portable automated testing of | ||
|  | executable commands and scripts (in any language, not just Python), especially | ||
|  | commands and scripts that require file system interaction. | ||
|  | 
 | ||
|  | In addition to running tests and evaluating conditions, the TestCmd module | ||
|  | manages and cleans up one or more temporary workspace directories, and provides | ||
|  | methods for creating files and directories in those workspace directories from | ||
|  | in-line data, here-documents), allowing tests to be completely self-contained. | ||
|  | 
 | ||
|  | A TestCmd environment object is created via the usual invocation: | ||
|  | 
 | ||
|  |     test = TestCmd() | ||
|  | 
 | ||
|  | The TestCmd module provides pass_test(), fail_test(), and no_result() unbound | ||
|  | methods that report test results for use with the Aegis change management | ||
|  | system. These methods terminate the test immediately, reporting PASSED, FAILED | ||
|  | or NO RESULT respectively and exiting with status 0 (success), 1 or 2 | ||
|  | respectively. This allows for a distinction between an actual failed test and a | ||
|  | test that could not be properly evaluated because of an external condition (such | ||
|  | as a full file system or incorrect permissions). | ||
|  | 
 | ||
|  | """
 | ||
|  | 
 | ||
|  | # Copyright 2000 Steven Knight | ||
|  | # This module is free software, and you may redistribute it and/or modify | ||
|  | # it under the same terms as Python itself, so long as this copyright message | ||
|  | # and disclaimer are retained in their original form. | ||
|  | # | ||
|  | # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, | ||
|  | # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF | ||
|  | # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH | ||
|  | # DAMAGE. | ||
|  | # | ||
|  | # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT | ||
|  | # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A | ||
|  | # PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, | ||
|  | # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, | ||
|  | # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. | ||
|  | 
 | ||
|  | # Copyright 2002-2003 Vladimir Prus. | ||
|  | # Copyright 2002-2003 Dave Abrahams. | ||
|  | # Copyright 2006 Rene Rivera. | ||
|  | # Distributed under the Boost Software License, Version 1.0. | ||
|  | #    (See accompanying file LICENSE.txt or copy at | ||
|  | #         https://www.bfgroup.xyz/b2/LICENSE.txt) | ||
|  | 
 | ||
|  | from __future__ import print_function | ||
|  | 
 | ||
|  | __author__ = "Steven Knight <knight@baldmt.com>" | ||
|  | __revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software" | ||
|  | __version__ = "0.02" | ||
|  | 
 | ||
|  | from types import * | ||
|  | 
 | ||
|  | import os | ||
|  | import os.path | ||
|  | import re | ||
|  | import shutil | ||
|  | import stat | ||
|  | import subprocess | ||
|  | import sys | ||
|  | import tempfile | ||
|  | import traceback | ||
|  | 
 | ||
|  | 
 | ||
|  | tempfile.template = 'testcmd.' | ||
|  | 
 | ||
|  | _Cleanup = [] | ||
|  | 
 | ||
|  | def _clean(): | ||
|  |     global _Cleanup | ||
|  |     list = _Cleanup[:] | ||
|  |     _Cleanup = [] | ||
|  |     list.reverse() | ||
|  |     for test in list: | ||
|  |         test.cleanup() | ||
|  | 
 | ||
|  | sys.exitfunc = _clean | ||
|  | 
 | ||
|  | 
 | ||
|  | def caller(tblist, skip): | ||
|  |     string = "" | ||
|  |     arr = [] | ||
|  |     for file, line, name, text in tblist: | ||
|  |         if file[-10:] == "TestCmd.py": | ||
|  |                 break | ||
|  |         arr = [(file, line, name, text)] + arr | ||
|  |     atfrom = "at" | ||
|  |     for file, line, name, text in arr[skip:]: | ||
|  |         if name == "?": | ||
|  |             name = "" | ||
|  |         else: | ||
|  |             name = " (" + name + ")" | ||
|  |         string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) | ||
|  |         atfrom = "\tfrom" | ||
|  |     return string | ||
|  | 
 | ||
|  | 
 | ||
|  | def fail_test(self=None, condition=True, function=None, skip=0): | ||
|  |     """Cause the test to fail.
 | ||
|  | 
 | ||
|  |       By default, the fail_test() method reports that the test FAILED and exits | ||
|  |     with a status of 1. If a condition argument is supplied, the test fails | ||
|  |     only if the condition is true. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     if not condition: | ||
|  |         return | ||
|  |     if not function is None: | ||
|  |         function() | ||
|  |     of = "" | ||
|  |     desc = "" | ||
|  |     sep = " " | ||
|  |     if not self is None: | ||
|  |         if self.program: | ||
|  |             of = " of " + " ".join(self.program) | ||
|  |             sep = "\n\t" | ||
|  |         if self.description: | ||
|  |             desc = " [" + self.description + "]" | ||
|  |             sep = "\n\t" | ||
|  | 
 | ||
|  |     at = caller(traceback.extract_stack(), skip) | ||
|  | 
 | ||
|  |     sys.stderr.write("FAILED test" + of + desc + sep + at + """
 | ||
|  | in directory: """ + os.getcwd() )
 | ||
|  |     sys.exit(1) | ||
|  | 
 | ||
|  | 
 | ||
|  | def no_result(self=None, condition=True, function=None, skip=0): | ||
|  |     """Causes a test to exit with no valid result.
 | ||
|  | 
 | ||
|  |       By default, the no_result() method reports NO RESULT for the test and | ||
|  |     exits with a status of 2. If a condition argument is supplied, the test | ||
|  |     fails only if the condition is true. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     if not condition: | ||
|  |         return | ||
|  |     if not function is None: | ||
|  |         function() | ||
|  |     of = "" | ||
|  |     desc = "" | ||
|  |     sep = " " | ||
|  |     if not self is None: | ||
|  |         if self.program: | ||
|  |             of = " of " + self.program | ||
|  |             sep = "\n\t" | ||
|  |         if self.description: | ||
|  |             desc = " [" + self.description + "]" | ||
|  |             sep = "\n\t" | ||
|  | 
 | ||
|  |     at = caller(traceback.extract_stack(), skip) | ||
|  |     sys.stderr.write("NO RESULT for test" + of + desc + sep + at) | ||
|  |     sys.exit(2) | ||
|  | 
 | ||
|  | 
 | ||
|  | def pass_test(self=None, condition=True, function=None): | ||
|  |     """Causes a test to pass.
 | ||
|  | 
 | ||
|  |       By default, the pass_test() method reports PASSED for the test and exits | ||
|  |     with a status of 0. If a condition argument is supplied, the test passes | ||
|  |     only if the condition is true. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     if not condition: | ||
|  |         return | ||
|  |     if not function is None: | ||
|  |         function() | ||
|  |     sys.stderr.write("PASSED\n") | ||
|  |     sys.exit(0) | ||
|  | 
 | ||
|  | class MatchError(object): | ||
|  |     def __init__(self, message): | ||
|  |         self.message = message | ||
|  |     def __nonzero__(self): | ||
|  |         return False | ||
|  |     def __bool__(self): | ||
|  |         return False | ||
|  | 
 | ||
|  | def match_exact(lines=None, matches=None): | ||
|  |     """
 | ||
|  |       Returns whether the given lists or strings containing lines separated | ||
|  |     using newline characters contain exactly the same data. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     if not type(lines) is list: | ||
|  |         lines = lines.split("\n") | ||
|  |     if not type(matches) is list: | ||
|  |         matches = matches.split("\n") | ||
|  |     if len(lines) != len(matches): | ||
|  |         return | ||
|  |     for i in range(len(lines)): | ||
|  |         if lines[i] != matches[i]: | ||
|  |             return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % | ||
|  |                 (i+1, matches[i], lines[i])) | ||
|  |     if len(lines) < len(matches): | ||
|  |         return MatchError("Missing lines at line %d\n- %s" % | ||
|  |             (len(lines), "\n- ".join(matches[len(lines):]))) | ||
|  |     if len(lines) > len(matches): | ||
|  |         return MatchError("Extra lines at line %d\n+ %s" % | ||
|  |             (len(matches), "\n+ ".join(lines[len(matches):]))) | ||
|  |     return 1 | ||
|  | 
 | ||
|  | 
 | ||
|  | def match_re(lines=None, res=None): | ||
|  |     """
 | ||
|  |       Given lists or strings contain lines separated using newline characters. | ||
|  |     This function matches those lines one by one, interpreting the lines in the | ||
|  |     res parameter as regular expressions. | ||
|  | 
 | ||
|  |     """
 | ||
|  |     if not type(lines) is list: | ||
|  |         lines = lines.split("\n") | ||
|  |     if not type(res) is list: | ||
|  |         res = res.split("\n") | ||
|  |     for i in range(min(len(lines), len(res))): | ||
|  |         if not re.compile("^" + res[i] + "$").search(lines[i]): | ||
|  |             return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % | ||
|  |                 (i+1, res[i], lines[i])) | ||
|  |     if len(lines) < len(res): | ||
|  |         return MatchError("Missing lines at line %d\n- %s" % | ||
|  |             (len(lines), "\n- ".join(res[len(lines):]))) | ||
|  |     if len(lines) > len(res): | ||
|  |         return MatchError("Extra lines at line %d\n+ %s" % | ||
|  |             (len(res), "\n+ ".join(lines[len(res):]))) | ||
|  |     return 1 | ||
|  | 
 | ||
|  | 
 | ||
|  | class TestCmd: | ||
|  |     def __init__(self, description=None, program=None, workdir=None, | ||
|  |         subdir=None, verbose=False, match=None, inpath=None): | ||
|  | 
 | ||
|  |         self._cwd = os.getcwd() | ||
|  |         self.description_set(description) | ||
|  |         self.program_set(program, inpath) | ||
|  |         self.verbose_set(verbose) | ||
|  |         if match is None: | ||
|  |             self.match_func = match_re | ||
|  |         else: | ||
|  |             self.match_func = match | ||
|  |         self._dirlist = [] | ||
|  |         self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} | ||
|  |         env = os.environ.get('PRESERVE') | ||
|  |         if env: | ||
|  |             self._preserve['pass_test'] = env | ||
|  |             self._preserve['fail_test'] = env | ||
|  |             self._preserve['no_result'] = env | ||
|  |         else: | ||
|  |             env = os.environ.get('PRESERVE_PASS') | ||
|  |             if env is not None: | ||
|  |                 self._preserve['pass_test'] = env | ||
|  |             env = os.environ.get('PRESERVE_FAIL') | ||
|  |             if env is not None: | ||
|  |                 self._preserve['fail_test'] = env | ||
|  |             env = os.environ.get('PRESERVE_PASS') | ||
|  |             if env is not None: | ||
|  |                 self._preserve['PRESERVE_NO_RESULT'] = env | ||
|  |         self._stdout = [] | ||
|  |         self._stderr = [] | ||
|  |         self.status = None | ||
|  |         self.condition = 'no_result' | ||
|  |         self.workdir_set(workdir) | ||
|  |         self.subdir(subdir) | ||
|  | 
 | ||
|  |     def __del__(self): | ||
|  |         self.cleanup() | ||
|  | 
 | ||
|  |     def __repr__(self): | ||
|  |         return "%x" % id(self) | ||
|  | 
 | ||
|  |     def cleanup(self, condition=None): | ||
|  |         """
 | ||
|  |           Removes any temporary working directories for the specified TestCmd | ||
|  |         environment. If the environment variable PRESERVE was set when the | ||
|  |         TestCmd environment was created, temporary working directories are not | ||
|  |         removed. If any of the environment variables PRESERVE_PASS, | ||
|  |         PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd | ||
|  |         environment was created, then temporary working directories are not | ||
|  |         removed if the test passed, failed or had no result, respectively. | ||
|  |         Temporary working directories are also preserved for conditions | ||
|  |         specified via the preserve method. | ||
|  | 
 | ||
|  |           Typically, this method is not called directly, but is used when the | ||
|  |         script exits to clean up temporary working directories as appropriate | ||
|  |         for the exit status. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if not self._dirlist: | ||
|  |             return | ||
|  |         if condition is None: | ||
|  |             condition = self.condition | ||
|  |         if self._preserve[condition]: | ||
|  |             for dir in self._dirlist: | ||
|  |                 print("Preserved directory %s" % dir) | ||
|  |         else: | ||
|  |             list = self._dirlist[:] | ||
|  |             list.reverse() | ||
|  |             for dir in list: | ||
|  |                 self.writable(dir, 1) | ||
|  |                 shutil.rmtree(dir, ignore_errors=1) | ||
|  | 
 | ||
|  |         self._dirlist = [] | ||
|  |         self.workdir = None | ||
|  |         os.chdir(self._cwd) | ||
|  |         try: | ||
|  |             global _Cleanup | ||
|  |             _Cleanup.remove(self) | ||
|  |         except (AttributeError, ValueError): | ||
|  |             pass | ||
|  | 
 | ||
|  |     def description_set(self, description): | ||
|  |         """Set the description of the functionality being tested.""" | ||
|  |         self.description = description | ||
|  | 
 | ||
|  |     def fail_test(self, condition=True, function=None, skip=0): | ||
|  |         """Cause the test to fail.""" | ||
|  |         if not condition: | ||
|  |             return | ||
|  |         self.condition = 'fail_test' | ||
|  |         fail_test(self = self, | ||
|  |                   condition = condition, | ||
|  |                   function = function, | ||
|  |                   skip = skip) | ||
|  | 
 | ||
|  |     def match(self, lines, matches): | ||
|  |         """Compare actual and expected file contents.""" | ||
|  |         return self.match_func(lines, matches) | ||
|  | 
 | ||
|  |     def match_exact(self, lines, matches): | ||
|  |         """Compare actual and expected file content exactly.""" | ||
|  |         return match_exact(lines, matches) | ||
|  | 
 | ||
|  |     def match_re(self, lines, res): | ||
|  |         """Compare file content with a regular expression.""" | ||
|  |         return match_re(lines, res) | ||
|  | 
 | ||
|  |     def no_result(self, condition=True, function=None, skip=0): | ||
|  |         """Report that the test could not be run.""" | ||
|  |         if not condition: | ||
|  |             return | ||
|  |         self.condition = 'no_result' | ||
|  |         no_result(self = self, | ||
|  |                   condition = condition, | ||
|  |                   function = function, | ||
|  |                   skip = skip) | ||
|  | 
 | ||
|  |     def pass_test(self, condition=True, function=None): | ||
|  |         """Cause the test to pass.""" | ||
|  |         if not condition: | ||
|  |             return | ||
|  |         self.condition = 'pass_test' | ||
|  |         pass_test(self, condition, function) | ||
|  | 
 | ||
|  |     def preserve(self, *conditions): | ||
|  |         """
 | ||
|  |           Arrange for the temporary working directories for the specified | ||
|  |         TestCmd environment to be preserved for one or more conditions. If no | ||
|  |         conditions are specified, arranges for the temporary working | ||
|  |         directories to be preserved for all conditions. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if conditions == (): | ||
|  |             conditions = ('pass_test', 'fail_test', 'no_result') | ||
|  |         for cond in conditions: | ||
|  |             self._preserve[cond] = 1 | ||
|  | 
 | ||
|  |     def program_set(self, program, inpath): | ||
|  |         """Set the executable program or script to be tested.""" | ||
|  |         if not inpath and program and not os.path.isabs(program[0]): | ||
|  |             program[0] = os.path.join(self._cwd, program[0]) | ||
|  |         self.program = program | ||
|  | 
 | ||
|  |     def read(self, file, mode='rb'): | ||
|  |         """
 | ||
|  |           Reads and returns the contents of the specified file name. The file | ||
|  |         name may be a list, in which case the elements are concatenated with | ||
|  |         the os.path.join() method. The file is assumed to be under the | ||
|  |         temporary working directory unless it is an absolute path name. The I/O | ||
|  |         mode for the file may be specified and must begin with an 'r'. The | ||
|  |         default is 'rb' (binary read). | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if type(file) is list: | ||
|  |             file = os.path.join(*file) | ||
|  |         if not os.path.isabs(file): | ||
|  |             file = os.path.join(self.workdir, file) | ||
|  |         if mode[0] != 'r': | ||
|  |             raise ValueError("mode must begin with 'r'") | ||
|  |         return open(file, mode).read() | ||
|  | 
 | ||
|  |     def run(self, program=None, arguments=None, chdir=None, stdin=None, | ||
|  |         universal_newlines=True): | ||
|  |         """
 | ||
|  |           Runs a test of the program or script for the test environment. | ||
|  |         Standard output and error output are saved for future retrieval via the | ||
|  |         stdout() and stderr() methods. | ||
|  | 
 | ||
|  |           'universal_newlines' parameter controls how the child process | ||
|  |         input/output streams are opened as defined for the same named Python | ||
|  |         subprocess.POpen constructor parameter. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if chdir: | ||
|  |             if not os.path.isabs(chdir): | ||
|  |                 chdir = os.path.join(self.workpath(chdir)) | ||
|  |             if self.verbose: | ||
|  |                 sys.stderr.write("chdir(" + chdir + ")\n") | ||
|  |         else: | ||
|  |             chdir = self.workdir | ||
|  | 
 | ||
|  |         cmd = [] | ||
|  |         if program and program[0]: | ||
|  |             if program[0] != self.program[0] and not os.path.isabs(program[0]): | ||
|  |                 program[0] = os.path.join(self._cwd, program[0]) | ||
|  |             cmd += program | ||
|  |         else: | ||
|  |             cmd += self.program | ||
|  |         if arguments: | ||
|  |             cmd += arguments.split(" ") | ||
|  |         if self.verbose: | ||
|  |             sys.stderr.write("run(" + " ".join(cmd) + ")\n") | ||
|  |         p = subprocess.Popen(cmd, stdin=subprocess.PIPE, | ||
|  |             stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir, | ||
|  |             universal_newlines=universal_newlines) | ||
|  | 
 | ||
|  |         if stdin: | ||
|  |             if type(stdin) is list: | ||
|  |                 stdin = "".join(stdin) | ||
|  |         out, err = p.communicate(stdin) | ||
|  |         if not type(out) is str: | ||
|  |             out = out.decode() | ||
|  |         if not type(err) is str: | ||
|  |             err = err.decode() | ||
|  |         self._stdout.append(out) | ||
|  |         self._stderr.append(err) | ||
|  |         self.status = p.returncode | ||
|  | 
 | ||
|  |         if self.verbose: | ||
|  |             sys.stdout.write(self._stdout[-1]) | ||
|  |             sys.stderr.write(self._stderr[-1]) | ||
|  | 
 | ||
|  |     def stderr(self, run=None): | ||
|  |         """
 | ||
|  |           Returns the error output from the specified run number. If there is | ||
|  |         no specified run number, then returns the error output of the last run. | ||
|  |         If the run number is less than zero, then returns the error output from | ||
|  |         that many runs back from the current run. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if not run: | ||
|  |             run = len(self._stderr) | ||
|  |         elif run < 0: | ||
|  |             run = len(self._stderr) + run | ||
|  |         run -= 1 | ||
|  |         if run < 0: | ||
|  |             return '' | ||
|  |         return self._stderr[run] | ||
|  | 
 | ||
|  |     def stdout(self, run=None): | ||
|  |         """
 | ||
|  |           Returns the standard output from the specified run number. If there | ||
|  |         is no specified run number, then returns the standard output of the | ||
|  |         last run. If the run number is less than zero, then returns the | ||
|  |         standard output from that many runs back from the current run. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if not run: | ||
|  |             run = len(self._stdout) | ||
|  |         elif run < 0: | ||
|  |             run = len(self._stdout) + run | ||
|  |         run -= 1 | ||
|  |         if run < 0: | ||
|  |             return '' | ||
|  |         return self._stdout[run] | ||
|  | 
 | ||
|  |     def subdir(self, *subdirs): | ||
|  |         """
 | ||
|  |           Create new subdirectories under the temporary working directory, one | ||
|  |         for each argument. An argument may be a list, in which case the list | ||
|  |         elements are concatenated using the os.path.join() method. | ||
|  |         Subdirectories multiple levels deep must be created using a separate | ||
|  |         argument for each level: | ||
|  | 
 | ||
|  |             test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) | ||
|  | 
 | ||
|  |         Returns the number of subdirectories actually created. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         count = 0 | ||
|  |         for sub in subdirs: | ||
|  |             if sub is None: | ||
|  |                 continue | ||
|  |             if type(sub) is list: | ||
|  |                 sub = os.path.join(*tuple(sub)) | ||
|  |             new = os.path.join(self.workdir, sub) | ||
|  |             try: | ||
|  |                 os.mkdir(new) | ||
|  |             except: | ||
|  |                 pass | ||
|  |             else: | ||
|  |                 count += 1 | ||
|  |         return count | ||
|  | 
 | ||
|  |     def unlink(self, file): | ||
|  |         """
 | ||
|  |           Unlinks the specified file name. The file name may be a list, in | ||
|  |         which case the elements are concatenated using the os.path.join() | ||
|  |         method. The file is assumed to be under the temporary working directory | ||
|  |         unless it is an absolute path name. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if type(file) is list: | ||
|  |             file = os.path.join(*tuple(file)) | ||
|  |         if not os.path.isabs(file): | ||
|  |             file = os.path.join(self.workdir, file) | ||
|  |         os.unlink(file) | ||
|  | 
 | ||
|  |     def verbose_set(self, verbose): | ||
|  |         """Set the verbose level.""" | ||
|  |         self.verbose = verbose | ||
|  | 
 | ||
|  |     def workdir_set(self, path): | ||
|  |         """
 | ||
|  |           Creates a temporary working directory with the specified path name. | ||
|  |         If the path is a null string (''), a unique directory name is created. | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if os.path.isabs(path): | ||
|  |             self.workdir = path | ||
|  |         else: | ||
|  |             if path != None: | ||
|  |                 if path == '': | ||
|  |                     path = tempfile.mktemp() | ||
|  |                 if path != None: | ||
|  |                     os.mkdir(path) | ||
|  |                 self._dirlist.append(path) | ||
|  |                 global _Cleanup | ||
|  |                 try: | ||
|  |                     _Cleanup.index(self) | ||
|  |                 except ValueError: | ||
|  |                     _Cleanup.append(self) | ||
|  |                 # We would like to set self.workdir like this: | ||
|  |                 #     self.workdir = path | ||
|  |                 # But symlinks in the path will report things differently from | ||
|  |                 # os.getcwd(), so chdir there and back to fetch the canonical | ||
|  |                 # path. | ||
|  |                 cwd = os.getcwd() | ||
|  |                 os.chdir(path) | ||
|  |                 self.workdir = os.getcwd() | ||
|  |                 os.chdir(cwd) | ||
|  |             else: | ||
|  |                 self.workdir = None | ||
|  | 
 | ||
|  |     def workpath(self, *args): | ||
|  |         """
 | ||
|  |           Returns the absolute path name to a subdirectory or file within the | ||
|  |         current temporary working directory. Concatenates the temporary working | ||
|  |         directory name with the specified arguments using os.path.join(). | ||
|  | 
 | ||
|  |         """
 | ||
|  |         return os.path.join(self.workdir, *tuple(args)) | ||
|  | 
 | ||
|  |     def writable(self, top, write): | ||
|  |         """
 | ||
|  |           Make the specified directory tree writable (write == 1) or not | ||
|  |         (write == None). | ||
|  | 
 | ||
|  |         """
 | ||
|  |         def _walk_chmod(arg, dirname, names): | ||
|  |             st = os.stat(dirname) | ||
|  |             os.chmod(dirname, arg(st[stat.ST_MODE])) | ||
|  |             for name in names: | ||
|  |                 fullname = os.path.join(dirname, name) | ||
|  |                 st = os.stat(fullname) | ||
|  |                 os.chmod(fullname, arg(st[stat.ST_MODE])) | ||
|  | 
 | ||
|  |         _mode_writable = lambda mode: stat.S_IMODE(mode|0o200) | ||
|  |         _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0o200) | ||
|  | 
 | ||
|  |         if write: | ||
|  |             f = _mode_writable | ||
|  |         else: | ||
|  |             f = _mode_non_writable | ||
|  |         try: | ||
|  |             for root, _, files in os.walk(top): | ||
|  |                 _walk_chmod(f, root, files) | ||
|  |         except: | ||
|  |             pass  # Ignore any problems changing modes. | ||
|  | 
 | ||
|  |     def write(self, file, content, mode='wb'): | ||
|  |         """
 | ||
|  |           Writes the specified content text (second argument) to the specified | ||
|  |         file name (first argument). The file name may be a list, in which case | ||
|  |         the elements are concatenated using the os.path.join() method. The file | ||
|  |         is created under the temporary working directory. Any subdirectories in | ||
|  |         the path must already exist. The I/O mode for the file may be specified | ||
|  |         and must begin with a 'w'. The default is 'wb' (binary write). | ||
|  | 
 | ||
|  |         """
 | ||
|  |         if type(file) is list: | ||
|  |             file = os.path.join(*tuple(file)) | ||
|  |         if not os.path.isabs(file): | ||
|  |             file = os.path.join(self.workdir, file) | ||
|  |         if mode[0] != 'w': | ||
|  |             raise ValueError("mode must begin with 'w'") | ||
|  |         open(file, mode).write(content) |