Python os 模块,path() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用@R_404_4360@()。
def Loadplugins(plugins, verbose):
if plugins == '':
return
scriptPath = os.path.dirname(sys.argv[0])
for plugin in sum(map(ProcessAt, plugins.split(',')), []):
try:
if not plugin.lower().endswith('.py'):
plugin += '.py'
if os.path.dirname(plugin) == '':
if not os.path.exists(plugin):
scriptPlugin = os.path.join(scriptPath, plugin)
if os.path.exists(scriptPlugin):
plugin = scriptPlugin
exec(open(plugin, 'r').read())
except Exception as e:
print('Error loading plugin: %s' % plugin)
if verbose:
raise e
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location,such as /tmp,it opens up an opportunity for an attacker to
replace an extracted file with an unauthorized payload. Warn the user
if a kNown insecure location is used.
See distribute #375 for more details.
"""
if os.name == 'nt' and not path.startswith(os.environ['windir']):
# On Windows,permissions are generally restrictive by default
# and temp directories are not writable by other users,so
# bypass the warning.
return
mode = os.stat(path).st_mode
if mode & stat.S_IWOTH or mode & stat.S_IWGRP:
msg = ("%s is writable by group/others and vulnerable to attack "
"when "
"used with get_resource_filename. Consider a more secure "
"location (set with .set_extraction_path or the "
"PYTHON_EGG_CACHE environment variable)." % path)
warnings.warn(msg, UserWarning)
def atomic_writer(file_path, mode):
"""Atomic file writer.
:param file_path: path of file to write to.
:type file_path: ``unicode``
:param mode: sames as for `func:open`
:type mode: string
.. versionadded:: 1.12
Context manager that ensures the file is only written if the write
succeeds. The data is first written to a temporary file.
"""
temp_suffix = '.aw.temp'
temp_file_path = file_path + temp_suffix
with open(temp_file_path, mode) as file_obj:
try:
yield file_obj
os.rename(temp_file_path, file_path)
finally:
try:
os.remove(temp_file_path)
except (OSError, IOError):
pass
def cachedir(self):
"""Path to workflow's cache directory.
The cache directory is a subdirectory of Alfred's own cache directory
in ``~/Library/Caches``. The full path is:
``~/Library/Caches/com.runningwithCrayons.Alfred-X/Workflow Data/<bundle id>``
``Alfred-X`` may be ``Alfred-2`` or ``Alfred-3``.
:returns: full path to workflow's cache directory
:rtype: ``unicode``
"""
if self.alfred_env.get('workflow_cache'):
dirpath = self.alfred_env.get('workflow_cache')
else:
dirpath = self._default_cachedir
return self._create(dirpath)
def datadir(self):
"""Path to workflow's data directory.
The data directory is a subdirectory of Alfred's own data directory in
``~/Library/Application Support``. The full path is:
``~/Library/Application Support/Alfred 2/Workflow Data/<bundle id>``
:returns: full path to workflow data directory
:rtype: ``unicode``
"""
if self.alfred_env.get('workflow_data'):
dirpath = self.alfred_env.get('workflow_data')
else:
dirpath = self._default_datadir
return self._create(dirpath)
def _delete_directory_contents(self, dirpath, filter_func):
"""Delete all files in a directory.
:param dirpath: path to directory to clear
:type dirpath: ``unicode`` or ``str``
:param filter_func function to determine whether a file shall be
deleted or not.
:type filter_func ``callable``
"""
if os.path.exists(dirpath):
for filename in os.listdir(dirpath):
if not filter_func(filename):
continue
path = os.path.join(dirpath, filename)
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.unlink(path)
self.logger.debug('Deleted : %r', path)
def test_rar_detection(self):
"""
Tests the rar file detection process
"""
from newsreap.codecs.CodecRar import RAR_PART_RE
result = RAR_PART_RE.match('/path/to/test.rar')
assert result is not None
assert result.group('part') is None
result = RAR_PART_RE.match('/path/to/test.RaR')
assert result is not None
assert result.group('part') is None
result = RAR_PART_RE.match('/path/to/test.r01')
assert result is not None
assert result.group('part') == '01'
result = RAR_PART_RE.match('/path/to/test.R200')
assert result is not None
assert result.group('part') == '200'
result = RAR_PART_RE.match('/path/to/test.part65.rar')
assert result is not None
assert result.group('part') == '65'
def test_7z_detection(self):
"""
Tests the 7z file detection process
"""
from newsreap.codecs.Codec7Zip import SEVEN_ZIP_PART_RE
result = SEVEN_ZIP_PART_RE.match('/path/to/test.7z')
assert result is not None
assert result.group('part') is None
result = SEVEN_ZIP_PART_RE.match('/path/to/test.7Z')
assert result is not None
assert result.group('part') is None
result = SEVEN_ZIP_PART_RE.match('/path/to/test.7z.001')
assert result is not None
assert result.group('part0') == '001'
result = SEVEN_ZIP_PART_RE.match('/path/to/test.part65.7z')
assert result is not None
assert result.group('part') == '65'
def _build_from_requirements(cls, req_spec):
"""
Build a working set from a requirement spec. Rewrites sys.path.
"""
# try it without defaults already on sys.path
# by starting with an empty path
ws = cls([])
reqs = parse_requirements(req_spec)
dists = ws.resolve(reqs, Environment())
for dist in dists:
ws.add(dist)
# add any missing entries from sys.path
for entry in sys.path:
if entry not in ws.entries:
ws.add_entry(entry)
# then copy back to sys.path
sys.path[:] = ws.entries
return ws
def __init__(self, search_path=None, platform=get_supported_platform(),
python=PY_MAJOR):
"""Snapshot distributions available on a search path
Any distributions found on `search_path` are added to the environment.
`search_path` should be a sequence of ``sys.path`` items. If not
supplied,``sys.path`` is used.
`platform` is an optional string specifying the name of the platform
that platform-specific distributions must be compatible with. If
unspecified,it defaults to the current platform. `python` is an
optional string naming the desired version of Python (e.g. ``'3.3'``);
it defaults to the current version.
You may explicitly set `platform` (and/or `python`) to ``None`` if you
wish to map *all* distributions,not just those compatible with the
running platform or Python version.
"""
self._distmap = {}
self.platform = platform
self.python = python
self.scan(search_path)
def get_cache_path(self, archive_name, names=()):
"""Return absolute location in cache for `archive_name` and `names`
The parent directory of the resulting path will be created if it does
not already exist. `archive_name` should be the base filename of the
enclosing egg (which may not be the name of the enclosing zipfile!),
including its ".egg" extension. `names`,if provided,should be a
sequence of path name parts "under" the egg's extraction location.
This method should only be called by resource providers that need to
obtain an extraction location,and only for names they intend to
extract,as it tracks the generated names for possible cleanup later.
"""
extract_path = self.extraction_path or get_default_cache()
target_path = os.path.join(extract_path, archive_name + '-tmp', *names)
try:
_bypass_ensure_directory(target_path)
except:
self.extraction_error()
self._warn_unsafe_extraction_path(extract_path)
self.cached_files[target_path] = 1
return target_path
def build(cls, path):
"""
Build a dictionary similar to the zipimport directory
caches,except instead of tuples,store ZipInfo objects.
Use a platform-specific path separator (os.sep) for the path keys
for compatibility with pypy on Windows.
"""
with ContextualZipFile(path) as zfile:
items = (
(
name.replace('/', os.sep),
zfile.getinfo(name),
)
for name in zfile.namelist()
)
return dict(items)
def _index(self):
try:
return self._dirindex
except AttributeError:
ind = {}
for path in self.zipinfo:
parts = path.split(os.sep)
while parts:
parent = os.sep.join(parts[:-1])
if parent in ind:
ind[parent].append(parts[-1])
break
else:
ind[parent] = [parts.pop()]
self._dirindex = ind
return ind
def find_eggs_in_zip(importer, path_item, only=False):
"""
Find eggs in zip files; possibly multiple nested eggs.
"""
if importer.archive.endswith('.whl'):
# wheels are not supported with this finder
# they don't have PKG-INFO Metadata,and won't ever contain eggs
return
Metadata = EggMetadata(importer)
if Metadata.has_Metadata('PKG-INFO'):
yield distribution.from_filename(path_item, Metadata=Metadata)
if only:
# don't yield nested distros
return
for subitem in Metadata.resource_listdir('/'):
if _is_unpacked_egg(subitem):
subpath = os.path.join(path_item, subitem)
for dist in find_eggs_in_zip(zipimport.zipimporter(subpath), subpath):
yield dist
def _by_version_descending(names):
"""
Given a list of filenames,return them in descending order
by version number.
>>> names = 'bar','foo','Python-2.7.10.egg','Python-2.7.2.egg'
>>> _by_version_descending(names)
['Python-2.7.10.egg','Python-2.7.2.egg','bar']
>>> names = 'Setuptools-1.2.3b1.egg','Setuptools-1.2.3.egg'
>>> _by_version_descending(names)
['Setuptools-1.2.3.egg','Setuptools-1.2.3b1.egg']
>>> names = 'Setuptools-1.2.3b1.egg','Setuptools-1.2.3.post1.egg'
>>> _by_version_descending(names)
['Setuptools-1.2.3.post1.egg','Setuptools-1.2.3b1.egg']
"""
def _by_version(name):
"""
Parse each component of the filename
"""
name, ext = os.path.splitext(name)
parts = itertools.chain(name.split('-'), [ext])
return [packaging.version.parse(part) for part in parts]
return sorted(names, key=_by_version, reverse=True)
def register_namespace_handler(importer_type, namespace_handler):
"""Register `namespace_handler` to declare namespace packages
`importer_type` is the type or class of a PEP 302 "Importer" (sys.path item
handler),and `namespace_handler` is a callable like this::
def namespace_handler(importer,path_entry,moduleName,module):
# return a path_entry to use for child packages
Namespace handlers are only called if the importer object has already
agreed that it can handle the relevant path item,and they should only
return a subpath if the module __path__ does not already contain an
equivalent subpath. For an example namespace handler,see
``pkg_resources.file_ns_handler``.
"""
_namespace_handlers[importer_type] = namespace_handler
def _handle_ns(packageName, path_item):
"""Ensure that named package includes a subpath of path_item (if needed)"""
importer = get_importer(path_item)
if importer is None:
return None
loader = importer.find_module(packageName)
if loader is None:
return None
module = sys.modules.get(packageName)
if module is None:
module = sys.modules[packageName] = types.ModuleType(packageName)
module.__path__ = []
_set_parent_ns(packageName)
elif not hasattr(module, '__path__'):
raise TypeError("Not a package:", packageName)
handler = _find_adapter(_namespace_handlers, importer)
subpath = handler(importer, packageName, module)
if subpath is not None:
path = module.__path__
path.append(subpath)
loader.load_module(packageName)
_rebuild_mod_path(path, module)
return subpath
def check_version_conflict(self):
if self.key == 'setuptools':
# ignore the inevitable setuptools self-conflicts :(
return
nsp = dict.fromkeys(self._get_Metadata('namespace_packages.txt'))
loc = normalize_path(self.location)
for modname in self._get_Metadata('top_level.txt'):
if (modname not in sys.modules or modname in nsp
or modname in _namespace_packages):
continue
if modname in ('pkg_resources', 'setuptools', 'site'):
continue
fn = getattr(sys.modules[modname], '__file__', None)
if fn and (normalize_path(fn).startswith(loc) or
fn.startswith(self.location)):
continue
issue_warning(
"Module %s was already imported from %s,but %s is being added"
" to sys.path" % (modname, fn, self.location),
)
def __init__(self, name=None, delete=None):
# If we were not given an explicit directory,and we were not given an
# explicit delete option,then we'll default to deleting.
if name is None and delete is None:
delete = True
if name is None:
# We realpath here because some systems have their default tmpdir
# symlinked to another directory. This tends to confuse build
# scripts,so we canonicalize the path by traversing potential
# symlinks here.
name = os.path.realpath(tempfile.mkdtemp(prefix="pip-build-"))
# If we were not given an explicit directory,and we were not given
# an explicit delete option,then we'll default to deleting.
if delete is None:
delete = True
self.name = name
self.delete = delete
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process,then
# we'll just assume that we own the directory.
if not hasattr(os, "geteuid"):
return True
prevIoUs = None
while path != prevIoUs:
if os.path.lexists(path):
# Check if path is writable by current user.
if os.geteuid() == 0:
# Special handling for root user in order to handle properly
# cases where users use sudo without -H flag.
try:
path_uid = get_path_uid(path)
except OSError:
return False
return path_uid == 0
else:
return os.access(path, os.W_OK)
else:
prevIoUs, path = path, os.path.dirname(path)
def fix_script(path):
"""Replace #!python with #!/path/to/python
Return True if file was changed."""
# XXX RECORD hashes will need to be updated
if os.path.isfile(path):
with open(path, 'rb') as script:
firstline = script.readline()
if not firstline.startswith(b'#!python'):
return False
exename = sys.executable.encode(sys.getfilesystemencoding())
firstline = b'#!' + exename + os.linesep.encode("ascii")
rest = script.read()
with open(path, 'wb') as script:
script.write(firstline)
script.write(rest)
return True
def uninstallation_paths(dist):
"""
Yield all the uninstallation paths for dist based on RECORD-without-.pyc
Yield paths to all the files in RECORD. For each .py file in RECORD,add
the .pyc in the same directory.
UninstallPathSet.add() takes care of the __pycache__ .pyc.
"""
from pip.utils import FakeFile # circular import
r = csv.reader(FakeFile(dist.get_Metadata_lines('RECORD')))
for row in r:
path = os.path.join(dist.location, row[0])
yield path
if path.endswith('.py'):
dn, fn = os.path.split(path)
base = fn[:-3]
path = os.path.join(dn, base + '.pyc')
yield path
def _build_one(self, req, output_dir, python_tag=None):
"""Build one wheel.
:return: The filename of the built wheel,or None if the build Failed.
"""
tempd = tempfile.mkdtemp('pip-wheel-')
try:
if self.__build_one(req, tempd, python_tag=python_tag):
try:
wheel_name = os.listdir(tempd)[0]
wheel_path = os.path.join(output_dir, wheel_name)
shutil.move(os.path.join(tempd, wheel_name), wheel_path)
logger.info('Stored in directory: %s', output_dir)
return wheel_path
except:
pass
# Ignore return,we can't do anything else useful.
self._clean_one(req)
return None
finally:
rmtree(tempd)
def load(self):
# XXX JSON is not a great database
for path in load_config_paths('wheel'):
conf = os.path.join(native(path), self.CONfig_NAME)
if os.path.exists(conf):
with open(conf, 'r') as infile:
self.data = json.load(infile)
for x in ('signers', 'verifiers'):
if not x in self.data:
self.data[x] = []
if 'schema' not in self.data:
self.data['schema'] = self.SCHEMA
elif self.data['schema'] != self.SCHEMA:
raise ValueError(
"Bad wheel.json version {0},expected {1}".format(
self.data['schema'], self.SCHEMA))
break
return self
def get_cache_path(self, *names)
try:
_bypass_ensure_directory(target_path)
except:
self.extraction_error()
self._warn_unsafe_extraction_path(extract_path)
self.cached_files[target_path] = 1
return target_path
def _warn_unsafe_extraction_path(path):
"""
If the default extraction path is overridden and set to an insecure
location, UserWarning)
def run_script(self, script_name, namespace):
script = 'scripts/' + script_name
if not self.has_Metadata(script):
raise ResolutionError("No script named %r" % script_name)
script_text = self.get_Metadata(script).replace('\r\n', '\n')
script_text = script_text.replace('\r', '\n')
script_filename = self._fn(self.egg_info, script)
namespace['__file__'] = script_filename
if os.path.exists(script_filename):
source = open(script_filename).read()
code = compile(source, script_filename, 'exec')
exec(code, namespace, namespace)
else:
from linecache import cache
cache[script_filename] = (
len(script_text), 0, script_text.split('\n'), script_filename
)
script_code = compile(script_text, 'exec')
exec(script_code, namespace)
def build(cls,
)
for name in zfile.namelist()
)
return dict(items)
def _index(self):
try:
return self._dirindex
except AttributeError:
ind = {}
for path in self.zipinfo:
parts = path.split(os.sep)
while parts:
parent = os.sep.join(parts[:-1])
if parent in ind:
ind[parent].append(parts[-1])
break
else:
ind[parent] = [parts.pop()]
self._dirindex = ind
return ind
def find_eggs_in_zip(importer, Metadata=Metadata)
if only:
# don't yield nested distros
return
for subitem in Metadata.resource_listdir('/'):
if _is_egg_path(subitem):
subpath = os.path.join(path_item, subpath):
yield dist
elif subitem.lower().endswith('.dist-info'):
subpath = os.path.join(path_item, subitem)
subMeta = EggMetadata(zipimport.zipimporter(subpath))
subMeta.egg_info = subpath
yield distribution.from_location(path_item, subitem, subMeta)
def _by_version_descending(names):
"""
Given a list of filenames, reverse=True)
def register_namespace_handler(importer_type,see
``pkg_resources.file_ns_handler``.
"""
_namespace_handlers[importer_type] = namespace_handler
def maybe_move(self, spec, dist_filename, setup_base):
dst = os.path.join(self.build_directory, spec.key)
if os.path.exists(dst):
msg = (
"%r already exists in %s; build directory %s will not be kept"
)
log.warn(msg, spec.key, self.build_directory, setup_base)
return setup_base
if os.path.isdir(dist_filename):
setup_base = dist_filename
else:
if os.path.dirname(dist_filename) == setup_base:
os.unlink(dist_filename) # get it out of the tmp dir
contents = os.listdir(setup_base)
if len(contents) == 1:
dist_filename = os.path.join(setup_base, contents[0])
if os.path.isdir(dist_filename):
# if the only thing there is a directory,move it instead
setup_base = dist_filename
ensure_directory(dst)
shutil.move(setup_base, dst)
return dst
def build_and_install(self, setup_script, setup_base):
args = ['bdist_egg', '--dist-dir']
dist_dir = tempfile.mkdtemp(
prefix='egg-dist-tmp-', dir=os.path.dirname(setup_script)
)
try:
self._set_fetcher_options(os.path.dirname(setup_script))
args.append(dist_dir)
self.run_setup(setup_script, setup_base, args)
all_eggs = Environment([dist_dir])
eggs = []
for key in all_eggs:
for dist in all_eggs[key]:
eggs.append(self.install_egg(dist.location, setup_base))
if not eggs and not self.dry_run:
log.warn("No eggs found in %s (setup script problem?)",
dist_dir)
return eggs
finally:
rmtree(dist_dir)
log.set_verbosity(self.verbose) # restore our log verbosity
def _set_fetcher_options(self, base):
"""
When easy_install is about to run bdist_egg on a source dist,that
source dist might have 'setup_requires' directives,requiring
additional fetching. Ensure the fetcher options given to easy_install
are available to that command as well.
"""
# find the fetch options from easy_install and write them out
# to the setup.cfg file.
ei_opts = self.distribution.get_option_dict('easy_install').copy()
fetch_directives = (
'find_links', 'site_dirs', 'index_url', 'optimize',
'site_dirs', 'allow_hosts',
)
fetch_options = {}
for key, val in ei_opts.items():
if key not in fetch_directives:
continue
fetch_options[key.replace('_', '-')] = val[1]
# create a settings dictionary suitable for `edit_config`
settings = dict(easy_install=fetch_options)
cfg_filename = os.path.join(base, 'setup.cfg')
setopt.edit_config(cfg_filename, settings)
def _expand(self, *attrs):
config_vars = self.get_finalized_command('install').config_vars
if self.prefix:
# Set default install_dir/scripts from --prefix
config_vars = config_vars.copy()
config_vars['base'] = self.prefix
scheme = self.INSTALL_SCHEMES.get(os.name, self.DEFAULT_SCHEME)
for attr, val in scheme.items():
if getattr(self, attr, None) is None:
setattr(self, val)
from distutils.util import subst_vars
for attr in attrs:
val = getattr(self, attr)
if val is not None:
val = subst_vars(val, config_vars)
if os.name == 'posix':
val = os.path.expanduser(val)
setattr(self, val)
def sanitize(self):
import @R_404_4360@
if not self.working_dir:
self.working_dir = os.path.curdir
if not self.output_dir:
self.output_dir = os.path.join(self.working_dir, 'build', '')
allowed_types = ['executable', 'library', 'shared']
if self.type not in allowed_types:
# Todo: exceptions?
Style.error('Invalid output type:', self.type)
Style.error('Allowed types:', allowed_types)
self.type = allowed_types[0]
Style.error('Reverting to', self.type)
if not self.executable:
self.executable = os.path.join(self.output_dir, self.name)
if self.type == 'executable':
self.executable = platform.current.as_executable(self.executable)
elif self.type == 'library':
self.executable = platform.current.as_static_library(self.executable)
elif self.type == 'shared':
self.executable = platform.current.as_shared_library(self.executable)
self.working_dir = os.path.realpath(self.working_dir)
self.output_dir = os.path.realpath(self.output_dir)
self.sources.working_directory = Path(self.working_dir)
self._init_hooks()
# Stage hooks
# Create an empty list for each stage
def tail():
if request.method == 'POST':
fi = request.form['file']
if os.path.isfile(fi):
n = int(request.form['n'])
le = io.open(fi, 'r', encoding='utf-8')
taildata = le.read()[-n:]
le.close()
else:
taildata = "No such file."
return render_template('tail.html',taildata=taildata)
def _create_home(self):
if not os.path.isdir(self._HOME + '/' + self._CONfig_DIR):
os.makedirs(self._HOME + '/' + self._CONfig_DIR)
with os.fdopen(os.open(self._HOME + '/' + self._CONfig_DIR + '/' + self._CONfig_FILE_NAME,
os.O_WRONLY | os.O_CREAT, 0o600), 'w'):
pass
with os.fdopen(os.open(self._HOME + '/' + self._CONfig_DIR + '/' + self._CREDENTIALS_FILE_NAME, 'w'):
pass
def grab_dump_and_convert(dev, target_name, dest_path, pkg):
Fun.p_open(Res.adb_grab_heap_dump_file_with_pkg(dev, absolute_tmp_dump_path, pkg))
Fun.sleep(4)
dump_file_path = os.path.join(dest_path, target_name)
dump_file = dump_file_path + Res.hprof_suffix if not dump_file_path.endswith(Res.hprof_suffix) else dump_file_path
if os.path.exists(dump_file):
dump_file = dump_file.replace(Res.hprof_suffix, Fun.current_time() + Res.hprof_suffix)
Fun.p_open(Res.adb_pull_heap_file_to_dest(dev, dump_file))
convert_file_into_directory(dump_file)
def convert_and_store(file_or_path):
"""
:param file_or_path: use a single file with full path or a full path
all the files end with under the path will be converted and store into directory named with the file name
"""
if not os.path.isdir(file_or_path):
convert_file_into_directory(file_or_path)
else:
for filename in os.listdir(file_or_path):
if filename.endswith(Res.hprof_suffix):
convert_file_into_directory(os.path.join(file_or_path, filename))
def convert_file_into_directory(file_with_full_path):
path = dirname(file_with_full_path)
tags = os.path.split(file_with_full_path)
file_name = tags[1]
convert_and_store_in_directory(path, file_name)
def convert_and_store_in_directory(dest_path, target_name):
target_name = target_name.replace(Res.hprof_suffix, '')
dump_file_path = os.path.join(dest_path, target_name)
dump_file = dump_file_path + Res.hprof_suffix if not dump_file_path.endswith(Res.hprof_suffix) else dump_file_path
Fun.make_dir_if_not_exist(dump_file_path)
convert_file = os.path.join(dump_file_path, target_name + Res.convert_suffix)
Fun.p_open(Res.adb_convert_heap_profile(dump_file, convert_file))
def execute(heap_dump_name):
destination_path = '/Users/elex/Documents/dump' # change directory path to your favorites
grab_dump_and_convert(destination_path, heap_dump_name)
def __init__(self, filepath, defaults=None):
"""Create new :class:`Settings` object."""
super(Settings, self).__init__()
self._filepath = filepath
self._nosave = False
self._original = {}
if os.path.exists(self._filepath):
self._load()
elif defaults:
for key, val in defaults.items():
self[key] = val
self.save() # save default settings