Python os 模块,symlink() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.symlink()。
def src_proc_dispatcher(pkg_name, src_tbl_name, src_loc):
    """Stage a package source inside a fresh temporary build directory.

    A source directory is copied into the build directory; anything else is
    symlinked there and handed to ``decomp_file``.

    :param pkg_name: package name (not used by this function)
    :param src_tbl_name: file or directory name of the source
    :param src_loc: directory that contains ``src_tbl_name``
    :returns: ``False`` when copying a source directory fails; otherwise a
        ``(status, build_dir)`` tuple where ``status`` is ``True`` for a
        copied directory or whatever ``decomp_file`` returned
    """
    tobj = tempfile.mkdtemp(dir='/var/cache/acbs/build/', prefix='acbs.')
    src_tbl_loc = os.path.join(src_loc, src_tbl_name)
    shadow_ark_loc = os.path.join(tobj, src_tbl_name)
    if os.path.isdir(src_tbl_loc):
        print('[I] Making a copy of the source directory...', end='')
        try:
            shutil.copytree(src=src_tbl_loc, dst=shadow_ark_loc)
        except Exception:
            # Previously a bare ``except:``, which also trapped
            # KeyboardInterrupt/SystemExit and reported them as a failed copy.
            print('Failed!')
            return False
        print('Done!')
        return True, tobj
    os.symlink(src_tbl_loc, shadow_ark_loc)
    return decomp_file(shadow_ark_loc, tobj), tobj
def makelink(self, tarinfo, targetpath):
    """Create the link described by *tarinfo* at *targetpath*.

    Symbolic members become symlinks; other link members become hard links
    when their source exists on disk. Otherwise, or when the platform refuses
    to create the link, the member the link refers to is extracted to
    *targetpath* instead.
    """
    try:
        if tarinfo.issym():
            os.symlink(tarinfo.linkname, targetpath)
        elif os.path.exists(tarinfo._link_target):
            # See extract().
            os.link(tarinfo._link_target, targetpath)
        else:
            referenced = self._find_link_target(tarinfo)
            self._extract_member(referenced, targetpath)
    except symlink_exception:
        # Links are unavailable here: fall back to a copy of the target.
        try:
            referenced = self._find_link_target(tarinfo)
            self._extract_member(referenced, targetpath)
        except KeyError:
            raise ExtractError("unable to resolve link inside archive")
# NOTE(review): this snippet is damaged -- the header line below runs the
# parameter list straight into the tail of the docstring, so the remaining
# parameters and the docstring opening are not visible here. The body reads
# `tarinfo` and `targetpath`. It is also unclear from this text which `if`
# the final `else:` belongs to; confirm against the original source.
def makelink(self,we try to make a copy of the referenced file
instead of a link.
"""
if hasattr(os, "symlink") and hasattr(os, "link"):
# For systems that support symbolic and hard links.
if tarinfo.issym():
# Remove whatever already occupies targetpath before linking.
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.symlink(tarinfo.linkname, targetpath)
else:
# See extract().
if os.path.exists(tarinfo._link_target):
if os.path.lexists(targetpath):
os.unlink(targetpath)
os.link(tarinfo._link_target, targetpath)
else:
try:
self._extract_member(self._find_link_target(tarinfo), targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
def execute(self):
    """Point the currently selected symlink at a new target.

    Notifies the user (and stops) when no new path was given or the selected
    file is not a symlink; does nothing when the link already points at the
    requested path. OSError from the relink is reported via ``fm.notify``.
    """
    from ranger.container.file import File
    target = self.rest(1)
    selected = self.fm.thisfile

    if not target:
        return self.fm.notify('Syntax: relink <newpath>', bad=True)
    if not selected.is_link:
        return self.fm.notify('%s is not a symlink!' % selected.relative_path, bad=True)
    if os.readlink(selected.path) == target:
        return

    try:
        os.remove(selected.path)
        os.symlink(target, selected.path)
    except OSError as exc:
        self.fm.notify(exc)

    # Refresh the file manager and keep the same entry selected.
    self.fm.reset()
    self.fm.thisdir.pointed_obj = selected
    self.fm.thisfile = selected
def copytree(src, dst, symlinks=False, ignore=None):
    """Recursively copy *src* into *dst*, merging into an existing *dst*.

    :param src: directory to copy from
    :param dst: directory to copy into; created (with *src*'s stat info)
        when it does not exist yet
    :param symlinks: when true, symlinks in *src* are recreated as symlinks
        (replacing any existing entry) instead of being followed
    :param ignore: optional callable ``ignore(src, names)`` returning the
        names to skip in that directory
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    names = os.listdir(src)
    if ignore:
        excluded = ignore(src, names)
        names = [name for name in names if name not in excluded]
    for name in names:
        s = os.path.join(src, name)
        d = os.path.join(dst, name)
        if symlinks and os.path.islink(s):
            if os.path.lexists(d):
                os.remove(d)
            os.symlink(os.readlink(s), d)
            try:
                mode = stat.S_IMODE(os.lstat(s).st_mode)
                os.lchmod(d, mode)
            except (AttributeError, NotImplementedError, OSError):
                # lchmod is not available (or not permitted) on this
                # platform; the link itself has been created. Previously a
                # bare ``except:`` that also swallowed KeyboardInterrupt.
                pass
        elif os.path.isdir(s):
            copytree(s, d, symlinks, ignore)
        else:
            shutil.copy2(s, d)
def setUp(self):
    """Prepare the template environment and an empty working directory.

    The working directory is ``<build_path>run/<test id>``; any previous
    content is removed. A ``last_run`` symlink next to it is refreshed on a
    best-effort basis.
    """
    self.template_env = jinja2.Environment(
        loader=jinja2.FileSystemLoader("config")
    )

    # create working dir
    self.working_dir = os.path.join(build_path + "run", self.id())
    if os.path.exists(self.working_dir):
        shutil.rmtree(self.working_dir)
    os.makedirs(self.working_dir)

    last_run = build_path + "last_run"
    try:
        # update the last_run link
        if os.path.islink(last_run):
            os.unlink(last_run)
        os.symlink(build_path + "run/{}".format(self.id()), last_run)
    except (OSError, AttributeError, NotImplementedError):
        # symlink is best effort and can fail when running tests in
        # parallel (or where symlinks are unavailable). Narrowed from a bare
        # ``except:`` so interrupts are no longer swallowed.
        pass
def clone(self, name, user):
    """
    Create a clone of self with a given name, owned by user.

    Records the new branch in the database, clones the repository under
    ``repos/<project>/<name>/source``, links the project's low-resolution
    resources into it, checks out a branch called *name*, configures the
    repository for *user*, builds the new branch and returns it.
    """
    self.expiration = None

    # Register the branch in the database first.
    created = Branch(name, self.project, self, user)
    db.session.add(created)
    db.session.commit()

    # Then clone the repository in the file system.
    checkout_dir = os.path.abspath(
        join(os.getcwd(), 'repos', self.project.name, name, 'source'))
    self.get_repo().clone(checkout_dir, branch=self.name)

    shared_resources = os.path.abspath(
        join('repos', self.project.name, '_resources/low_resolution'))
    os.symlink(shared_resources, join(checkout_dir, '_resources'))

    repo = git.Repo(checkout_dir)
    repo.git.checkout('HEAD', b=name)
    config_repo(repo, user.username, user.email)

    # Finally build the source.
    created.build(timeout=60)
    return created
def mk_tar_dir(issuer, test_profile):
    """Mirror the log files of *test_profile* as ``.txt`` links in the tar tree.

    Ensures ``tar/<issuer>/<test_profile>`` exists below the current working
    directory, then links every non-hidden entry of ``log/<test_profile>``
    into it as ``<entry>.txt``. An existing regular file (or a link that
    resolves to one) at the link location is replaced.
    """
    cwd = os.getcwd()

    # Create the tar directory one path component at a time.
    tar_dir = cwd
    for component in ("tar", issuer, test_profile):
        tar_dir = os.path.join(tar_dir, component)
        if not os.path.isdir(tar_dir):
            os.mkdir(tar_dir)

    # Link each visible log file into the tar directory.
    log_dir = os.path.join(cwd, "log", test_profile)
    visible = [entry for entry in os.listdir(log_dir)
               if not entry.startswith(".")]
    for entry in visible:
        log_file = os.path.join(log_dir, entry)
        link_name = os.path.join(tar_dir, "{}.txt".format(entry))
        if os.path.isfile(link_name):
            os.unlink(link_name)
        if os.path.islink(link_name):
            continue
        os.symlink(log_file, link_name)
def close_compressed(filename, hdf5_file, compression_type='bz2', create_link=False):
    """Close an HDF5 file that was opened with open_compressed.

    When the file was opened for writing, the HDF5 file is packed into a tar
    archive at *filename* using *compression_type*. With *create_link* set, a
    symlink ``<filename><extension>`` pointing at the archive is created
    (unless that path already exists) so tools that rely on the extension can
    open it. The local HDF5 file is removed afterwards.

    :param filename: path of the compressed archive
    :param hdf5_file: object exposing ``filename``, ``writable`` and ``close()``
    :param compression_type: ``''``, ``'bz2'`` or ``'gz'``
    :param create_link: whether to create the extension-bearing symlink
    :raises KeyError: if *create_link* is set and *compression_type* is not
        one of the supported values
    """
    hdf5_file_name = hdf5_file.filename
    is_writable = hdf5_file.writable
    hdf5_file.close()

    if is_writable:
        # Create the compressed tar file; the context manager closes the
        # archive even when adding the member fails.
        with tarfile.open(filename, mode="w:" + compression_type) as tar:
            tar.add(hdf5_file_name, os.path.basename(filename))

    if create_link:
        # The 'gz' entry used to be 'tar.gz' (no leading dot), which produced
        # link names such as "datatar.gz".
        extension = {'': '.tar', 'bz2': '.tar.bz2',
                     'gz': '.tar.gz'}[compression_type]
        link_file = filename + extension
        if not os.path.exists(link_file):
            os.symlink(os.path.basename(filename), link_file)

    # clean up locally generated files
    os.remove(hdf5_file_name)
def reorganize(dataset_dir):
    """Create ``link_*`` directories whose entry ``0`` links to each split.

    For each of ``trainA``, ``trainB``, ``testA`` and ``testB`` a directory
    ``<dataset_dir>/link_<split>`` is prepared via ``mkdir`` and a symlink
    named ``0`` inside it is (re)pointed at the absolute path of
    ``<dataset_dir>/<split>``.

    :returns: dict mapping split name to its ``link_<split>`` directory
    """
    dirs = {
        split: os.path.join(dataset_dir, 'link_' + split)
        for split in ('trainA', 'trainB', 'testA', 'testB')
    }
    mkdir(dirs.values())

    for key, link_dir in dirs.items():
        link_path = os.path.join(link_dir, '0')
        try:
            os.remove(link_path)
        except OSError:
            # Nothing to replace. Narrowed from a bare ``except:`` so that
            # only file-system errors are ignored.
            pass
        os.symlink(os.path.abspath(os.path.join(dataset_dir, key)), link_path)

    return dirs
def build_same_files_and_dirs(tmpdir, source_is_symlink, dest_is_symlink, use_files):
    """Build a source/destination pair that refer to the same file or directory.

    :param tmpdir: directory object providing ``join``, ``mkdir`` and,
        on the joined path, ``write``
    :param bool source_is_symlink: Should the source be a symlink to the destination (both cannot be True)
    :param bool dest_is_symlink: Should the destination be a symlink to the source (both cannot be True)
    :param bool use_files: Should files be created (if False, directories are used instead)
    :returns: ``(source, destination)`` as strings
    """
    if use_files:
        target = tmpdir.join('real')
        target.write('some data')
    else:
        target = tmpdir.mkdir('real')
    target_path = str(target)
    alias_path = str(tmpdir.join('link'))

    # Without a symlink request, source and destination are the same path.
    if not (source_is_symlink or dest_is_symlink):
        return target_path, target_path

    os.symlink(target_path, alias_path)
    if source_is_symlink:
        return alias_path, target_path
    return target_path, alias_path
def test_process_single_file_destination_is_symlink_to_source(
        tmpdir,
        patch_process_single_operation,
        standard_handler
):
    """Processing must be skipped when the destination is a symlink to the source.

    Neither ``open`` in the io_handling module nor the single-operation
    processor may be called.
    """
    source_file = tmpdir.join('source')
    source_file.write('some data')
    source_path = str(source_file)
    link_path = str(tmpdir.join('destination'))
    os.symlink(source_path, link_path)

    open_target = 'aws_encryption_sdk_cli.internal.io_handling.open'
    with patch(open_target, create=True) as mock_open:
        standard_handler.process_single_file(
            stream_args=sentinel.stream_args,
            source=source_path,
            destination=link_path
        )

    assert not mock_open.called
    assert not patch_process_single_operation.called
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    """Encrypt into a path that goes through a symlinked directory, then decrypt.

    The decrypted output must match the original plaintext.
    """
    plaintext_path = str(tmpdir.join('source_plaintext'))
    real_output_dir = tmpdir.mkdir('output')
    os.symlink(str(real_output_dir), str(tmpdir.join('output_link')))
    ciphertext_path = str(tmpdir.join('output_link', 'ciphertext'))
    decrypted_path = str(tmpdir.join('decrypted'))

    with open(plaintext_path, 'wb') as handle:
        handle.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=plaintext_path,
        target=ciphertext_path
    )
    decrypt_args = decrypt_args_template().format(
        source=ciphertext_path,
        target=decrypted_path
    )

    for cli_args in (encrypt_args, decrypt_args):
        aws_encryption_sdk_cli.cli(shlex.split(cli_args, posix=not is_windows()))

    assert filecmp.cmp(plaintext_path, decrypted_path)
def render(self):
    """Render every instance, link them by name, then write the fixture files.

    A failure to create an instance's symlink is logged and does not stop
    the rendering of the control script and specification file.
    """
    # All instances are rendered before any link is created.
    for member in self.instances:
        member.render()

    # Human readable links: <fixture_root>/<instance_name> -> node_root.
    for member in self.instances:
        link_path = os.path.join(self.fixture_root, member.instance_name)
        try:
            os.symlink(member.node_root, link_path)
        except Exception:
            log.exception('Failed to create symlink to %s',
                          member.instance_name)

    # Master control script (written with mode 0o755).
    control_script = self.render_control()
    self.write_file(self.fixture_control, control_script, perm=0o755)

    # Fixture specification file.
    spec = self.render_spec()
    self.write_file(self.fixture_spec, spec)
def run(self):
    """Create the ``venv/cthulhu`` virtualenv and link its ``activate`` script.

    The virtualenv is created next to this file by invoking ``virtualenv``;
    an ``activate`` symlink is then placed beside this file. A failure to
    create the symlink is reported on stdout, not raised.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    venv_path = os.path.join(here, 'venv', 'cthulhu')

    print('Creating virtual environment in ', venv_path)
    subprocess.check_call(['virtualenv', venv_path])

    print('Linking `activate` to top level of project.\n')
    print('To activate,simply run `source activate`.')
    activate_script = os.path.join(venv_path, 'bin', 'activate')
    activate_link = os.path.join(here, 'activate')
    try:
        os.symlink(activate_script, activate_link)
    except OSError:
        print('Unable to create symlink,you may have a stale symlink from a previous invocation.')
def symlink_ms(source, linkname):
    """Create a symlink through the Win32 ``CreateSymbolicLinkW`` call.

    Python 2 doesn't have os.symlink on Windows, so the kernel32 function is
    called directly via ctypes.

    :param source: sourceFile
    :type source: str
    :param linkname: symlink path
    :type linkname: str
    :raises: WindowsError, raised when creating the symlink fails
    """
    import ctypes

    create_link = ctypes.windll.kernel32.CreateSymbolicLinkW
    create_link.argtypes = (ctypes.c_wchar_p, ctypes.c_wchar_p, ctypes.c_uint32)
    create_link.restype = ctypes.c_ubyte

    # Flag value 1 is passed for directories, 0 for everything else.
    if os.path.isdir(source):
        flags = 1
    else:
        flags = 0
    native_source = source.replace('/', '\\')

    try:
        status = create_link(linkname, native_source, flags)
        if status == 0:
            raise ctypes.WinError()
    except WindowsError:
        raise WindowsError("Failed to create symbolicLink due to user permissions")
def symlinks_supported():
    """
    Report whether a symlink can actually be created on this host.

    A throwaway directory is created, a symlink to it is attempted, and
    everything is removed again. Returns True when the link was created and
    False when os.symlink raised OSError, NotImplementedError or
    AttributeError (e.g. missing platform support or permissions).
    """
    scratch = tempfile.mkdtemp()
    target_dir = os.path.join(scratch, 'original')
    probe = os.path.join(scratch, 'symlink')
    os.makedirs(target_dir)
    try:
        os.symlink(target_dir, probe)
    except (OSError, NotImplementedError, AttributeError):
        supported = False
    else:
        supported = True
        os.remove(probe)
    finally:
        os.rmdir(target_dir)
        os.rmdir(scratch)
    return supported
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = os.path.dirname(tarinfo.name) + "/" + tarinfo.linkname
limit = None
else:
# Search the archive before the link,because a hard link is
# just a reference to an already archived file.
linkname = tarinfo.linkname
limit = tarinfo
member = self._getmember(linkname, tarinfo=limit, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def enable_sv(sv, sv_dir, runsvdir):
    """
    Enable the service named 'sv'.

    The service definition is expected at ``sv_dir/sv``; it is validated with
    check_sv_path and then symlinked to ``runsvdir/sv``.

    Returns True on success. Raises NoSuchSvError when the check reports a
    missing service, NeedSudoError on PermissionError and
    SvAlreadyEnabledError when the link location already exists.
    """
    definition = os.path.join(sv_dir, sv)
    enabled_link = os.path.join(runsvdir, sv)
    try:
        check_sv_path(definition)
        os.symlink(definition, enabled_link)
    except NoSuchSvError:
        raise NoSuchSvError
    except PermissionError:
        raise NeedSudoError
    except FileExistsError:
        raise SvAlreadyEnabledError
    else:
        return True
# NOTE(review): truncated snippet -- the header has no trailing colon and no
# statement in this fragment assigns `member` or `linkname`.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
# NOTE(review): truncated snippet -- the lines that would define `member` and
# `linkname` are not present, and the header lacks its closing colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
# NOTE(review): truncated snippet -- only the header (without its colon) and
# the final `except KeyError` handler remain; the matching `try` is missing.
def makelink(self, targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
def _find_link_target(self, tarinfo):
"""Find the target member of a symlink or hardlink member in the
archive.
"""
if tarinfo.issym():
# Always search the entire archive.
linkname = "/".join(filter(None, (os.path.dirname(tarinfo.name), tarinfo.linkname)))
limit = None
else:
# Search the archive before the link, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def link(repo):
    """Symlink the not-yet-linked sheets of *repo* into the cheat directory.

    Raises CheatExtException when the repo's sheet directory does not exist.
    Sheet availability is verified before any link is created.
    """
    cheat_dir = get_cheat_path()
    sheet_dir = get_sheet_path(repo)
    if not os.path.isdir(sheet_dir):
        raise CheatExtException(
            "%s hadn't been installed yet at %s" % (repo, sheet_dir))

    available = get_available_sheets_at(sheet_dir)
    with_state = get_sheets_with_state(cheat_dir, sheet_dir, available)
    _check_sheets_availability(with_state)

    for name, _ in filter_by_state(STATE_UNLINK, with_state):
        source = os.path.join(sheet_dir, name)
        destination = os.path.join(cheat_dir, name)
        os.symlink(source, destination)
def gen_stream(self, i):
    """Save the packet stream for item *i* as JSON and link it into the todo dir.

    The stream is written to ``stream/<prob_id>/<basename>_<md5>.json`` and
    symlinked from ``<d[prob_id]>/json/todo/``. Nothing is written when
    ``gen_data`` yields no result or when a stream with the same MD5 has
    already been recorded in ``fset[prob_id]``.
    """
    cwd = os.getcwd() + '/'
    res, prob_id = self.gen_data(i)
    if res is None or prob_id is None:
        return

    path = os.path.join('stream', prob_id)
    path2 = os.path.join(d[prob_id], 'json', 'todo')
    data = concat_data(res)
    md5 = hashlib.md5(data).hexdigest()
    json_name = md5 + '.json'
    outfname = os.path.join(path, self.basename + '_' + json_name)

    if json_name in fset[prob_id]:
        logging.info('Skip %s due to same' % md5)
        return

    logging.info('Save Packet Stream: %s' % outfname)
    fset[prob_id].add(json_name)
    # Close (and therefore flush) the file before it is linked; the previous
    # ``file(outfname, 'w')`` handle was never closed and ``file`` only
    # exists on Python 2.
    with open(outfname, 'w') as f:
        json.dump(res, f)

    outfname2 = os.path.join(path2, self.basename + '_' + json_name)
    os.symlink(cwd + outfname, cwd + outfname2)
# NOTE(review): truncated snippet -- `member` and `linkname` are used below
# but never assigned in the visible text; the header also lacks a colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
# NOTE(review): truncated snippet -- everything between the (colon-less)
# header and the `member is None` check is missing from this fragment.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def link_to_tutorials():
    """Replace the notebook links in ``<CUR_PATH>/tutorials`` with fresh ones.

    Individual ``.ipynb`` files are linked rather than the directory,
    because linking to the directory does not work well with nbsphinx.
    """
    from glob import glob
    from plotnine_examples.tutorials import TUTPATH

    dest_dir = os.path.join(CUR_PATH, 'tutorials')

    # Drop the notebooks left over from the previous build.
    for stale in glob(dest_dir + '/*.ipynb'):
        os.unlink(stale)

    # Link the notebooks for this build.
    for notebook in glob(TUTPATH + '/*.ipynb'):
        link_path = os.path.join(dest_dir, os.path.basename(notebook))
        os.symlink(notebook, link_path)
def config_mv(args):
    """[Not Supported Yet]
    Relocate config DB from its current location.

    Without ``args.force`` the result of ``err_out`` is returned; with it, a
    "not supported yet" message is printed and None is returned.

    :return: None for success, string for error
    """
    if args.force:
        # TODO: low-priority convenience feature. Planned steps:
        #   - check upfront that the target does not exist, fail if it does
        #   - copy the DB instance to 'to' and flip the symlink
        #   - refresh the service (not strictly needed, the next
        #     vmci_command handlers will pick it up)
        #   - needs --dryrun or --confirm
        #   - issue: only really works with discovery, others must find it out
        printMessage(args.output_format, "Sorry,configuration move ('config mv' command) is not supported yet")
        return None

    complaint = (DB_REF + " move to {} ".format(args.to) +
                 "requires '--force' flag to execute the request.")
    return err_out(complaint)
# NOTE(review): truncated snippet -- the assignments of `member` and
# `linkname` are absent and the header is missing its colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def safe_make_symlink(input_file_path,output_file_path):
    """Symlink *output_file_path* to *input_file_path*, tolerating an identical link.

    The input must exist and the output's directory is created via
    safe_make_dirs. If the output already exists it is accepted only when it
    resolves to the same real path as the input; otherwise an Exception is
    raised. Any other OSError propagates.
    """
    parent_dir = os.path.dirname(output_file_path)

    # Verify the input file is actually there
    if not os.path.exists(input_file_path):
        raise Exception("can't find file %s"%input_file_path)

    safe_make_dirs(parent_dir)
    try:
        os.symlink(input_file_path,output_file_path)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
        # Something is already there: it must resolve to the same file.
        wanted = os.path.realpath(input_file_path)
        existing = os.path.realpath(output_file_path)
        if wanted != existing:
            raise Exception('Existing file is different than the new symlink')
def pipetteSynchronousRunner(args):
    """Run a single pipeline script and then serve it with pipetteServer.

    ``args`` is ``[script, comm_dir, firehose_outdir, pipeline command...]``.
    The comm_dir must be empty if it exists and must appear in the pipeline
    command. The firehose output directory is linked into the comm_dir as
    ``firehose_outdir`` before the pipeline command is run through the shell.
    """
    scriptDir = os.path.dirname(os.path.abspath(args[0]))
    communicationDirBase = args[1]
    fh_outdir = args[2]
    pipelineCmdStr = ' '.join(args[3:])

    if os.path.exists(communicationDirBase) and os.listdir(communicationDirBase):
        raise Exception('For running single pipelines,comm_dir must start empty')
    if communicationDirBase not in pipelineCmdStr:
        raise Exception('Pipeline script must accept the comm_dir as an argument')

    link_path = os.path.join(communicationDirBase,'firehose_outdir')
    os.symlink(fh_outdir, link_path)

    # Run the pipeline; it writes its description to communicationDirBase/launch.
    subprocess.check_call(pipelineCmdStr,shell=True)

    # If one or more pipelines fail, an exception is thrown once everything
    # that can be run has been attempted.
    server = pipetteServer.Main()
    server.run_server(communicationDirBase, scriptDir, 'runone', 'False')
def add_gstreamer_packages():
    """Link system-wide GStreamer/PyGTK packages into this interpreter's lib dir.

    For each known package name, the first global location under
    ``/usr/lib/python<X.Y>/{dist-packages,site-packages}`` that exists is
    symlinked into the directory returned by ``get_python_lib()``, unless the
    destination already exists.
    """
    import os
    import sys
    from distutils.sysconfig import get_python_lib

    dest_dir = get_python_lib()

    packages = ['gobject', 'glib', 'pygst', 'pygst.pyc', 'pygst.pth',
                'gst-0.10', 'pygtk.pth', 'pygtk.py', 'pygtk.pyc']

    # ``sys.version[:3]`` truncates two-digit minor versions ("3.10" became
    # "3.1"); build the version string from version_info instead.
    python_version = '%d.%d' % sys.version_info[:2]
    global_path = os.path.join('/usr/lib', 'python' + python_version)
    global_sitepackages = [
        os.path.join(global_path, 'dist-packages'),  # for Debian-based
        os.path.join(global_path, 'site-packages'),  # for others
    ]

    for package in packages:
        dest = os.path.join(dest_dir, package)
        for pack_dir in global_sitepackages:
            src = os.path.join(pack_dir, package)
            if not os.path.exists(dest) and os.path.exists(src):
                os.symlink(src, dest)
def manually_disable_aa_profile(self):
    """
    Manually disable an apparmor profile.

    If aa-profile-mode is set to disabled (default) this is required as the
    template has been written but apparmor is yet unaware of the profile
    and aa-disable aa-profile fails. Without this the profile would kick
    into enforce mode on the next service restart.

    Creates ``/etc/apparmor.d/disable/<profile>`` as a symlink to
    ``/etc/apparmor.d/<profile>`` unless that path already exists.
    """
    profile_path = '/etc/apparmor.d'
    disable_path = '/etc/apparmor.d/disable'
    disable_link = os.path.join(disable_path, self.aa_profile)
    if os.path.lexists(disable_link):
        return
    os.symlink(os.path.join(profile_path, self.aa_profile), disable_link)
def acquire(self, timeout=None):
    """Take the lock by creating ``lock_file`` as a symlink to ``unique_name``.

    *timeout* falls back to ``self.timeout`` when None. With no timeout the
    call retries until the link can be created. With a positive timeout
    LockTimeout is raised once it has elapsed; with a timeout of zero or
    less AlreadyLocked is raised instead. Returns immediately when this
    object already holds the lock.
    """
    if timeout is None:
        timeout = self.timeout

    deadline = time.time()
    if timeout is not None and timeout > 0:
        deadline += timeout

    while True:
        try:
            # Creating the link is the actual locking step.
            os.symlink(self.unique_name, self.lock_file)
            return
        except OSError:
            # The link exists already -- possibly created by us earlier.
            if self.i_am_locking():
                return
            if timeout is not None and time.time() > deadline:
                if timeout > 0:
                    raise LockTimeout("Timeout waiting to acquire"
                                      " lock for %s" %
                                      self.path)
                raise AlreadyLocked("%s is already locked" %
                                    self.path)
            time.sleep(timeout / 10 if timeout is not None else 0.1)
# NOTE(review): damaged snippet -- the header is cut short (no colon, missing
# parameters) and the `try` that `except symlink_exception` belongs to is not
# present. `linkpath` is computed below but its use is not visible here.
def makelink(self,
targetpath)
except symlink_exception:
if tarinfo.issym():
linkpath = os.path.join(os.path.dirname(tarinfo.name),
tarinfo.linkname)
else:
linkpath = tarinfo.linkname
else:
try:
self._extract_member(self._find_link_target(tarinfo),
targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
# NOTE(review): truncated snippet -- nothing here assigns `member` or
# `linkname`, and the header line has no trailing colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def move(src, dst):
    """Recursively move a file or directory to another location, similar
    to the Unix "mv" command.

    When dst is a directory (or a symlink to one), src is moved inside it,
    and the resulting path must not already exist. When dst exists but is
    not a directory, it may be overwritten depending on os.rename()
    semantics.

    os.rename() is tried first; if it fails with OSError, src is copied to
    the destination and then removed.

    A lot more could be done here... A look at a mv.c shows a lot of
    the issues this implementation glosses over.
    """
    target = dst
    if os.path.isdir(dst):
        if _samefile(src, dst):
            # Possibly a case-insensitive filesystem: perform the rename
            # anyway.
            os.rename(src, dst)
            return
        target = os.path.join(dst, _basename(src))
        if os.path.exists(target):
            raise Error("Destination path '%s' already exists" % target)

    try:
        os.rename(src, target)
    except OSError:
        # Rename failed: fall back to copy-then-delete.
        if not os.path.isdir(src):
            copy2(src, target)
            os.unlink(src)
            return
        if _destinsrc(src, dst):
            raise Error("Cannot move a directory '%s' into itself '%s'." % (src, dst))
        copytree(src, target, symlinks=True)
        rmtree(src)
def copyfile(src, dst, *, follow_symlinks=True):
    """Copy data from src to dst and return dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.

    The ``dst`` parameter was missing from the signature although every
    statement below uses it; it is restored here.

    :raises SameFileError: if src and dst are the same file
    :raises SpecialFileError: if src or dst is a named pipe
    """
    if _samefile(src, dst):
        raise SameFileError("{!r} and {!r} are the same file".format(src, dst))

    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise SpecialFileError("`%s` is a named pipe" % fn)

    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        with open(src, 'rb') as fsrc:
            with open(dst, 'wb') as fdst:
                copyfileobj(fsrc, fdst)
    return dst
# NOTE(review): damaged snippet -- the header is fused with the tail of an
# `os.symlink(...)` call, so the parameter list, the timeout set-up, the
# `while` loop and the `try:` are all missing from this fragment.
def acquire(self, self.lock_file)
except OSError:
# Link creation failed. Maybe we've double-locked?
if self.i_am_locking():
# Linked to out unique name. Proceed.
return
else:
# Otherwise the lock creation failed.
if timeout is not None and time.time() > end_time:
if timeout > 0:
raise LockTimeout("Timeout waiting to acquire"
" lock for %s" %
self.path)
else:
raise AlreadyLocked("%s is already locked" %
self.path)
time.sleep(timeout / 10 if timeout is not None else 0.1)
else:
# Link creation succeeded. We're good to go.
return
# NOTE(review): truncated snippet -- only a partial header and the closing
# `except KeyError` handler survive; the `try` block it belongs to is missing.
def makelink(self,
targetpath)
except KeyError:
raise ExtractError("unable to resolve link inside archive")
# NOTE(review): truncated snippet -- `member` and `linkname` have no visible
# assignment and the header is not terminated with a colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
def test_ValgrindOption(self):
    """Exercise valgrind lookup via the PATH and via an explicit symlinked path.

    :raises RuntimeError: if valgrind cannot be found on the path
    """
    # Make sure that valgrind exists and is in the path
    valgrind = scatest.which('valgrind')
    if not valgrind:
        raise RuntimeError('Valgrind is not installed')

    # Let the device manager find valgrind on the path
    self._test_Valgrind('')

    # Set an explicit path to valgrind, using a symbolic link to a non-path
    # location as an additional check
    altpath = os.path.join(scatest.getSdrPath(), 'valgrind')
    os.symlink(valgrind, altpath)

    # patch for ubuntu valgrind script
    ub_patch = False
    try:
        if 'UBUNTU' in platform.linux_distribution()[0].upper():
            valgrind_bin = scatest.which('valgrind.bin')
            os.symlink(valgrind_bin, altpath+'.bin')
            # Only flag the extra link once it really exists, so the cleanup
            # below never tries to unlink a link that was not created.
            ub_patch = True
    except Exception:
        # Best effort; narrowed from a bare ``except:`` so interrupts are
        # not swallowed.
        pass

    try:
        self._test_Valgrind(altpath)
    finally:
        os.unlink(altpath)
        if ub_patch:
            os.unlink(altpath+'.bin')
def updateLink(source, target):
    """Point the symlink *target* at *source* without clobbering real files.

    An existing symlink at *target* (possibly stale) is removed first. If
    something still exists at *target* afterwards it is left untouched.
    """
    if os.path.islink(target):
        os.unlink(target)
    if os.path.exists(target):
        # Do not replace existing files.
        return
    os.symlink(source, target)
def symlink(self, source: str, dest: str, **kwargs):
    """Create a symlink at *dest* pointing to *source*.

    Both paths are passed through ``self.absolute`` first; extra keyword
    arguments are forwarded to ``os.symlink``.
    """
    link_target = self.absolute(source)
    link_location = self.absolute(dest)
    os.symlink(link_target, link_location, **kwargs)
# NOTE(review): truncated snippet -- the final `if` line has no colon and its
# body is missing from this fragment.
def manually_disable_aa_profile(self):
"""
Manually disable an apparmor profile.
If aa-profile-mode is set to disabled (default) this is required as the
template has been written but apparmor is yet unaware of the profile
and aa-disable aa-profile fails. Without this the profile would kick
into enforce mode on the next service restart.
"""
profile_path = '/etc/apparmor.d'
disable_path = '/etc/apparmor.d/disable'
if not os.path.lexists(os.path.join(disable_path, self.aa_profile))
def relative_symlink(src, dst, force=False):
    """
    Create a symbolic link at dst whose target is src expressed relative
    to dst's directory.

    The ``dst`` parameter was missing from the signature although the body
    uses it throughout; it is restored here.

    src -- path the link should resolve to
    dst -- path of the link to create
    force -- if True, then overwrite an existing file/symlink
    """
    if force:
        try:
            os.remove(dst)
        except FileNotFoundError:
            pass
    target = os.path.relpath(os.path.abspath(src), start=os.path.dirname(dst))
    os.symlink(target, dst)
# NOTE(review): truncated snippet -- the body that would assign `member` and
# `linkname` is missing; the header also lacks its colon.
def _find_link_target(self, normalize=True)
if member is None:
raise KeyError("linkname %r not found" % linkname)
return member
# NOTE(review): damaged snippet -- the header is fused with a later
# expression, `real_dst` is never assigned in the visible text, and the
# `try:` has no matching `except`. The `raise Error, "..."` statements use
# Python 2 syntax.
def move(src, _basename(src))
if os.path.exists(real_dst):
raise Error, "Destination path '%s' already exists" % real_dst
try:
os.rename(src, dst):
raise Error, "Cannot move a directory '%s' into itself '%s'." % (src, dst)
copytree(src, real_dst)
os.unlink(src)