Python os 模块：link() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.link()。
def _remove_os_link():
    """
    In a context, remove and restore os.link if it exists.

    This is a generator with a single ``yield``: ``os.link`` is deleted
    before the yield and put back afterwards, even if the code running at
    the yield point raises.
    NOTE(review): presumably meant to be wrapped with
    ``contextlib.contextmanager`` -- no decorator is visible here; confirm.
    """
    class NoValue:
        # Sentinel meaning "os had no 'link' attribute to begin with".
        pass

    orig_val = getattr(os, 'link', NoValue)
    try:
        del os.link
    except AttributeError:
        # os.link does not exist on this platform; nothing to remove.
        pass
    try:
        yield
    finally:
        if orig_val is not NoValue:
            # setattr needs the attribute name as its second argument.
            setattr(os, 'link', orig_val)
def makelink(self, tarinfo, targetpath):
    """Create the link described by tarinfo at targetpath.

    A symbolic-link member is created with os.symlink().  Any other member
    is hard-linked to ``tarinfo._link_target`` when that path exists;
    otherwise the member the link refers to is extracted to targetpath.
    If the attempt raises ``symlink_exception`` (platform limitation), the
    referenced member is extracted to targetpath instead of a link.

    Raises ExtractError when that fallback hits a KeyError while resolving
    the link inside the archive.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        elif os.path.exists(tarinfo._link_target):
            # See extract().
            os.link(tarinfo._link_target, targetpath)
        else:
            source = self._find_link_target(tarinfo)
            self._extract_member(source, targetpath)
    except symlink_exception:
        try:
            source = self._find_link_target(tarinfo)
            self._extract_member(source, targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
    """Return the archive member that a symlink or hardlink member
    points at.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty path components are
        # dropped before joining.
        pieces = [part for part in (os.path.dirname(tarinfo.name),
                                    tarinfo.linkname) if part]
        linkname, limit = "/".join(pieces), None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname, limit = tarinfo.linkname, tarinfo

    found = self._getmember(linkname, tarinfo=limit, normalize=True)
    if found is not None:
        return found
    raise KeyError("linkname %r not found" % linkname)
def __init__(self, name=""):
    """Construct a TarInfo object. name is the optional name
    of the member.

    Every other header field is set to the default assigned below.
    """
    self.name = name        # member name
    # 0o644 is the octal spelling accepted by Python 2.6+ and Python 3;
    # the legacy form 0644 is a syntax error on Python 3.
    self.mode = 0o644       # file permissions
    self.uid = 0            # user id
    self.gid = 0            # group id
    self.size = 0           # file size
    self.mtime = 0          # modification time
    self.chksum = 0         # header checksum
    self.type = REGTYPE     # member type
    self.linkname = ""      # link name
    self.uname = ""         # user name
    self.gname = ""         # group name
    self.devmajor = 0       # device major number
    self.devminor = 0       # device minor number

    self.offset = 0         # the tar header starts here
    self.offset_data = 0    # the file's data starts here

    self.pax_headers = {}   # pax header information

# In pax headers the "name" and "linkname" field are called
# "path" and "linkpath".
def pack(self):
    """Re-name messages to eliminate numbering gaps. Invalidates keys.

    Walks the keys from ``self.iterkeys()`` and moves every message file
    whose key is not exactly one greater than the previous key down to
    the next free number inside ``self._path``.  ``self._next_key`` is
    updated, and any sequence from ``self.get_sequences()`` that mentions
    a renumbered key is rewritten and stored with ``set_sequences()``.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        if key - 1 != prev:
            changes.append((key, prev + 1))
            old_path = os.path.join(self._path, str(key))
            new_path = os.path.join(self._path, str(prev + 1))
            if hasattr(os, 'link'):
                os.link(old_path, new_path)
                os.unlink(old_path)
            else:
                # No hard links on this platform: rename old -> new.
                os.rename(old_path, new_path)
        prev += 1
    self._next_key = prev + 1
    if not changes:
        return
    for name, key_list in sequences.items():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def _remove_os_link():
    """
    In a context, remove and restore os.link if it exists.

    This is a generator with a single ``yield``: ``os.link`` is deleted
    before the yield and put back afterwards, even if the code running at
    the yield point raises.
    NOTE(review): presumably meant to be wrapped with
    ``contextlib.contextmanager`` -- no decorator is visible here; confirm.
    """
    class NoValue:
        # Sentinel meaning "os had no 'link' attribute to begin with".
        pass

    orig_val = getattr(os, 'link', NoValue)
    try:
        del os.link
    except AttributeError:
        # os.link does not exist on this platform; nothing to remove.
        pass
    try:
        yield
    finally:
        if orig_val is not NoValue:
            setattr(os, 'link', orig_val)
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def copy(self, target, mode=False):
    """Copy this path to target.

    When this path checks as a file it is copied with copychunked(); if
    target checks as a directory the copy goes inside it under this
    path's basename.  Otherwise every entry yielded by ``self.visit()``
    is reproduced below target: links are re-created with mksymlinkto(),
    files are copied and directories ensured.  When mode is true,
    copymode() is applied to each copied file or directory.
    """
    if not self.check(file=1):
        # The recursion filter passed to visit() rejects links.
        for entry in self.visit(rec=lambda p: p.check(link=0)):
            dest = target.join(entry.relto(self))
            dest.dirpath().ensure(dir=1)
            if entry.check(link=1):
                dest.mksymlinkto(entry.readlink())
                continue
            if entry.check(file=1):
                copychunked(entry, dest)
            elif entry.check(dir=1):
                dest.ensure(dir=1)
            if mode:
                copymode(entry.strpath, dest.strpath)
        return

    if target.check(dir=1):
        target = target.join(self.basename)
    assert self != target
    copychunked(self, target)
    if mode:
        copymode(self.strpath, target.strpath)
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def __init__(self, name=""):
    """Construct a TarInfo object. name is the optional name
    of the member.

    Every other header field is set to the default assigned below.
    """
    self.name = name        # member name
    # 0o644 is the octal spelling accepted by Python 2.6+ and Python 3;
    # the legacy form 0644 is a syntax error on Python 3.
    self.mode = 0o644       # file permissions
    self.uid = 0            # user id
    self.gid = 0            # group id
    self.size = 0           # file size
    self.mtime = 0          # modification time
    self.chksum = 0         # header checksum
    self.type = REGTYPE     # member type
    self.linkname = ""      # link name
    self.uname = ""         # user name
    self.gname = ""         # group name
    self.devmajor = 0       # device major number
    self.devminor = 0       # device minor number

    self.offset = 0         # the tar header starts here
    self.offset_data = 0    # the file's data starts here

    self.pax_headers = {}   # pax header information

# In pax headers the "name" and "linkname" field are called
# "path" and "linkpath".
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
    (platform limitation), we try to make a copy of the referenced file
    instead of a link.

    An existing entry at targetpath is unlinked before the link is made.
    Raises ExtractError when the copy fallback cannot resolve the link
    inside the archive.
    """
    if hasattr(os, "symlink") and hasattr(os, "link"):
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            if os.path.lexists(targetpath):
                os.unlink(targetpath)
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                if os.path.lexists(targetpath):
                    os.unlink(targetpath)
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    else:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def test_copy_file_hard_link_failure(self):
    """Check copy_file(link='hard') when os.link always raises OSError."""
    # If hard linking fails, copy_file() falls back on copying file
    # (some special filesystems don't support hard linking even under
    # Unix, see issue #8876).
    with open(self.source, 'w') as handle:
        handle.write('some content')
    stat_before = os.stat(self.source)

    def failing_link(*args):
        raise OSError(0, "linking unsupported")

    # Swap in the failing os.link only for the duration of the copy.
    saved_link, os.link = os.link, failing_link
    try:
        copy_file(self.source, self.target, link='hard')
    finally:
        os.link = saved_link

    stat_source = os.stat(self.source)
    stat_target = os.stat(self.target)
    # The source is untouched and the target is a distinct file.
    self.assertTrue(os.path.samestat(stat_before, stat_source),
                    (stat_before, stat_source))
    self.assertFalse(os.path.samestat(stat_source, stat_target),
                     (stat_source, stat_target))
    for path in (self.source, self.target):
        with open(path, 'r') as handle:
            self.assertEqual(handle.read(), 'some content')
def pack(self):
    """Re-name messages to eliminate numbering gaps. Invalidates keys.

    Walks the keys from ``self.iterkeys()`` and moves every message file
    whose key is not exactly one greater than the previous key down to
    the next free number inside ``self._path``.  ``self._next_key`` is
    updated, and any sequence from ``self.get_sequences()`` that mentions
    a renumbered key is rewritten and stored with ``set_sequences()``.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        if key - 1 != prev:
            changes.append((key, prev + 1))
            old_path = os.path.join(self._path, str(key))
            new_path = os.path.join(self._path, str(prev + 1))
            if hasattr(os, 'link'):
                os.link(old_path, new_path)
                os.unlink(old_path)
            else:
                # No hard links on this platform: rename old -> new.
                os.rename(old_path, new_path)
        prev += 1
    self._next_key = prev + 1
    if not changes:
        return
    for name, key_list in sequences.items():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def safe_make_symlink(input_file_path,output_file_path):
    """Symlink output_file_path to input_file_path, tolerating a link
    that is already in place.

    The directory of output_file_path is created through safe_make_dirs()
    first.  Raises Exception when input_file_path does not exist, or when
    something already exists at output_file_path whose real path differs
    from that of input_file_path.  Any other OSError from os.symlink()
    is re-raised unchanged.
    """
    output_file_dir = os.path.dirname(output_file_path)

    # Verify the input file is actually there
    if not os.path.exists(input_file_path):
        raise Exception("can't find file %s"%input_file_path)

    safe_make_dirs(output_file_dir)
    try:
        os.symlink(input_file_path,output_file_path)
    except OSError as err:
        if err.errno != errno.EEXIST:
            raise
        # link already exists, check that it is identical to the one we
        # are trying to put down
        wanted = os.path.realpath(input_file_path)
        existing = os.path.realpath(output_file_path)
        if wanted != existing:
            raise Exception('Existing file is different than the new symlink')
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
    (platform limitation), we try to make a copy of the referenced file
    instead of a link.

    Raises ExtractError when the copy fallback hits a KeyError while
    resolving the link inside the archive.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    except symlink_exception:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def acquire(self, timeout=None):
    """Acquire the lock by hard-linking ``self.unique_name`` to
    ``self.lock_file``.

    timeout falls back to ``self.timeout`` when None.  The link is retried
    in a loop; when it fails but ``self.unique_name`` already has two
    links, the lock is treated as held by this object and the call
    returns.  Once a non-None timeout has elapsed, ``self.unique_name`` is
    removed and LockTimeout (timeout > 0) or AlreadyLocked (otherwise) is
    raised.  Raises LockFailed when ``self.unique_name`` cannot be created.
    """
    try:
        open(self.unique_name, "wb").close()
    except IOError:
        raise LockFailed("failed to create %s" % self.unique_name)

    if timeout is None:
        timeout = self.timeout
    deadline = time.time()
    if timeout is not None and timeout > 0:
        deadline += timeout

    while True:
        # Try and create a hard link to it.
        try:
            os.link(self.unique_name, self.lock_file)
        except OSError:
            # Link creation failed.  Maybe we've double-locked?
            if os.stat(self.unique_name).st_nlink == 2:
                # The original link plus the one I created == 2.  We're
                # good to go.
                return
            # Otherwise the lock creation failed.
            if timeout is not None and time.time() > deadline:
                os.unlink(self.unique_name)
                if timeout > 0:
                    raise LockTimeout("Timeout waiting to acquire lock for %s"
                                      % self.path)
                raise AlreadyLocked("%s is already locked" % self.path)
            # A tenth of the timeout, or 0.1s when that value is falsy or
            # no timeout is set.
            pause = (timeout / 10 if timeout is not None else 0) or 0.1
            time.sleep(pause)
        else:
            # Link creation succeeded.  We're good to go.
            return
def __init__(self, name=""):
    """Create a TarInfo whose header fields all hold their defaults.

    name is the optional member name; every other field starts out with
    the value assigned below.
    """
    # What the member is.
    self.name = name                # member name
    self.type = REGTYPE             # member type
    self.linkname = ""              # link name
    self.mode = 0o644               # file permissions
    self.size = 0                   # file size
    self.mtime = 0                  # modification time
    self.chksum = 0                 # header checksum

    # Who owns it.
    self.uid = self.gid = 0         # user id / group id
    self.uname = self.gname = ""    # user name / group name

    # Device numbers.
    self.devmajor = self.devminor = 0

    # Where it lives in the archive.
    self.offset = 0                 # the tar header starts here
    self.offset_data = 0            # the file's data starts here

    self.sparse = None              # sparse member information
    self.pax_headers = {}           # pax header information

# In pax headers the "name" and "linkname" field are called
# "path" and "linkpath".
def list(self, verbose=True):
    """Print a table of contents to sys.stdout. If `verbose' is False, only
    the names of the members are printed. If it is True, an `ls -l'-like
    output is produced.
    """
    self._check()

    for tarinfo in self:
        if verbose:
            print(filemode(tarinfo.mode), end=' ')
            # Prefer the symbolic owner/group names, else the numeric ids.
            print("%s/%s" % (tarinfo.uname or tarinfo.uid,
                             tarinfo.gname or tarinfo.gid), end=' ')
            if tarinfo.ischr() or tarinfo.isblk():
                # Device members show "major,minor" in the size column.
                print("%10s" % ("%d,%d"
                                % (tarinfo.devmajor, tarinfo.devminor)),
                      end=' ')
            else:
                print("%10d" % tarinfo.size, end=' ')
            print("%d-%02d-%02d %02d:%02d:%02d"
                  % time.localtime(tarinfo.mtime)[:6], end=' ')

        print(tarinfo.name + ("/" if tarinfo.isdir() else ""), end=' ')

        if verbose:
            if tarinfo.issym():
                print("->", tarinfo.linkname, end=' ')
            if tarinfo.islnk():
                # Show the hard link's target, as the symlink branch does.
                print("link to", tarinfo.linkname, end=' ')
        print()
def extract(self, member, path="", set_attrs=True):
    """Extract one member below `path' under its full name.

    `member' may be a filename or a TarInfo object; a filename is looked
    up with getmember().  `path' defaults to "" and is joined in front of
    the member's name.  File attributes (owner, mtime, mode) are set
    unless `set_attrs' is False.

    An EnvironmentError is re-raised when ``self.errorlevel`` is above 0,
    and an ExtractError when it is above 1; otherwise the error is only
    reported through ``self._dbg``.
    """
    self._check("r")

    tarinfo = self.getmember(member) if isinstance(member, str) else member

    if tarinfo.islnk():
        # Prepare the link target for makelink().
        tarinfo._link_target = os.path.join(path, tarinfo.linkname)

    destination = os.path.join(path, tarinfo.name)
    try:
        self._extract_member(tarinfo, destination, set_attrs=set_attrs)
    except EnvironmentError as e:
        if self.errorlevel > 0:
            raise
        if e.filename is None:
            self._dbg(1, "tarfile: %s" % e.strerror)
        else:
            self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename))
    except ExtractError as e:
        if self.errorlevel > 1:
            raise
        self._dbg(1, "tarfile: %s" % e)
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
    (platform limitation), we try to make a copy of the referenced file
    instead of a link.

    Raises ExtractError when the copy fallback hits a KeyError while
    resolving the link inside the archive.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    except symlink_exception:
        # Only fall back to a copy when the link could not be created;
        # extracting after a successful link would overwrite it.
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def make_release_tree(self, base_dir, files):
    """Build the release tree in base_dir, then record version info in
    its setup.cfg.

    Delegates tree creation to ``orig.sdist.make_release_tree`` and then
    calls the egg_info command's ``save_version_info`` on
    ``<base_dir>/setup.cfg``.
    """
    # The base implementation takes base_dir as well as the file list.
    orig.sdist.make_release_tree(self, base_dir, files)

    # Save any egg_info command line options used to create this sdist
    dest = os.path.join(base_dir, 'setup.cfg')
    if hasattr(os, 'link') and os.path.exists(dest):
        # unlink and re-copy, since it might be hard-linked, and
        # we don't want to change the source version
        os.unlink(dest)
        self.copy_file('setup.cfg', dest)

    self.get_finalized_command('egg_info').save_version_info(dest)
def __init__(self, name=""):
    """Create a TarInfo whose header fields all hold their defaults.

    name is the optional member name; every other field starts out with
    the value assigned below.
    """
    # What the member is.
    self.name = name                # member name
    self.type = REGTYPE             # member type
    self.linkname = ""              # link name
    self.mode = 0o644               # file permissions
    self.size = 0                   # file size
    self.mtime = 0                  # modification time
    self.chksum = 0                 # header checksum

    # Who owns it.
    self.uid = self.gid = 0         # user id / group id
    self.uname = self.gname = ""    # user name / group name

    # Device numbers.
    self.devmajor = self.devminor = 0

    # Where it lives in the archive.
    self.offset = 0                 # the tar header starts here
    self.offset_data = 0            # the file's data starts here

    self.sparse = None              # sparse member information
    self.pax_headers = {}           # pax header information

# In pax headers the "name" and "linkname" field are called
# "path" and "linkpath".
def extract(self, member, path="", set_attrs=True, *, numeric_owner=False):
    """Extract a member from the archive to the current working directory,
    using its full name. `member' may be a filename or a TarInfo object.
    You can specify a different directory using `path'. File attributes
    (owner, mtime, mode) are set unless `set_attrs' is False. If
    `numeric_owner` is True, only the numbers for user/group names are
    used and not the names.

    An OSError is re-raised when ``self.errorlevel`` is above 0, and an
    ExtractError when it is above 1; otherwise the error is only reported
    through ``self._dbg``.
    """
    self._check("r")

    if isinstance(member, str):
        tarinfo = self.getmember(member)
    else:
        tarinfo = member

    # Prepare the link target for makelink().
    if tarinfo.islnk():
        tarinfo._link_target = os.path.join(path, tarinfo.linkname)

    try:
        self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
                             set_attrs=set_attrs,
                             numeric_owner=numeric_owner)
    except OSError as e:
        if self.errorlevel > 0:
            raise
        else:
            if e.filename is None:
                self._dbg(1, "tarfile: %s" % e.strerror)
            else:
                self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename))
    except ExtractError as e:
        if self.errorlevel > 1:
            raise
        else:
            self._dbg(1, "tarfile: %s" % e)
def extractfile(self, member):
    """Return a file object for one archive member, or None.

    `member' may be a filename or a TarInfo object.  A regular file, or a
    member whose type is not in SUPPORTED_TYPES, yields
    ``self.fileobject(self, tarinfo)``.  A (sym)link yields the file
    object of its target.  Any other member yields None.

    Raises StreamError for a (sym)link when ``self.fileobj`` is a _Stream.
    """
    self._check("r")

    tarinfo = self.getmember(member) if isinstance(member, str) else member

    if tarinfo.isreg() or tarinfo.type not in SUPPORTED_TYPES:
        # Members with unknown types are treated as regular files.
        return self.fileobject(self, tarinfo)

    if not (tarinfo.islnk() or tarinfo.issym()):
        # If there's no data associated with the member (directory, chrdev,
        # blkdev, etc.), return None instead of a file object.
        return None

    if isinstance(self.fileobj, _Stream):
        # A small but ugly workaround for the case that someone tries
        # to extract a (sym)link as a file-object from a non-seekable
        # stream of tar blocks.
        raise StreamError("cannot extract (sym)link as file object")

    # A (sym)link's file object is its target's file object.
    return self.extractfile(self._find_link_target(tarinfo))
def acquire(self, timeout=None):
    """Acquire the lock by hard-linking ``self.unique_name`` to
    ``self.lock_file``.

    timeout falls back to ``self.timeout`` when None.  The link is retried
    in a loop; when it fails but ``self.unique_name`` already has two
    links, the lock is treated as held by this object and the call
    returns.  Once a non-None timeout has elapsed, ``self.unique_name`` is
    removed and LockTimeout (timeout > 0) or AlreadyLocked (otherwise) is
    raised.  Raises LockFailed when ``self.unique_name`` cannot be created.
    """
    try:
        open(self.unique_name, "wb").close()
    except IOError:
        raise LockFailed("failed to create %s" % self.unique_name)

    timeout = timeout if timeout is not None else self.timeout
    end_time = time.time()
    if timeout is not None and timeout > 0:
        end_time += timeout

    while True:
        # Try and create a hard link to it.
        try:
            os.link(self.unique_name, self.lock_file)
        except OSError:
            # Link creation failed.  Maybe we've double-locked?
            nlinks = os.stat(self.unique_name).st_nlink
            if nlinks == 2:
                # The original link plus the one I created == 2.  We're
                # good to go.
                return
            else:
                # Otherwise the lock creation failed.
                if timeout is not None and time.time() > end_time:
                    os.unlink(self.unique_name)
                    if timeout > 0:
                        raise LockTimeout("Timeout waiting to acquire"
                                          " lock for %s" %
                                          self.path)
                    else:
                        raise AlreadyLocked("%s is already locked" %
                                            self.path)
                # A tenth of the timeout, or 0.1s when that value is
                # falsy or no timeout is set.
                time.sleep((timeout / 10 if timeout is not None else 0)
                           or 0.1)
        else:
            # Link creation succeeded.  We're good to go.
            return
def __init__(self, name=""):
    """Create a TarInfo whose header fields all hold their defaults.

    name is the optional member name; every other field starts out with
    the value assigned below.
    """
    # What the member is.
    self.name = name                # member name
    self.type = REGTYPE             # member type
    self.linkname = ""              # link name
    self.mode = 0o644               # file permissions
    self.size = 0                   # file size
    self.mtime = 0                  # modification time
    self.chksum = 0                 # header checksum

    # Who owns it.
    self.uid = self.gid = 0         # user id / group id
    self.uname = self.gname = ""    # user name / group name

    # Device numbers.
    self.devmajor = self.devminor = 0

    # Where it lives in the archive.
    self.offset = 0                 # the tar header starts here
    self.offset_data = 0            # the file's data starts here

    self.sparse = None              # sparse member information
    self.pax_headers = {}           # pax header information

# In pax headers the "name" and "linkname" field are called
# "path" and "linkpath".
def makelink(self, tarinfo, targetpath):
    """Make a (symbolic) link called targetpath. If it cannot be created
    (platform limitation), we try to make a copy of the referenced file
    instead of a link.

    Raises ExtractError when the copy fallback hits a KeyError while
    resolving the link inside the archive.
    """
    try:
        # For systems that support symbolic and hard links.
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        else:
            # See extract().
            if os.path.exists(tarinfo._link_target):
                os.link(tarinfo._link_target, targetpath)
            else:
                self._extract_member(self._find_link_target(tarinfo),
                                     targetpath)
    except symlink_exception:
        try:
            self._extract_member(self._find_link_target(tarinfo),
                                 targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
    """Find the target member of a symlink or hardlink member in the
    archive.

    Raises KeyError when ``self._getmember`` returns None for the
    computed link name.
    """
    if tarinfo.issym():
        # Always search the entire archive.  Empty components are dropped
        # so a symlink stored at the archive root resolves to "target"
        # rather than "/target".
        linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name),
                                          tarinfo.linkname)))
        limit = None
    else:
        # Search the archive before the link, because a hard link is
        # just a reference to an already archived file.
        linkname = tarinfo.linkname
        limit = tarinfo

    member = self._getmember(linkname, tarinfo=limit, normalize=True)
    if member is None:
        raise KeyError("linkname %r not found" % linkname)
    return member
def make_release_tree(self, base_dir, files):
    """Build the release tree in base_dir, then record version info in
    its setup.cfg.

    Delegates tree creation to ``orig.sdist.make_release_tree`` and then
    calls the egg_info command's ``save_version_info`` on
    ``<base_dir>/setup.cfg``.
    """
    orig.sdist.make_release_tree(self, base_dir, files)

    # Save any egg_info command line options used to create this sdist
    dest = os.path.join(base_dir, 'setup.cfg')
    if hasattr(os, 'link') and os.path.exists(dest):
        # unlink and re-copy, since it might be hard-linked, and
        # we don't want to change the source version
        os.unlink(dest)
        self.copy_file('setup.cfg', dest)

    self.get_finalized_command('egg_info').save_version_info(dest)
def list(self, verbose=True):
    """Print a table of contents to sys.stdout. If `verbose' is False, only
    the names of the members are printed. If it is True, an `ls -l'-like
    output is produced.
    """
    self._check()

    for tarinfo in self:
        if verbose:
            print(filemode(tarinfo.mode), end=' ')
            # Prefer the symbolic owner/group names, else the numeric ids.
            print("%s/%s" % (tarinfo.uname or tarinfo.uid,
                             tarinfo.gname or tarinfo.gid), end=' ')
            if tarinfo.ischr() or tarinfo.isblk():
                # Device members show "major,minor" in the size column.
                print("%10s" % ("%d,%d"
                                % (tarinfo.devmajor, tarinfo.devminor)),
                      end=' ')
            else:
                print("%10d" % tarinfo.size, end=' ')
            print("%d-%02d-%02d %02d:%02d:%02d"
                  % time.localtime(tarinfo.mtime)[:6], end=' ')

        print(tarinfo.name + ("/" if tarinfo.isdir() else ""), end=' ')

        if verbose:
            if tarinfo.issym():
                print("->", tarinfo.linkname, end=' ')
            if tarinfo.islnk():
                print("link to", tarinfo.linkname, end=' ')
        print()
def extract(self, member, path=""):
    """Extract a member from the archive to the current working directory,
    using its full name. Its file information is extracted as accurately
    as possible. `member' may be a filename or a TarInfo object. You can
    specify a different directory using `path'.

    An EnvironmentError is re-raised when ``self.errorlevel`` is above 0,
    and an ExtractError when it is above 1; otherwise the error is only
    reported through ``self._dbg``.
    """
    self._check("r")

    try:
        string_types = basestring   # Python 2: str and unicode
    except NameError:
        string_types = str          # Python 3 has no basestring

    if isinstance(member, string_types):
        tarinfo = self.getmember(member)
    else:
        tarinfo = member

    # Prepare the link target for makelink().
    if tarinfo.islnk():
        tarinfo._link_target = os.path.join(path, tarinfo.linkname)

    try:
        self._extract_member(tarinfo, os.path.join(path, tarinfo.name))
    except EnvironmentError as e:
        if self.errorlevel > 0:
            raise
        else:
            if e.filename is None:
                self._dbg(1, "tarfile: %s" % e.strerror)
            else:
                self._dbg(1, "tarfile: %s %r" % (e.strerror, e.filename))
    except ExtractError as e:
        if self.errorlevel > 1:
            raise
        else:
            self._dbg(1, "tarfile: %s" % e)
def extractfile(self, member):
    """Extract a member from the archive as a file object. `member' may be
    a filename or a TarInfo object. If `member' is a regular file, a
    file-like object is returned. If `member' is a link, a file-like
    object is constructed from the link's target. If `member' is none of
    the above, None is returned.
    The file-like object is read-only and provides the following
    methods: read(), readline(), readlines(), seek() and tell()

    Raises StreamError for a (sym)link when ``self.fileobj`` is a _Stream.
    """
    self._check("r")

    try:
        string_types = basestring   # Python 2: str and unicode
    except NameError:
        string_types = str          # Python 3 has no basestring

    if isinstance(member, string_types):
        tarinfo = self.getmember(member)
    else:
        tarinfo = member

    if tarinfo.isreg():
        return self.fileobject(self, tarinfo)

    elif tarinfo.type not in SUPPORTED_TYPES:
        # If a member's type is unknown, it is treated as a
        # regular file.
        return self.fileobject(self, tarinfo)

    elif tarinfo.islnk() or tarinfo.issym():
        if isinstance(self.fileobj, _Stream):
            # A small but ugly workaround for the case that someone tries
            # to extract a (sym)link as a file-object from a non-seekable
            # stream of tar blocks.
            raise StreamError("cannot extract (sym)link as file object")
        else:
            # A (sym)link's file object is its target's file object.
            return self.extractfile(self._find_link_target(tarinfo))
    else:
        # If there's no data associated with the member (directory, chrdev,
        # blkdev, etc.), return None instead of a file object.
        return None