610 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			610 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| """
 | |
| TestCmd.py:  a testing framework for commands and scripts.
 | |
| 
 | |
| The TestCmd module provides a framework for portable automated testing of
 | |
| executable commands and scripts (in any language, not just Python), especially
 | |
| commands and scripts that require file system interaction.
 | |
| 
 | |
| In addition to running tests and evaluating conditions, the TestCmd module
 | |
| manages and cleans up one or more temporary workspace directories, and provides
 | |
| methods for creating files and directories in those workspace directories from
 | |
| in-line data (that is, here-documents), allowing tests to be completely self-contained.
 | |
| 
 | |
| A TestCmd environment object is created via the usual invocation:
 | |
| 
 | |
|     test = TestCmd()
 | |
| 
 | |
| The TestCmd module provides pass_test(), fail_test(), and no_result() unbound
 | |
| methods that report test results for use with the Aegis change management
 | |
| system. These methods terminate the test immediately, reporting PASSED, FAILED
 | |
| or NO RESULT respectively and exiting with status 0 (success), 1 or 2
 | |
| respectively. This allows for a distinction between an actual failed test and a
 | |
| test that could not be properly evaluated because of an external condition (such
 | |
| as a full file system or incorrect permissions).
 | |
| 
 | |
| """
 | |
| 
 | |
| # Copyright 2000 Steven Knight
 | |
| # This module is free software, and you may redistribute it and/or modify
 | |
| # it under the same terms as Python itself, so long as this copyright message
 | |
| # and disclaimer are retained in their original form.
 | |
| #
 | |
| # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
 | |
| # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
 | |
| # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
 | |
| # DAMAGE.
 | |
| #
 | |
| # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
 | |
| # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
 | |
| # PARTICULAR PURPOSE.  THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
 | |
| # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
 | |
| # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 | |
| 
 | |
| # Copyright 2002-2003 Vladimir Prus.
 | |
| # Copyright 2002-2003 Dave Abrahams.
 | |
| # Copyright 2006 Rene Rivera.
 | |
| # Distributed under the Boost Software License, Version 1.0.
 | |
| #    (See accompanying file LICENSE.txt or copy at
 | |
| #         https://www.bfgroup.xyz/b2/LICENSE.txt)
 | |
| 
 | |
| from __future__ import print_function
 | |
| 
 | |
| __author__ = "Steven Knight <knight@baldmt.com>"
 | |
| __revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software"
 | |
| __version__ = "0.02"
 | |
| 
 | |
| from types import *
 | |
| 
 | |
import atexit
import os
import os.path
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import traceback
 | |
| 
 | |
| 
 | |
# NOTE(review): presumably meant as the name prefix for temporary files and
# directories created through the tempfile module -- confirm it still has an
# effect on the Python versions in use.
tempfile.template = 'testcmd.'

# TestCmd environments that still own temporary directories. Instances add
# themselves in TestCmd.workdir_set() and remove themselves in
# TestCmd.cleanup().
_Cleanup = []


def _clean():
    """Clean up every registered TestCmd environment, newest first.

    Runs at interpreter exit. Each registered environment has its cleanup()
    method called in reverse registration order.

    """
    global _Cleanup
    pending = _Cleanup[:]
    # Rebind to a fresh list before iterating: cleanup() removes the instance
    # from _Cleanup, and that must not disturb the snapshot being walked.
    _Cleanup = []
    for test in reversed(pending):
        test.cleanup()


# sys.exitfunc is only honoured by Python 2; Python 3 silently ignores it, so
# the hook would never run there. atexit works on both.
atexit.register(_clean)
 | |
| 
 | |
| 
 | |
def caller(tblist, skip):
    """Render a traceback list as "at line N of FILE (NAME)" text.

    Entries of 'tblist' are (file, line, name, text) tuples. They are taken
    in order until the first one whose file name ends in "TestCmd.py"; that
    entry and everything after it are dropped. The kept entries are reported
    in reverse order, after discarding the first 'skip' of them. The first
    reported line starts with "at", the following ones with a tab and "from".
    A name of "?" is not shown.

    """
    frames = []
    for path, lineno, func, text in tblist:
        if path[-10:] == "TestCmd.py":
            break
        frames.insert(0, (path, lineno, func, text))

    pieces = []
    prefix = "at"
    for path, lineno, func, _text in frames[skip:]:
        if func == "?":
            suffix = ""
        else:
            suffix = " (" + func + ")"
        pieces.append("%s line %d of %s%s\n" % (prefix, lineno, path, suffix))
        prefix = "\tfrom"
    return "".join(pieces)
 | |
| 
 | |
| 
 | |
def fail_test(self=None, condition=True, function=None, skip=0):
    """Report a FAILED test and exit with status 1.

    Does nothing when 'condition' is false. Otherwise calls 'function' (if
    given), writes a "FAILED test" message to standard error -- naming the
    program and description of 'self' when they are set, the calling location
    and the current directory -- and exits. 'skip' is passed on to caller()
    to drop that many leading entries from the reported location.

    """
    if not condition:
        return
    if function is not None:
        function()

    program_part = ""
    description_part = ""
    separator = " "
    if self is not None:
        if self.program:
            program_part = " of " + " ".join(self.program)
            separator = "\n\t"
        if self.description:
            description_part = " [" + self.description + "]"
            separator = "\n\t"

    # The stack must be extracted right here so the reported frames are those
    # of whoever called fail_test().
    location = caller(traceback.extract_stack(), skip)

    message = "".join([
        "FAILED test", program_part, description_part, separator, location,
        "\nin directory: ", os.getcwd()])
    sys.stderr.write(message)
    sys.exit(1)
 | |
| 
 | |
| 
 | |
def no_result(self=None, condition=True, function=None, skip=0):
    """Cause a test to exit with no valid result.

    By default, the no_result() method reports NO RESULT for the test and
    exits with a status of 2. If a condition argument is supplied, NO RESULT
    is reported only if the condition is true; otherwise the call returns
    without doing anything.

    'function', when given, is called before the report is written. 'skip' is
    passed on to caller() to drop that many leading entries from the reported
    location. When 'self' is given, its program and description are included
    in the report if they are set.

    """
    if not condition:
        return
    if function is not None:
        function()
    of = ""
    desc = ""
    sep = " "
    if self is not None:
        if self.program:
            # self.program is a list of command words (fail_test() joins it
            # the same way); concatenating it directly to a string raised a
            # TypeError.
            of = " of " + " ".join(self.program)
            sep = "\n\t"
        if self.description:
            desc = " [" + self.description + "]"
            sep = "\n\t"

    at = caller(traceback.extract_stack(), skip)
    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
    sys.exit(2)
 | |
| 
 | |
| 
 | |
def pass_test(self=None, condition=True, function=None):
    """Report a PASSED test and exit with status 0.

    Returns without doing anything when 'condition' is false. Otherwise
    'function' (if supplied) is called first, "PASSED" is written to standard
    error and the process exits. 'self' is accepted but not used.

    """
    if condition:
        if function is not None:
            function()
        sys.stderr.write("PASSED\n")
        sys.exit(0)
 | |
| 
 | |
class MatchError(object):
    """Result of a failed match: always false, with an explanatory message.

    The matching functions return an instance instead of a plain false value
    so that the reason for the mismatch is available in 'message'.

    """

    def __init__(self, message):
        self.message = message

    def __bool__(self):
        # Python 3 truth protocol.
        return False

    # Python 2 spells the truth protocol __nonzero__.
    __nonzero__ = __bool__
 | |
| 
 | |
def match_exact(lines=None, matches=None):
    """
      Returns whether the given lists or strings containing lines separated
    using newline characters contain exactly the same data.

      Arguments that are not lists are split on newline characters first.
    'lines' is the actual data and 'matches' the expected data. Returns 1 on
    success. On failure returns a MatchError (which tests false) whose message
    names the first differing line, or lists the missing or extra lines when
    one side is longer than the other.

    """
    if not isinstance(lines, list):
        lines = lines.split("\n")
    if not isinstance(matches, list):
        matches = matches.split("\n")
    # Compare only the common prefix here. A length difference is reported
    # below with a diagnostic; it used to be a bare 'return' that made those
    # checks unreachable.
    for index, (actual, expected) in enumerate(zip(lines, matches)):
        if actual != expected:
            return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
                (index + 1, expected, actual))
    if len(lines) < len(matches):
        return MatchError("Missing lines at line %d\n- %s" %
            (len(lines), "\n- ".join(matches[len(lines):])))
    if len(lines) > len(matches):
        return MatchError("Extra lines at line %d\n+ %s" %
            (len(matches), "\n+ ".join(lines[len(matches):])))
    return 1
 | |
| 
 | |
| 
 | |
def match_re(lines=None, res=None):
    """
      Matches actual lines against regular expressions, line by line.

      Both arguments may be lists or newline-separated strings. Each entry of
    'res' is wrapped in "^" and "$" and searched for in the line of 'lines'
    at the same position. Returns 1 when every line matches and both sides
    have the same number of lines; otherwise returns a MatchError describing
    the first mismatch or the missing or extra lines.

    """
    if type(lines) is not list:
        lines = lines.split("\n")
    if type(res) is not list:
        res = res.split("\n")

    actual_count = len(lines)
    expected_count = len(res)

    for index, (text, pattern) in enumerate(zip(lines, res)):
        if re.search("^" + pattern + "$", text):
            continue
        return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
            (index + 1, pattern, text))

    if actual_count < expected_count:
        missing = "\n- ".join(res[actual_count:])
        return MatchError("Missing lines at line %d\n- %s" %
            (actual_count, missing))
    if actual_count > expected_count:
        extra = "\n+ ".join(lines[expected_count:])
        return MatchError("Extra lines at line %d\n+ %s" %
            (expected_count, extra))
    return 1
 | |
| 
 | |
| 
 | |
class TestCmd:
    """A test environment for running a command in a temporary workspace.

    An instance remembers the program under test, owns zero or more temporary
    working directories, records the standard output, error output and exit
    status of every run, and removes its directories in cleanup() unless they
    were asked to be preserved for the current test condition.

    """

    def __init__(self, description=None, program=None, workdir=None,
        subdir=None, verbose=False, match=None, inpath=None):
        """
          Creates a test environment.

          'description' and 'program' are used in failure reports; 'program'
        is a list of command words (see program_set() for 'inpath').
        'workdir' is passed to workdir_set() and 'subdir' to subdir().
        'verbose' makes run() echo what it does. 'match' is the function
        used by match(); match_re is used when it is None.

          The environment variables PRESERVE, PRESERVE_PASS, PRESERVE_FAIL
        and PRESERVE_NO_RESULT are read here to decide which test conditions
        keep their temporary directories.

        """
        self._cwd = os.getcwd()
        self.description_set(description)
        self.program_set(program, inpath)
        self.verbose_set(verbose)
        if match is None:
            self.match_func = match_re
        else:
            self.match_func = match
        self._dirlist = []
        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
        env = os.environ.get('PRESERVE')
        if env:
            for condition in ('pass_test', 'fail_test', 'no_result'):
                self._preserve[condition] = env
        else:
            # Each condition has its own variable. PRESERVE_NO_RESULT used to
            # be ignored: PRESERVE_PASS was read a second time and its value
            # stored under a key that cleanup() never looks up.
            for variable, condition in (
                    ('PRESERVE_PASS', 'pass_test'),
                    ('PRESERVE_FAIL', 'fail_test'),
                    ('PRESERVE_NO_RESULT', 'no_result')):
                env = os.environ.get(variable)
                if env is not None:
                    self._preserve[condition] = env
        self._stdout = []
        self._stderr = []
        self.status = None
        self.condition = 'no_result'
        self.workdir_set(workdir)
        self.subdir(subdir)

    def __del__(self):
        # __init__ may have raised before _dirlist was assigned; there is
        # nothing to clean up in that case.
        if getattr(self, '_dirlist', None):
            self.cleanup()

    def __repr__(self):
        return "%x" % id(self)

    def cleanup(self, condition=None):
        """
          Removes any temporary working directories for the specified TestCmd
        environment. If the environment variable PRESERVE was set when the
        TestCmd environment was created, temporary working directories are not
        removed. If any of the environment variables PRESERVE_PASS,
        PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd
        environment was created, then temporary working directories are not
        removed if the test passed, failed or had no result, respectively.
        Temporary working directories are also preserved for conditions
        specified via the preserve method.

          Typically, this method is not called directly, but is used when the
        script exits to clean up temporary working directories as appropriate
        for the exit status.

          'condition' names the test condition to apply ('pass_test',
        'fail_test' or 'no_result'); the environment's current condition is
        used when it is None. Afterwards the process is returned to the
        directory that was current when the environment was created.

        """
        if not self._dirlist:
            return
        if condition is None:
            condition = self.condition
        if self._preserve[condition]:
            for directory in self._dirlist:
                print("Preserved directory %s" % directory)
        else:
            # Remove in reverse order of creation.
            for directory in reversed(self._dirlist):
                # Read-only entries would make rmtree() fail.
                self.writable(directory, 1)
                shutil.rmtree(directory, ignore_errors=1)

        self._dirlist = []
        self.workdir = None
        os.chdir(self._cwd)
        try:
            _Cleanup.remove(self)
        except (AttributeError, ValueError):
            pass

    def description_set(self, description):
        """Set the description of the functionality being tested."""
        self.description = description

    def fail_test(self, condition=True, function=None, skip=0):
        """Cause the test to fail."""
        if not condition:
            return
        # Recorded so that cleanup() applies the 'fail_test' preserve setting.
        self.condition = 'fail_test'
        fail_test(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def match(self, lines, matches):
        """Compare actual and expected file contents."""
        return self.match_func(lines, matches)

    def match_exact(self, lines, matches):
        """Compare actual and expected file content exactly."""
        return match_exact(lines, matches)

    def match_re(self, lines, res):
        """Compare file content with a regular expression."""
        return match_re(lines, res)

    def no_result(self, condition=True, function=None, skip=0):
        """Report that the test could not be run."""
        if not condition:
            return
        self.condition = 'no_result'
        no_result(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def pass_test(self, condition=True, function=None):
        """Cause the test to pass."""
        if not condition:
            return
        self.condition = 'pass_test'
        pass_test(self, condition, function)

    def preserve(self, *conditions):
        """
          Arrange for the temporary working directories for the specified
        TestCmd environment to be preserved for one or more conditions. If no
        conditions are specified, arranges for the temporary working
        directories to be preserved for all conditions.

        """
        if conditions == ():
            conditions = ('pass_test', 'fail_test', 'no_result')
        for cond in conditions:
            self._preserve[cond] = 1

    def program_set(self, program, inpath):
        """
          Set the executable program or script to be tested.

          'program' is a list of command words. Unless 'inpath' is true, a
        relative first word is made absolute against the directory that was
        current when the environment was created. Note that this modifies the
        list that was passed in.

        """
        if not inpath and program and not os.path.isabs(program[0]):
            program[0] = os.path.join(self._cwd, program[0])
        self.program = program

    def read(self, file, mode='rb'):
        """
          Reads and returns the contents of the specified file name. The file
        name may be a list, in which case the elements are concatenated with
        the os.path.join() method. The file is assumed to be under the
        temporary working directory unless it is an absolute path name. The I/O
        mode for the file may be specified and must begin with an 'r'. The
        default is 'rb' (binary read).

          Raises ValueError if the mode does not begin with 'r'.

        """
        if type(file) is list:
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'r':
            raise ValueError("mode must begin with 'r'")
        # Close the file deterministically instead of relying on garbage
        # collection.
        with open(file, mode) as stream:
            return stream.read()

    def run(self, program=None, arguments=None, chdir=None, stdin=None,
        universal_newlines=True):
        """
          Runs a test of the program or script for the test environment.
        Standard output and error output are saved for future retrieval via the
        stdout() and stderr() methods.

          'program' is a list of command words overriding the environment's
        program for this run; a relative first word is resolved against the
        directory that was current when the environment was created.
        'arguments' is a string split on single spaces and appended to the
        command. 'chdir' is the directory to run in, relative to the working
        directory unless absolute; the working directory is used by default.
        'stdin' is a string or a list of strings fed to the child process.

          'universal_newlines' parameter controls how the child process
        input/output streams are opened as defined for the same named Python
        subprocess.POpen constructor parameter.

        """
        if chdir:
            if not os.path.isabs(chdir):
                chdir = os.path.join(self.workpath(chdir))
            if self.verbose:
                sys.stderr.write("chdir(" + chdir + ")\n")
        else:
            chdir = self.workdir

        cmd = []
        if program and program[0]:
            # Work on a copy so the caller's list is left untouched.
            program = list(program)
            # No default program may be set; the comparison below must not
            # fail in that case.
            default = self.program[0] if self.program else None
            if program[0] != default and not os.path.isabs(program[0]):
                program[0] = os.path.join(self._cwd, program[0])
            cmd += program
        else:
            cmd += self.program
        if arguments:
            cmd += arguments.split(" ")
        if self.verbose:
            sys.stderr.write("run(" + " ".join(cmd) + ")\n")
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir,
            universal_newlines=universal_newlines)

        if stdin and type(stdin) is list:
            stdin = "".join(stdin)
        out, err = p.communicate(stdin)
        # Output arrives as bytes when the streams were not opened in text
        # mode; it is always stored as str.
        if not type(out) is str:
            out = out.decode()
        if not type(err) is str:
            err = err.decode()
        self._stdout.append(out)
        self._stderr.append(err)
        self.status = p.returncode

        if self.verbose:
            sys.stdout.write(self._stdout[-1])
            sys.stderr.write(self._stderr[-1])

    def stderr(self, run=None):
        """
          Returns the error output from the specified run number. If there is
        no specified run number, then returns the error output of the last run.
        If the run number is less than zero, then returns the error output from
        that many runs back from the current run.

          Run numbers start at 1. An empty string is returned when nothing has
        been run yet or the requested run lies before the first one.

        """
        if not run:
            run = len(self._stderr)
        elif run < 0:
            run = len(self._stderr) + run
        run -= 1
        if run < 0:
            return ''
        return self._stderr[run]

    def stdout(self, run=None):
        """
          Returns the standard output from the specified run number. If there
        is no specified run number, then returns the standard output of the
        last run. If the run number is less than zero, then returns the
        standard output from that many runs back from the current run.

          Run numbers start at 1. An empty string is returned when nothing has
        been run yet or the requested run lies before the first one.

        """
        if not run:
            run = len(self._stdout)
        elif run < 0:
            run = len(self._stdout) + run
        run -= 1
        if run < 0:
            return ''
        return self._stdout[run]

    def subdir(self, *subdirs):
        """
          Create new subdirectories under the temporary working directory, one
        for each argument. An argument may be a list, in which case the list
        elements are concatenated using the os.path.join() method.
        Subdirectories multiple levels deep must be created using a separate
        argument for each level:

            test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])

        Arguments that are None are skipped. Returns the number of
        subdirectories actually created.

        """
        count = 0
        for sub in subdirs:
            if sub is None:
                continue
            if type(sub) is list:
                sub = os.path.join(*tuple(sub))
            new = os.path.join(self.workdir, sub)
            try:
                os.mkdir(new)
            except OSError:
                # Could not be created (for example, it already exists):
                # simply not counted.
                pass
            else:
                count += 1
        return count

    def unlink(self, file):
        """
          Unlinks the specified file name. The file name may be a list, in
        which case the elements are concatenated using the os.path.join()
        method. The file is assumed to be under the temporary working directory
        unless it is an absolute path name.

        """
        if type(file) is list:
            file = os.path.join(*tuple(file))
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        os.unlink(file)

    def verbose_set(self, verbose):
        """Set the verbose level."""
        self.verbose = verbose

    def workdir_set(self, path):
        """
          Creates a temporary working directory with the specified path name.
        If the path is a null string (''), a unique directory name is created.

          If the path is None, no working directory is set up. An absolute
        path is used as the working directory as given: it is neither created
        here nor removed by cleanup(). Any other path is created, remembered
        for cleanup() and registered for clean-up at exit.

        """
        if path is None:
            # Must be checked first: os.path.isabs() does not accept None.
            self.workdir = None
            return
        if os.path.isabs(path):
            self.workdir = path
            return

        if path == '':
            # mkdtemp() picks the name and creates the directory in one step,
            # unlike mktemp() followed by mkdir().
            path = tempfile.mkdtemp(prefix='testcmd.')
        else:
            os.mkdir(path)
        self._dirlist.append(path)
        if self not in _Cleanup:
            _Cleanup.append(self)
        # We would like to set self.workdir like this:
        #     self.workdir = path
        # But symlinks in the path will report things differently from
        # os.getcwd(), so chdir there and back to fetch the canonical
        # path.
        cwd = os.getcwd()
        os.chdir(path)
        self.workdir = os.getcwd()
        os.chdir(cwd)

    def workpath(self, *args):
        """
          Returns the absolute path name to a subdirectory or file within the
        current temporary working directory. Concatenates the temporary working
        directory name with the specified arguments using os.path.join().

        """
        return os.path.join(self.workdir, *tuple(args))

    def writable(self, top, write):
        """
          Make the specified directory tree writable (write == 1) or not
        (write == None).

          Only the owner write permission bit is changed, on every directory
        found under 'top' and on the files they contain. Problems changing
        modes are ignored; processing stops at the first one.

        """
        if write:
            change = lambda mode: stat.S_IMODE(mode | 0o200)
        else:
            change = lambda mode: stat.S_IMODE(mode & ~0o200)

        try:
            for root, _, files in os.walk(top):
                targets = [root]
                targets.extend(os.path.join(root, name) for name in files)
                for target in targets:
                    st = os.stat(target)
                    os.chmod(target, change(st[stat.ST_MODE]))
        except OSError:
            pass  # Ignore any problems changing modes.

    def write(self, file, content, mode='wb'):
        """
          Writes the specified content text (second argument) to the specified
        file name (first argument). The file name may be a list, in which case
        the elements are concatenated using the os.path.join() method. The file
        is created under the temporary working directory. Any subdirectories in
        the path must already exist. The I/O mode for the file may be specified
        and must begin with a 'w'. The default is 'wb' (binary write).

          Raises ValueError if the mode does not begin with 'w'.

        """
        if type(file) is list:
            file = os.path.join(*tuple(file))
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'w':
            raise ValueError("mode must begin with 'w'")
        # Close explicitly so the content is flushed to disk before anything
        # else reads the file.
        with open(file, mode) as stream:
            stream.write(content)
 |