Python os 模块，chmod() 实例源码
我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 os.chmod()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `owner` and `group` are names that get resolved to numeric ids through
    ``pwd`` and ``grp``. When `force` is true and something other than a
    directory already sits at the path, it is unlinked and replaced by a new
    directory. Ownership and `perms` are always applied to the absolute path
    afterwards.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group,
                                          perms))
    owner_id = pwd.getpwnam(owner).pw_uid
    group_id = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)
    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # A non-directory is in the way and the caller asked us to replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)
    os.chown(target, owner_id, group_id)
    os.chmod(target, perms)
def postprocess(self, tempname, filename):
    """Apply platform-specific post-processing to the extracted `tempname`.

    This is the hook where Mac header rewrites belong; other platforms have
    nothing special to do here.

    Resource providers should call this ONLY after successfully extracting a
    compressed resource, and must NOT call it for resources that already live
    in the filesystem.

    `tempname` is the current (temporary) name of the file and `filename` is
    the name the caller will rename it to once this method returns.
    """
    if os.name != 'posix':
        return
    # Make the resource executable: add r-x for everyone and keep only the
    # permission bits of the existing mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def test_zipfile_attributes():
    """Check that archive members keep their file mode and are deflated."""
    # With the change from ZipFile.write() to .writestr(), member attributes
    # have to be set manually, so make sure they end up in the archive.
    expected = (('foo', 0o644), ('bar', 0o755))
    with temporary_directory() as tempdir:
        for member_name, member_mode in expected:
            member_path = os.path.join(tempdir, member_name)
            with codecs.open(member_path, 'w', encoding='utf-8') as handle:
                handle.write(member_name + '\n')
            os.chmod(member_path, member_mode)
        archive_base = os.path.join(tempdir, 'dummy')
        archive_path = wheel.archive.make_wheelfile_inner(
            archive_base, tempdir)
        with readable_zipfile(archive_path) as archive:
            for member_name, member_mode in expected:
                member_info = archive.getinfo(
                    os.path.join(tempdir, member_name))
                # 0o100000 is the regular-file type bit; the Unix mode is
                # stored in the upper 16 bits of external_attr.
                assert member_info.external_attr == (member_mode | 0o100000) << 16
                assert member_info.compress_type == zipfile.ZIP_DEFLATED
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the permission bits of `src` onto `dst`.

    When `follow_symlinks` is false and both `src` and `dst` are symlinks,
    the links themselves are read and modified via ``os.lstat`` and
    ``os.lchmod``; if ``os.lchmod`` is missing on this platform the call does
    nothing. In every other case ``os.stat`` and ``os.chmod`` are used, and
    the call does nothing when ``os.chmod`` is unavailable.
    """
    operate_on_links = (not follow_symlinks
                        and os.path.islink(src) and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        if not hasattr(os, 'chmod'):
            return
        read_stat, apply_mode = os.stat, os.chmod
    apply_mode(dst, stat.S_IMODE(read_stat(src).st_mode))
def _test_NoAccessDir(self, nodeName):
    """Check that loading from an inaccessible directory raises LoadFail.

    Launches the device manager for `nodeName`, makes sure ``dom/noaccess``
    below the SDR root exists with mode 0, and expects ``device.load`` on
    that directory to raise ``CF.LoadableDevice.LoadFail``. The directory is
    removed again whether or not the assertions pass.
    """
    dcd_file = "/nodes/%s/DeviceManager.dcd.xml" % nodeName
    devBooter, devMgr = self.launchdeviceManager(dcd_file)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Strip every permission bit, creating the directory first if necessary.
    if os.path.exists(testdir):
        os.chmod(testdir, 0o000)
    else:
        os.mkdir(testdir, 0o000)
    try:
        reachable = os.access(testdir, os.R_OK | os.X_OK)
        self.assertFalse(reachable, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that exists() below a restricted directory raises InvalidFileName.

    Prepares ``dom/noaccess`` under the SDR root with mode 0o644 (no execute
    bit, so the directory cannot be traversed), then expects
    ``fileMgr.exists`` on a file inside it to raise ``CF.InvalidFileName``.
    The directory is removed again in all cases.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()
    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    # NOTE(review): the scraped copy of this test was truncated here (orphan
    # `else:`, unbalanced parentheses). The directory setup and the access
    # flags below are restored from the parallel _test_NoAccessDir test in
    # this file -- confirm against the upstream test suite.
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)
    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def handle(self, *args, **options):
    """Add the commit-msg hook snippet to the hook file if it is missing.

    Reads ``commit-msg`` under ``self.HOOK_PATH`` (starting from a bash
    shebang when the file does not exist). If the marker string is not
    already present, ``COMMIT_MSG_HOOK`` is appended, the file is rewritten
    and its owner-execute bit is set.
    """
    commit_msg_path = os.path.join(self.HOOK_PATH, 'commit-msg')
    if os.path.exists(commit_msg_path):
        with open(commit_msg_path, 'r') as existing:
            hook_content = existing.read()
    else:
        hook_content = '#!/usr/bin/env bash\n\n'
    if 'ZERodoWNTIME_COMMIT_MSG_HOOK' in hook_content:
        # Snippet already present; leave the hook untouched.
        return
    hook_content += COMMIT_MSG_HOOK
    with open(commit_msg_path, 'w') as updated:
        updated.write(hook_content)
    current_mode = os.stat(commit_msg_path).st_mode
    os.chmod(commit_msg_path, current_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    """Swap in the downloaded binary and restart, or discard the download.

    When `binary_ok` is false the file at `temp_path` is removed and a
    message is printed. Otherwise the current binary is moved aside to
    ``path + "/old"``, `temp_path` takes its place with the previous mode
    plus the owner-execute bit, the old copy is deleted and the process
    re-executes the new binary with the current ``sys.argv``.
    """
    if not binary_ok:
        os.remove(temp_path)
        print("Couldn't download the latest binary")
        return

    import stat

    backup_path = path + "/old"
    # Remember the permissions of the binary that is about to be replaced.
    previous_mode = os.stat(binary_path).st_mode
    # Move the running binary out of the way, then move the new one in.
    os.rename(binary_path, backup_path)
    os.rename(temp_path, binary_path)
    # Carry over the previous permissions and make sure it is executable.
    os.chmod(binary_path, previous_mode | stat.S_IEXEC)
    os.remove(backup_path)
    print("mongoaudit updated,restarting...")
    os.execl(binary_path, *sys.argv)
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive from `url` and unpack it into `base_path`.

    The archive is saved to ``/tmp/pwr_temp<ext>`` via ``download_file`` and
    extracted as a gzip tarball when `url` ends in ``.tar.gz``, otherwise as
    a zip file. Afterwards ``geckodriver`` (when `whichbin` is ``'wires'``)
    or ``chromedriver`` (any other value) inside `base_path` is set to mode
    0o775.
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    archive_path = '/tmp/pwr_temp{}'.format(ext)
    dest_dir = '{}/'.format(base_path)
    print("Downloading from: {}".format(url))
    download_file(url, archive_path)
    # NOTE(review): extractall() trusts the member paths of a downloaded
    # archive; consider validating them if `url` is not fully trusted.
    if ext == '.tar.gz':
        import tarfile
        # The context manager closes the archive even if extraction fails.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(dest_dir)
    else:
        import zipfile
        with zipfile.ZipFile(archive_path, "r") as z:
            z.extractall(dest_dir)
    driver_name = 'geckodriver' if whichbin == 'wires' else 'chromedriver'
    os.chmod('{}/{}'.format(base_path, driver_name), 0o775)
def execute(self):
    """Apply an octal permission mode to every file in the selection.

    The mode text comes from ``self.rest(1)`` or, when that is empty, from
    ``self.quantifier``. Anything that is not an octal number between 0 and
    777 is reported through ``self.fm.notify`` and nothing is changed.
    Errors from individual ``os.chmod`` calls are reported the same way and
    do not stop the loop. The current directory is reloaded at the end.
    """
    mode_text = self.rest(1) or str(self.quantifier)
    try:
        mode = int(mode_text, 8)
    except ValueError:
        # Unparsable input is treated like an out-of-range value below.
        mode = -1
    if not 0 <= mode <= 0o777:
        self.fm.notify("Need an octal number between 0 and 777!", bad=True)
        return
    for entry in self.fm.thistab.get_selection():
        try:
            os.chmod(entry.path, mode)
        except Exception as ex:
            self.fm.notify(ex)
    try:
        # Reload the whole directory; reloading only the selected files
        # might be the better option.
        self.fm.thisdir.load_content()
    except Exception:
        pass
def postprocess(self, tempname, filename):
    """Perform any platform-specific postprocessing of `tempname`.

    `tempname` is the current (temporary) name of the file, and `filename`
    is the name it will be renamed to by the caller after this routine
    returns.

    On POSIX systems the file gets read and execute permission for user,
    group and others; on other platforms nothing is done.
    """
    # NOTE(review): the scraped copy lost the parameter list and the start of
    # the docstring; both are restored from the intact postprocess definition
    # earlier in this file -- confirm against upstream.
    if os.name == 'posix':
        # Make the resource executable. Octal literals replace the original
        # hex spellings 0x16D (0o555) and 0xFFF (0o7777).
        mode = (os.stat(tempname).st_mode | 0o555) & 0o7777
        os.chmod(tempname, mode)
def rmtree_errorhandler(func, path, exc_info):
    """Retry a failed rmtree() step after clearing the read-only attribute.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    The arguments are the ones ``shutil.rmtree`` passes to its ``onerror``
    callback: the function that failed, the path it was called with, and the
    ``sys.exc_info()`` triple. The handler must run while that exception is
    still being handled, because anything it cannot deal with is re-raised
    with a bare ``raise``.
    """
    import errno

    exc_value = exc_info[1]
    # Only "access denied" is handled: errno EACCES, or Windows error code 5.
    # Using the exception attributes avoids referencing WindowsError (which
    # is undefined off Windows) and indexing into ``args`` (too short on
    # Python 3).
    access_denied = isinstance(exc_value, OSError) and (
        exc_value.errno == errno.EACCES
        or getattr(exc_value, 'winerror', None) == 5
    )
    if not access_denied:
        raise
    # file type should currently be read only
    if (os.stat(path).st_mode & stat.S_IREAD) != stat.S_IREAD:
        raise
    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The decrypted file is created with permissions 0o600, and the mode is
    enforced again before any plaintext is written in case the file already
    existed with wider permissions.

    Raises ``ValueError`` if ``file`` does not end with ``.enc``.

    See also :func:`doctr.local.encrypt_file`.
    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    target = file[:-4]
    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    # Create the output with restrictive permissions from the start, so the
    # plaintext is never readable by others between write and chmod.
    fd = os.open(target, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        # os.open() ignores the mode argument for a file that already
        # exists, so tighten the mode explicitly before writing.
        os.chmod(target, 0o600)
        f.write(decrypted_file)
def write_torque_script(command, outfile, walltime, queue, name, out, err, print_exec=True):
    """Write a PBS/Torque submission script to `outfile` and return its path.

    The script carries ``#PBS`` directives for `walltime`, `queue`, job
    `name` and the output/error files (``<out>.o`` and ``<err>.e``), then a
    ``cd`` line followed by `command`. The file is made executable (0o755).
    When `print_exec` is true the matching ``qsub`` command line is printed.
    """
    header = (
        '#PBS -l walltime={}\n'.format(walltime),
        # The queue used to be hard-coded to "regular", silently ignoring
        # the `queue` argument.
        '#PBS -q {}\n'.format(queue),
        '#PBS -N {}\n'.format(name),
        '#PBS -o {}.o\n'.format(out),
        '#PBS -e {}.e\n'.format(err),
        'cd ${PBS_O_workdir}\n',
    )
    with open(outfile, 'w') as script:
        script.writelines(header)
        script.write(command)
    os.chmod(outfile, 0o755)
    if print_exec:
        print('qsub {};'.format(outfile))
    return outfile
def make_run_all_script(wd):
    """Write an executable ``run_all.sh`` into `wd` and return its path.

    The generated bash script loops over ``*.pdb`` files, runs ``tleap -f
    leaprc`` on a temporary copy of each and moves the results into the
    ``xleap_modified`` and ``Amber_minimized`` directories.
    """
    script_path = op.join(wd, 'run_all.sh')
    script_body = """#!/bin/bash
for i in *.pdb; do
echo "Running ${i}..."
seq=`echo ${i} | cut -d. -f1`
curr_dir=`pwd`
cp ${i} temp.pdb
tleap -f leaprc
rm temp.pdb
mv temp_mod.pdb xleap_modified/${seq}_xleap.pdb
mv temp_mod.prmtop Amber_minimized/${seq}.prmtop
mv temp_mod.inpcrd Amber_minimized/${seq}.inpcrd
done
"""
    with open(script_path, 'w') as handle:
        handle.write(script_body)
    os.chmod(script_path, 0o755)
    return script_path
def get_stream(playlist, directory, own_name):
    """Record the HLS stream `playlist` for `own_name` into `directory`.

    Appends ffmpeg conversion commands for the recording to the
    ``ts_to_mp4.sh`` script in `directory` and to ``oneclick_file``, logs
    `own_name` in ``baitlist.txt``, and, unless the target ``.ts`` file
    already exists, runs ``livestreamer`` to fetch the stream.

    Relies on module-level names defined elsewhere in this file:
    ``modellist``, ``encoded``, ``oneclick_file``, ``cwd`` and
    ``numcpucores``. `own_name` is registered in ``modellist`` for the
    duration of the call.
    """
    modellist.append(str(own_name))
    # ``datetime.datetime.Now`` does not exist; take a single ``now()`` so the
    # hour and minute belong to the same instant.
    now = datetime.datetime.now()
    start_time = "_" + str(now.hour) + "-" + str(now.minute)
    ffs_script = directory + "ts_to_mp4.sh"
    merged_file = encoded + own_name + start_time + ".mp4"
    stream_file = directory + own_name + start_time + ".ts"
    convert_cmd = "\nffmpeg -i " + stream_file + " -strict -2 -c:v copy " + merged_file + "\n"
    chmod_cmd = "chmod 666 " + merged_file + "\n"
    # Both helper scripts receive the same two commands. The second file was
    # opened without a mode (read-only) and then written to, which raises;
    # "a+" matches the mode used for the first script.
    for script_path in (ffs_script, oneclick_file):
        with open(script_path, "a+", encoding="utf8") as script:
            script.write(convert_cmd)
            script.write(chmod_cmd)
        os.chmod(script_path, 0o777)
    hlsvar = "hlsvariant://" + playlist
    print("Retrieving the Streamfile for " + str(own_name))
    baitlist_file = cwd + "baitlist.txt"
    # NOTE(review): append mode is assumed here as well, for the same reason
    # as above -- confirm the list is not meant to be overwritten.
    with open(baitlist_file, "a+", encoding="utf8") as bl:
        bl.write(own_name + "\n")
    if not os.path.isfile(stream_file):
        # NOTE(review): "--hls-segment-attempts" has no value in the scraped
        # source, so the following option name would be consumed as its
        # argument; the intended number cannot be determined from here.
        subprocess.check_call(["livestreamer", "--hls-segment-threads", str(numcpucores), "--retry-streams", "5", "--retry-open", "--hls-segment-attempts", "--hls-segment-timeout", "20", "--http-header", "User-Agent=Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0", "-o", stream_file, hlsvar, "best"])
        os.chmod(stream_file, 0o666)
    modellist.remove(str(own_name))
    return
def apply_copy(self):
    """Create one 'copy' task per source file of this task generator.

    Each source is resolved with ``find_resource``; a missing file raises
    ``Errors.WafError``. The output name is ``self.target`` unless that is
    empty or several sources are listed, in which case the source's own name
    is used. Every task gets ``self.fun`` as its function and a ``chmod``
    value that defaults to ``Utils.O644``.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0
    sources = self.to_list(self.source)
    self.meths.remove('process_source')
    several_sources = len(sources) > 1
    for filename in sources:
        node = self.path.find_resource(filename)
        if not node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)
        target = self.target
        if several_sources or not target:
            target = node.name
        # TODO: the file path may be incorrect
        newnode = self.path.find_or_declare(target)
        tsk = self.create_task('copy', node, newnode)
        tsk.fun = self.fun
        tsk.chmod = getattr(self, 'chmod', Utils.O644)
        if not tsk.env:
            tsk.debug()
            raise Errors.WafError('task without an environment')
def apply_copy(self):
    """Create one 'copy' task per source file of this task generator.

    Each source is resolved with ``find_resource``; a missing file raises
    ``Errors.WafError``. The output name is ``self.target`` unless that is
    empty or several sources are listed, in which case the source's own name
    is used. Every task gets ``self.fun`` and a ``chmod`` value defaulting to
    ``Utils.O644``; a task without an environment raises ``Errors.WafError``.
    """
    # NOTE(review): the scraped copy of this function was truncated in the
    # middle of its first statement. The body is restored from the intact
    # apply_copy definition earlier in this file -- confirm against upstream.
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0
    lst = self.to_list(self.source)
    self.meths.remove('process_source')
    for filename in lst:
        node = self.path.find_resource(filename)
        if not node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)
        target = self.target
        if not target or len(lst) > 1:
            target = node.name
        # TODO: the file path may be incorrect
        newnode = self.path.find_or_declare(target)
        tsk = self.create_task('copy', node, newnode)
        tsk.fun = self.fun
        tsk.chmod = getattr(self, 'chmod', Utils.O644)
        if not tsk.env:
            tsk.debug()
            raise Errors.WafError('task without an environment')
def test_binary(self, enable_randomness=True, times=1, timeout=15):
    """
    Test the binary generated.

    Dumps the binary to a temporary ``/tmp/rex-pov-*`` file, makes it
    executable and runs it against ``self.crash.binary`` with
    ``CGCPovSimulator.test_binary_pov``. `enable_randomness`, `times` and
    `timeout` are forwarded to the simulator. Returns the simulator's
    result; the temporary file is removed even when the test raises.
    """
    # mkstemp() creates the file atomically, unlike the race-prone mktemp().
    # NOTE(review): this assumes dump_binary() overwrites an existing file.
    fd, pov_binary_filename = tempfile.mkstemp(dir='/tmp', prefix='rex-pov-')
    os.close(fd)
    try:
        # dump the binary code
        self.dump_binary(filename=pov_binary_filename)
        # 0o755 replaces the Python 2 only literal 0755.
        os.chmod(pov_binary_filename, 0o755)
        pov_tester = CGCPovSimulator()
        return pov_tester.test_binary_pov(
            pov_binary_filename,
            self.crash.binary,
            enable_randomness=enable_randomness,
            timeout=timeout,
            times=times)
    finally:
        # remove the generated pov
        os.remove(pov_binary_filename)
def upload_file(self, filename, content, mode=None):
    """Write `content` to the remote file `filename` over SFTP.

    An SFTP session is opened from ``self.client`` and closed again before
    returning, also on errors. When `mode` is truthy the uploaded file's
    permissions are changed: `mode` itself is applied, except that a bare
    ``True`` keeps the previously hard-coded 0o744.
    """
    # NOTE(review): assumes self.client is a paramiko SSHClient, whose
    # SFTPClient.file(filename, mode, bufsize) takes the open mode second.
    ftp = self.client.open_sftp()
    try:
        # The scraped call passed -1 as the open mode; writing needs 'w',
        # and -1 is the default buffer size.
        remote = ftp.file(filename, 'w', -1)
        try:
            remote.write(content)
            remote.flush()
        finally:
            remote.close()
        if mode:
            # Change the file that was just uploaded. The old code always
            # touched self.script_filename with 0o744, whatever was passed.
            ftp.chmod(filename, 0o744 if mode is True else mode)
    finally:
        ftp.close()
def run_script(self, script):
    """Write `script` to a temporary file and start it through the shell.

    The file path is stored in ``self.tmp_script_filename`` and the file is
    given mode 0o744. The started process is stored in ``self.process`` and
    its combined stdout/stderr pipe in ``self.stream``. The temporary file
    is not removed here.
    """
    fd, self.tmp_script_filename = tempfile.mkstemp()
    # Write through the descriptor mkstemp() returned instead of closing and
    # reopening the path; the context manager closes it even if the write
    # fails.
    with os.fdopen(fd, 'w') as f:
        f.write(script)
    os.chmod(self.tmp_script_filename, 0o744)
    p = subprocess.Popen(
        self.tmp_script_filename, shell=True, stdin=subprocess.PIPE,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
    self.process = p
    self.stream = p.stdout
def extractall(self, path=".", members=None):
    """Extract all members from the archive to the current working
    directory and set owner, modification time and permissions on
    directories afterwards. `path' specifies a different directory
    to extract to. `members' is optional and must be a subset of the
    list returned by getmembers().
    """
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            tarinfo.mode = 0o700
        # Do not set_attrs directories, as we will do that further down.
        # `path` has to be passed on: without it members were extracted into
        # the current directory while the directory fix-up below used `path`.
        self.extract(tarinfo, path, set_attrs=not tarinfo.isdir())

    # Reverse sort directories so that children are handled before parents.
    directories.sort(key=lambda a: a.name)
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def chmod(self, tarinfo, targetpath):
    """Apply the mode stored in `tarinfo` to `targetpath`.

    Does nothing when the platform has no ``os.chmod``. A failing chmod is
    reported as ``ExtractError``.
    """
    if not hasattr(os, 'chmod'):
        return
    try:
        os.chmod(targetpath, tarinfo.mode)
    except EnvironmentError:
        raise ExtractError("Could not change mode")
def copystat(src, dst):
    """Copy access/modification times, mode bits and flags from src to dst.

    Each attribute is only copied when the platform provides the matching
    ``os`` function. An ``OSError`` from ``os.chflags`` is swallowed only
    when its errno is ``EOPNOTSUPP``; any other error propagates.
    """
    src_stat = os.stat(src)
    permission_bits = stat.S_IMODE(src_stat.st_mode)
    if hasattr(os, 'utime'):
        os.utime(dst, (src_stat.st_atime, src_stat.st_mtime))
    if hasattr(os, 'chmod'):
        os.chmod(dst, permission_bits)
    if not (hasattr(os, 'chflags') and hasattr(src_stat, 'st_flags')):
        return
    try:
        os.chflags(dst, src_stat.st_flags)
    except OSError as why:
        flags_unsupported = (hasattr(errno, 'EOPNOTSUPP')
                             and why.errno == errno.EOPNOTSUPP)
        if not flags_unsupported:
            raise
def rmtree_errorhandler(func, path, exc_info):
    """Retry a failed rmtree() step after making the file writable.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    The arguments are the ones ``shutil.rmtree`` passes to its ``onerror``
    callback. The handler must run while the original exception is still
    being handled, because it is re-raised with a bare ``raise`` when the
    file does not have the ``S_IREAD`` bit set.
    """
    # NOTE(review): the scraped copy lost the parameter list and the start of
    # the docstring; both are restored from the other rmtree_errorhandler
    # definition in this file and from the use of `path` in the body --
    # confirm against upstream.
    if not os.stat(path).st_mode & stat.S_IREAD:
        raise
    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
def unzip_file(filename, location, flatten=True):
    """
    Unpack the zip archive at `filename` into the directory `location`.

    Files are written with system defaults and the umask (permissions are
    not preserved), except that regular-file members carrying any execute
    bit (user, group or world) get "chmod +x" applied after being written.
    On Windows such os.chmod calls are no-ops per the Python docs.

    With `flatten` true, a single leading directory shared by all members is
    stripped from the extracted paths.
    """
    ensure_dir(location)
    archive_fp = open(filename, 'rb')
    try:
        archive = zipfile.ZipFile(archive_fp, allowZip64=True)
        strip_leading = has_leading_dir(archive.namelist()) and flatten
        for member in archive.infolist():
            member_name = member.filename
            payload = archive.read(member_name)
            relative = member_name
            if strip_leading:
                relative = split_leading_dir(member_name)[1]
            dest = os.path.join(location, relative)
            if dest.endswith(('/', '\\')):
                # A directory
                ensure_dir(dest)
                continue
            ensure_dir(os.path.dirname(dest))
            out = open(dest, 'wb')
            try:
                out.write(payload)
            finally:
                out.close()
                attr_mode = member.external_attr >> 16
                # Only regular files that had an execute bit for
                # user/group/world are touched.
                if attr_mode and stat.S_ISREG(attr_mode) and attr_mode & 0o111:
                    # make dest file have execute for user/group/world
                    # (chmod +x) no-op on windows per python docs
                    os.chmod(dest, (0o777 - current_umask() | 0o111))
    finally:
        archive_fp.close()
def unpack_zipfile(filename, extract_dir, progress_filter=default_filter):
    """Unpack the zip archive `filename` into `extract_dir`.

    Raises ``UnrecognizedFormat`` if `filename` is not a zipfile (as
    determined by ``zipfile.is_zipfile()``). See ``unpack_archive()`` for an
    explanation of the `progress_filter` argument.
    """
    if not zipfile.is_zipfile(filename):
        raise UnrecognizedFormat("%s is not a zip file" % (filename,))
    with ContextualZipFile(filename) as archive:
        for member in archive.infolist():
            member_name = member.filename
            parts = member_name.split('/')
            # don't extract absolute paths or ones with .. in them
            if member_name.startswith('/') or '..' in parts:
                continue
            target = progress_filter(
                member_name, os.path.join(extract_dir, *parts))
            if not target:
                continue
            # Both directory and file members go through ensure_directory().
            ensure_directory(target)
            if not member_name.endswith('/'):
                # file
                payload = archive.read(member.filename)
                with open(target, 'wb') as out:
                    out.write(payload)
            unix_attributes = member.external_attr >> 16
            if unix_attributes:
                os.chmod(target, unix_attributes)
def extractall(self, path=".", members=None, numeric_owner=False):
    """Extract all members from the archive to the current working
    directory and set owner, modification time and permissions on
    directories afterwards. `path' specifies a different directory
    to extract to. `members' is optional and must be a subset of the
    list returned by getmembers(). If `numeric_owner` is True, only
    the numbers for user/group names are used and not the names.
    """
    # NOTE(review): the scraped copy of this method was truncated in several
    # places (including the `path` parameter the docstring describes). The
    # missing parts are restored from the surviving fragments and from the
    # other extractall definition in this file -- confirm the signature
    # against the upstream tarfile module.
    directories = []

    if members is None:
        members = self

    for tarinfo in members:
        if tarinfo.isdir():
            # Extract directories with a safe mode.
            directories.append(tarinfo)
            tarinfo = copy.copy(tarinfo)
            tarinfo.mode = 0o700
        # Do not set_attrs directories, as we will do that further down
        self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(),
                     numeric_owner=numeric_owner)

    # Reverse sort directories so that children are handled before parents.
    directories.sort(key=lambda a: a.name)
    directories.reverse()

    # Set correct owner, mtime and filemode on directories.
    for tarinfo in directories:
        dirpath = os.path.join(path, tarinfo.name)
        try:
            self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
            self.utime(tarinfo, dirpath)
            self.chmod(tarinfo, dirpath)
        except ExtractError as e:
            if self.errorlevel > 1:
                raise
            else:
                self._dbg(1, "tarfile: %s" % e)
def _setupSock(self):
    """Create, bind and start listening on the server socket.

    A ``self.sockStr`` starting with ``inet:`` (case-insensitive) is split
    on ``:`` into address and port for a TCP socket with ``SO_REUSEADDR``
    set. Anything else is treated as a Unix socket path: a leftover file at
    that path is removed, the socket is bound and the path is chmod'ed to
    ``self.sockChmod``. In both cases the socket gets a 3 second timeout and
    listens with a backlog of ``self.listenq``.
    """
    use_tcp = self.sockStr.lower().startswith('inet:')
    if not use_tcp:
        socket_path = self.sockStr
        if os.path.exists(socket_path):
            os.unlink(socket_path)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(socket_path)
        os.chmod(socket_path, self.sockChmod)
    else:
        _prefix, host, port_text = self.sockStr.split(':')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((host, int(port_text)))
    self.sock.settimeout(3)
    self.sock.listen(self.listenq)
def _setupSock(self):
    """Create, bind and start listening on the server socket.

    A ``self.sockStr`` starting with ``inet:`` (case-insensitive) is split
    on ``:`` into address and port for a TCP socket with ``SO_REUSEADDR``
    set. Anything else is treated as a Unix socket path: a leftover file at
    that path is removed, the socket is bound and the path is chmod'ed to
    ``self.sockChmod``. In both cases the socket gets a 3 second timeout and
    listens with a backlog of ``self.listenq``.
    """
    # NOTE(review): the scraped copy of this method was truncated in the
    # middle of the tuple unpacking. The body is restored from the intact
    # _setupSock definition earlier in this file -- confirm against upstream.
    if self.sockStr.lower().startswith('inet:'):
        junk, ip, port = self.sockStr.split(':')
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind((ip, int(port)))
    else:
        if os.path.exists(self.sockStr):
            os.unlink(self.sockStr)
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.sock.bind(self.sockStr)
        os.chmod(self.sockStr, self.sockChmod)
    self.sock.settimeout(3)
    self.sock.listen(self.listenq)
def _create_key_file(self, repo_name, data):
    """Write `data` to the key file for `repo_name` with mode 0o600.

    The path comes from ``self._get_key_path(repo_name)``. The file is
    created owner-read/write only, and the mode is enforced again before
    anything is written in case the file already existed with wider
    permissions.
    """
    key_path = self._get_key_path(repo_name)
    # Create the file with restrictive permissions from the start, so the
    # key is never readable by others between write and chmod.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as key_file:
        # os.open() ignores the mode argument for a file that already
        # exists, so tighten the mode explicitly before writing.
        os.chmod(key_path, 0o600)
        key_file.write(data)
def chmod(self, tarinfo, targetpath):
    """Set file permissions of targetpath according to tarinfo.

    Does nothing when the platform has no ``os.chmod``. A failing chmod is
    reported as ``ExtractError``.
    """
    # NOTE(review): the scraped copy of this method was truncated inside its
    # signature. The missing lines are restored from the intact chmod
    # definition earlier in this file -- confirm against upstream.
    if hasattr(os, 'chmod'):
        try:
            os.chmod(targetpath, tarinfo.mode)
        except EnvironmentError as e:
            raise ExtractError("Could not change mode")