Python os 模块,W_OK 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.W_OK。
def check_path_owner(path):
    """Return whether the current user may be treated as owning ``path``.

    Walks from ``path`` up through its parent directories until an existing
    entry is found and decides based on that entry:

    * when running as root (effective uid 0), the entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise the entry must be writable according to ``os.access``.

    Returns:
        bool: True if the check passes, False if it fails, if the owner
        cannot be determined (``OSError``), or if no component of the path
        exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() reached a fixed point without finding anything that exists
    # (e.g. an empty or relative path): report "not owned" explicitly rather
    # than falling off the end and returning None.
    return False
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def get_config_file_path(cls):
    """Return the path of the config file inside a ``.polyaxon`` directory.

    The ``.polyaxon`` directory lives in the current directory when
    ``cls.IS_GLOBAL`` is false; otherwise it lives in the user's home
    directory, or in ``/tmp`` when the home directory is not writable.
    The directory is created when missing; a failure to create it is only
    logged, and the path is returned regardless.
    """
    if cls.IS_GLOBAL:
        home = os.path.expanduser('~')
        root = home if os.access(home, os.W_OK) else '/tmp'
    else:
        # Local to the current working directory.
        root = os.path.join('.')

    config_dir = os.path.join(root, '.polyaxon')
    if not os.path.exists(config_dir):
        try:
            os.makedirs(config_dir)
        except OSError:
            # Covers permission errors as well as another thread or process
            # creating the directory between the check and the call.
            logger.error('Could not create config directory `{}`'.format(config_dir))

    return os.path.join(config_dir, cls.CONFIG_FILE_NAME)
def safeInstall():
    """Create the Factorio directory layout if missing, then run the updater.

    When the directory returned by ``getFactorioPath()`` does not exist it is
    created (through ``sudo`` when its parent is not writable), the ``saves``
    and ``config`` sub-directories are added, and the factotum completion
    line is appended to ``~/.bashrc`` unless already present.
    ``updateFactorio()`` is called afterwards in every case.

    An ``IOError`` anywhere in that sequence prints a message and exits the
    process with status 1.
    """
    install_dir = getFactorioPath()
    completion_hook = "eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n"

    try:
        if not os.path.isdir("%s" % (install_dir)):
            if os.access("%s/.." % (install_dir), os.W_OK):
                os.mkdir(install_dir, 0o777)
            else:
                # Parent is not writable by us: create as root, then hand
                # ownership to the invoking user.
                subprocess.call(['sudo', 'mkdir', '-p', install_dir])
                subprocess.call(['sudo', 'chown', getpass.getuser(), install_dir])

            for subdir in ("saves", "config"):
                os.mkdir(os.path.join(install_dir, subdir))

            with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
                # read() leaves the file position at the end, so the write
                # below appends.
                if completion_hook not in bashrc.read():
                    bashrc.write(completion_hook)
                    print("You'll want to restart your shell for command autocompletion. Tab is your friend.")

        updateFactorio()
    except IOError as e:
        print("Cannot make %s. Please check permissions. Error %s" % (install_dir, e))
        sys.exit(1)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def _test_resource_paths(my):
    """Check the resource paths reported by ``my.server``.

    Asserts that the admin resource path mentions ``etc/admin.tacticrc``,
    that the per-user rc file for the current system login is among the
    created resource paths, and that the server login is ``admin``.

    Args:
        my: test-case object exposing ``server`` and ``assertEquals``.
    """
    path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in path)

    paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()

    home_dir = my.server.get_home_dir()
    # Test truthiness first: calling os.access() on a missing (None) home
    # directory raises TypeError before the fallback can be selected.
    if home_dir and os.path.isdir(home_dir) and os.access(home_dir, os.W_OK):
        rc_dir = "%s/.tactic/etc" % home_dir
    elif os.name == 'nt':
        rc_dir = 'C:/sthpw/etc'
    else:
        rc_dir = '/tmp/sthpw/etc'

    compared = '%s/%s.tacticrc' % (rc_dir, sys_login) in paths
    my.assertEquals(True, compared)

    # since we use admin to get resource path, my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_access(filename, write_required=True):
    """
    Check whether the user has read (and optionally write) access to a file.

    When ``HAVE_POSIX1E`` is set, the file's ACL entries are inspected first:
    an entry for the effective user id, or for one of the process's groups,
    that grants the required rights makes the function return True.
    If no such entry is found (or ACLs are not used), the decision falls back
    to ``os.access``.

    Returns True only if all required access rights are granted.
    """
    if HAVE_POSIX1E:
        for entry in posix1e.ACL(file=filename):
            concerns_us = (
                (entry.tag_type == posix1e.ACL_USER
                 and entry.qualifier == os.geteuid())
                or (entry.tag_type == posix1e.ACL_GROUP
                    and entry.qualifier in os.getgroups())
            )
            if not concerns_us:
                continue
            if not entry.permset.test(posix1e.ACL_READ):
                continue
            if write_required and not entry.permset.test(posix1e.ACL_WRITE):
                continue
            return True

    mode = (os.R_OK | os.W_OK) if write_required else os.R_OK
    return os.access(filename, mode)
def verify_path(self, val):
    """Validate a user-supplied directory path.

    Args:
        val: the path as entered; ``~`` is expanded when possible.

    Returns:
        The expanded path when it names an existing directory that
        ``writable_dir`` accepts, otherwise ``False`` (a warning is logged
        explaining why).
    """
    try:
        n_path = os.path.expanduser(val)
    except Exception:
        # Expansion failed (e.g. ``val`` is not a string): validate the raw
        # value instead. ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        n_path = val
    else:
        logger.debug("Expanded user path.")

    if not n_path:
        logger.warning("Please specify a path.")
        return False
    if not os.path.isdir(n_path):
        logger.warning("This is not a valid path.")
        return False
    # writable_dir() is used here in place of a plain
    # os.access(n_path, os.W_OK) check.
    if not writable_dir(n_path):
        logger.warning("Do not have write access to '{}'.".format(n_path))
        return False
    return n_path
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def save_to_disk(self):
    """
    Commit the data from memory to disk.

    ``self.data`` is serialized as indented, key-sorted JSON and written to
    ``self.file_path`` (created if missing, overwritten otherwise).

    Returns True on success and False when the file cannot be opened or
    written. Serialization errors from ``json.dumps`` propagate.
    """
    # Serialize before touching the file so that unserializable data cannot
    # leave a truncated file behind.
    payload = json.dumps(self.data, sort_keys=True, indent=4)

    try:
        # The context manager closes the handle even if the write fails.
        with open(self.file_path, "w") as handle:
            handle.write(payload)
    except (IOError, OSError):
        return False
    return True
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def test_dir_isWritable(self, path_mocker_stopall):
    """isWritable() on a directory defers to os.access(path, os.W_OK)."""
    path = 'file:///tmp'

    # Test doubles standing in for os.access and os.path.isdir; both answer
    # True so that the "existing directory" branch is taken.
    access_double = path_mocker_stopall.MagicMock(name="mock_os_access")
    isdir_double = path_mocker_stopall.MagicMock(name="mock_os_path_isdir")
    isdir_double.return_value = True
    access_double.return_value = True

    # Install the doubles on the os module used by the code under test.
    path_module = scarlett_os.internal.path
    path_mocker_stopall.patch.object(path_module.os, 'access', access_double)
    path_mocker_stopall.patch.object(path_module.os.path, 'isdir', isdir_double)

    result = s_path.isWritable(path)

    isdir_double.assert_called_once_with('file:///tmp')
    access_double.assert_called_once_with('file:///tmp', os.W_OK)
    assert result == True
def isWritable(path):
    """Returns whether the file/path is writable.

    For an existing directory the directory itself is checked. Anything else
    is treated as a file path and the containing directory is checked
    instead; a bare file name is resolved against the current directory.

    Returns False (after showing the unicode error dialog) when the path
    cannot be decoded.
    """
    try:
        if os.path.isdir(path):
            # The given path is an existing directory.
            # To properly check if it is writable, you need to use os.access.
            return os.access(path, os.W_OK)

        # The given path is supposed to be a file.
        # Avoid using open(path, "w"), as it might corrupt existing files,
        # and opening for read/write fails if the file doesn't already exist.
        # Therefore, simply check the directory permissions instead.
        # dirname() of a bare file name is '', which os.access() always
        # rejects, so fall back to the current directory in that case.
        parent = os.path.dirname(path) or os.curdir
        return os.access(parent, os.W_OK)
    except UnicodeDecodeError:
        unicode_error_dialog()
        return False
def setup(self):
    """
    Prepare storage, schema, parser and index; called lazily on first use.

    With file storage the index directory is created when missing and must
    be writable (``IOError`` otherwise). Without it, a shared in-memory
    store kept on ``LOCALS`` is used.
    """
    global LOCALS
    from haystack import connections

    created_directory = False

    if self.use_file_storage:
        # Make sure the index directory is there and usable.
        if not os.path.exists(self.path):
            os.makedirs(self.path)
            created_directory = True
        if not os.access(self.path, os.W_OK):
            raise IOError("The path to your Whoosh index '%s' is not writable for the current user/group." % self.path)
        self.storage = FileStorage(self.path)
    else:
        if getattr(LOCALS, 'RAM_STORE', None) is None:
            LOCALS.RAM_STORE = RamStorage()
        self.storage = LOCALS.RAM_STORE

    unified_index = connections[self.connection_alias].get_unified_index()
    self.content_field_name, self.schema = self.build_schema(unified_index.all_searchfields())
    self.parser = QueryParser(self.content_field_name, schema=self.schema)

    if created_directory:
        # A directory we just created cannot contain an index yet.
        self.index = self.storage.create_index(self.schema)
    else:
        try:
            self.index = self.storage.open_index(schema=self.schema)
        except index.EmptyIndexError:
            self.index = self.storage.create_index(self.schema)

    self.setup_complete = True
def ExecRecursiveMirror(self, source, dest):
    """Emulation of rm -rf out && cp -af in out.

    Any existing ``dest`` (file or directory tree) is removed first, making
    read-only entries writable when needed, and ``source`` is then copied
    into its place.
    """
    def _retry_writable(func, target, dummy_excinfo):
        # A removal failed, possibly because the entry is read-only. If so,
        # make it writable; either way, try the operation once more.
        if not os.access(target, os.W_OK):
            os.chmod(target, stat.S_IWRITE)
        func(target)

    if os.path.isdir(dest):
        shutil.rmtree(dest, onerror=_retry_writable)
    elif os.path.exists(dest):
        if not os.access(dest, os.W_OK):
            # Attempt to make the file writable before deleting it.
            os.chmod(dest, stat.S_IWRITE)
        os.unlink(dest)

    if os.path.isdir(source):
        shutil.copytree(source, dest)
        return

    shutil.copy2(source, dest)
    # Try to diagnose crbug.com/741603
    if not os.path.exists(dest):
        raise Exception("Copying of %s to %s failed" % (source, dest))
def run():
    """Generate the ``AppIcon.appiconset`` folder from the selected image.

    Writes ``Contents.json`` (from ``json_data``) into
    ``<Data.save_path>/AppIcon.appiconset``, then resizes the image at
    ``Data.image_path`` once per entry of its ``images`` list and saves each
    result as PNG next to it. Progress and errors are reported through
    ``output_label``.
    """
    if Data.image_path == "":
        output_label["text"] = "Choose the image file first."
        return

    iconset_dir = "{}/AppIcon.appiconset".format(Data.save_path)
    contents_path = "{}/Contents.json".format(iconset_dir)

    # Create the folder only when it is actually missing. Keying the mkdir
    # on a failed write-access check crashed with "file exists" whenever the
    # folder was present but not writable.
    if not os.path.isdir(iconset_dir):
        os.makedirs(iconset_dir)

    # Context managers close the handles even if (de)serialization fails.
    with open(contents_path, mode="w") as contents_file:
        json.dump(json_data, contents_file, indent=4, separators=(',', ':'))
    with open(contents_path, mode="r") as data_file:
        images = json.load(data_file)["images"]

    try:
        im = Image.open(Data.image_path)
        output_label["text"] = "Image specs: {} {} {}\n{}\n".format(im.format, im.size, im.mode, "-"*35)
        for image in images:
            size = get_size(image)
            out = im.resize(size, Image.ANTIALIAS)
            out.save("{}/{}".format(iconset_dir, image["filename"]), format="PNG")
            output_label["text"] += "Image generated: {}\n".format(size)
        open_saved_button.grid(row=1, column=0, columnspan=2, sticky=tk.W+tk.E)
    except IOError:
        output_label["text"] = "Please select a supported image file."
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def apply(self, software, theme):
    """Write the theme's files for ``software`` over the user's config files.

    For every themed file name that this object supports, the first
    candidate location that is writable is backed up (if necessary) and
    then overwritten with the merge of the original file and the theme
    file. Names without a writable candidate are skipped.
    """
    themed = theme.files[software]
    supported = self.get_supported_software()[software]

    for name, theme_path in themed.items():
        if name not in supported:
            continue
        for candidate in supported[name]:
            target = os.path.expanduser(candidate)
            if not os.access(target, os.W_OK):
                continue
            theme.themes.backup_if_necessary(software, name, target)
            # Look the originals up only now: a backup taken just above may
            # have changed what counts as the original theme.
            originals = theme.themes.original.files[software]
            with open(target, "w") as out:
                logging.debug("Merging %s with %s" %
                              (originals[name], theme_path))
                out.writelines(self.merge(software,
                                          originals[name], theme_path))
            # Only the first writable location is updated.
            break
def try_init_cgroup():
    """Create the cpuacct, memory and pids cgroup roots that are not usable.

    A root is considered unusable when it is not an existing, writable
    directory. Unusable roots are created directly when running as root;
    otherwise, when stdin is a terminal, they are created and chown'ed to
    the current user through ``sudo``. If neither is possible an error is
    logged and nothing is created.
    """
    euid = geteuid()
    cgroups_to_init = [
        cgroup_root
        for cgroup_root in (CPUACCT_CGROUP_ROOT, MEMORY_CGROUP_ROOT, PIDS_CGROUP_ROOT)
        if not (path.isdir(cgroup_root) and access(cgroup_root, W_OK))
    ]
    if not cgroups_to_init:
        return

    if euid == 0:
        logger.info('Initializing cgroup: %s', ','.join(cgroups_to_init))
        for cgroup_to_init in cgroups_to_init:
            makedirs(cgroup_to_init, exist_ok=True)
    elif __stdin__.isatty():
        # Same log call as the root branch; the original line here had a
        # broken string literal / argument list.
        logger.info('Initializing cgroup: %s', ','.join(cgroups_to_init))
        # Joining with '" "' inside the surrounding quotes passes every
        # directory to mkdir/chown as its own quoted argument.
        call(['sudo', 'sh', '-c', 'mkdir -p "{1}" && chown -R "{0}" "{1}"'.format(
            euid, '" "'.join(cgroups_to_init))])
    else:
        logger.error('Cgroup not initialized')
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def is_admin(user=lib.global_parameters['USER'], activeroot='/'):
    """ Check if the specified user has administrative privileges on given system"""
    platform = get_system_type_from_active_root(activeroot)
    ip = get_address_from_active_root(activeroot)

    if not activeroot.startswith('/'):
        log.warn('Cannot check root privileges,remote system analysis (%s) not supported yet.' % activeroot)
        return False

    if platform.startswith('linux'):
        # On linux the user is an administrator when its passwd uid is 0.
        from pwd import getpwnam
        return getpwnam(user).pw_uid == 0

    if platform.startswith('win'):
        # Group membership lookup; should work on remote systems too.
        # (A previous approach probed write access to %SystemRoot%\temp,
        # which only admins can write to.)
        from win32net import NetUserGetLocalGroups
        return 'Administrators' in NetUserGetLocalGroups(ip, user)

    log.warn('Cannot check root privileges,platform is not fully supported (%s).' % platform)
    return False
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def CheckOutputFile(filename):
    """Check that the given file does not exist and can be opened for writing.

    Args:
      filename: The name of the file.

    Raises:
      FileExistsError: if the given filename already exists.
      FileNotWritableError: if the directory that would contain the file is
        not writable.
    """
    target = os.path.abspath(filename)
    if os.path.exists(target):
        raise FileExistsError('%s: output file exists' % filename)

    parent_dir = os.path.dirname(target)
    if not os.access(parent_dir, os.W_OK):
        raise FileNotWritableError('%s: not writable' % parent_dir)
def CheckOutputFile(filename):
    """Verify that ``filename`` is free to be created as a new output file.

    Args:
      filename: The name of the file.

    Raises:
      FileExistsError: if something already exists at that path.
      FileNotWritableError: if the containing directory is not writable.
    """
    absolute = os.path.abspath(filename)
    containing_dir = os.path.dirname(absolute)

    if os.path.exists(absolute):
        raise FileExistsError('%s: output file exists' % filename)
    if os.access(containing_dir, os.W_OK):
        return
    raise FileNotWritableError('%s: not writable' % containing_dir)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def check_path_owner(path):
# If we don't have a way to check the effective uid of this process, os.path.dirname(path)
def export(self):
    ''' Export visible layers and layer groups to CoaSprite.

    Aborts with an error message when the destination is a regular file or
    its parent folder is not writable. An already existing destination
    folder only triggers a warning. Works on a duplicate of the original
    image, which is deleted once the sprites and the JSON are written.
    '''
    destination = os.path.join(self.path, self.name)

    if os.path.isfile(destination):
        show_error_msg('ABORTING!\nDestination is not a folder.\n {path}/{name}'.format(path=self.path, name=self.name))
        return
    if not os.access(self.path, os.W_OK):
        show_error_msg('ABORTING!\nDestination is not a writable.\n {path}'.format(path=self.path))
        return
    if os.path.isdir(destination):
        show_error_msg('Destination exists,I may have overwritten something in {path}/{name}'.format(path=self.path, name=self.name))
    self.mkdir()

    # Work on a copy so the user's image is left untouched.
    self.img = self.original_img.duplicate()
    self.img.undo_group_start()

    for layer in self.img.layers:
        if not layer.visible:
            continue
        # NOTE(review): computed but not used below — kept as in the original.
        name = '{name}.png'.format(name=layer.name)
        pdb.gimp_image_set_active_layer(self.img, layer)
        # Crop the layer, then derive its z value from its stack position.
        pdb.plug_in_autocrop_layer(self.img, layer)
        z = 0 - pdb.gimp_image_get_item_position(self.img, layer)
        if not isinstance(layer, gimp.GroupLayer):
            self.sprites.append(self.export_sprite(layer, z))
        elif len(layer.children) > 0:
            # Non-empty groups become sprite sheets; empty groups are skipped.
            self.sprites.append(self.export_sprite_sheet(layer, layer.offsets, z))

    self.write_json()
    self.img.undo_group_end()
    pdb.gimp_image_delete(self.img)
def is_writable(self, path):
    """Return whether the nearest existing ancestor of ``path`` is writable.

    Starting at ``path`` itself, parents are tried until something exists;
    that entry's write permission is the answer. False is returned when
    nothing along the way exists.
    """
    current = path
    while not os.path.exists(current):
        parent = os.path.dirname(current)
        if parent == current:
            # dirname() no longer changes the path: nothing left to try.
            return False
        current = parent
    return os.access(current, os.W_OK)
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The parent directory is ``$localappdata`` on Windows when LOCALAPPDATA
    is set in the environment, and the user's home directory
    (``os.path.expanduser('~')``) otherwise. If that parent does not exist
    it is created; if it cannot be created or is not writable, a fresh
    temporary directory is used instead.

    The result is the parent joined with ``suffix``, which defaults to
    '.distlib'.
    """
    if suffix is None:
        suffix = '.distlib'

    use_local_appdata = os.name == 'nt' and 'LOCALAPPDATA' in os.environ
    if use_local_appdata:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')

    # 'isdir' rather than 'exists': a plain file with that name must be
    # treated as a failure (makedirs below will raise for it).
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)

    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable,using %s', parent)
    return os.path.join(parent, suffix)
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names of the form ``pre + name + suf``
    inside ``dir`` and opens the first one that can be opened.

    Args:
        dir: directory in which to create the file.
        pre: file name prefix.
        suf: file name suffix.
        flags: flags passed to ``os.open``.
        output_type: ``bytes`` to work with bytes file names.

    Returns:
        A ``(fd, absolute_path)`` tuple for the newly opened file.

    Raises:
        FileExistsError: if no usable name was found.
        PermissionError: if opening fails for a reason other than the
            Windows name-collision case handled below.
    """
    names = _get_candidate_names()
    if output_type is bytes:
        names = map(_os.fsencode, names)

    for seq in range(TMP_MAX):
        name = next(names)
        file = _os.path.join(dir, pre + name + suf)
        try:
            # Pass the caller's flags; 0o600 is the permission mode for the
            # new file, not the open flags.
            fd = _os.open(file, flags, 0o600)
        except FileExistsError:
            continue    # try again
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            if (_os.name == 'nt' and _os.path.isdir(dir) and
                    _os.access(dir, _os.W_OK)):
                continue
            else:
                raise
        return (fd, _os.path.abspath(file))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
# User visible interfaces.
def test_get_default_keystore_path(self):
    """
    Ensure the default keystore directory exists, creating it if needed.
    Verify the path is correct and that we have read/write access to it.
    """
    keystore_path = PyWalib.get_default_keystore_path()
    if not os.path.exists(keystore_path):
        os.makedirs(keystore_path)

    # The location must follow the expected layout.
    self.assertTrue(keystore_path.endswith(".config/pyethapp/keystore/"))

    # Read access first, then write access.
    for mode in (os.R_OK, os.W_OK):
        self.assertEqual(os.access(keystore_path, mode), True)
def is_writable(self, os.W_OK)
break
parent = os.path.dirname(path)
if parent == path:
break
path = parent
return result
def get_cache_base(suffix=None):
"""
Return the default base location for distlib caches. If the directory does
not exist, suffix)
def check_folder_writable(checkfolder):
    """Exit the process with status 1 unless ``checkfolder`` is writable.

    An error naming the folder is logged before exiting.
    """
    if os.access(checkfolder, os.W_OK):
        return
    logging.error("You don't have access to %s" % checkfolder)
    sys.exit(1)
# Function that gets protected packages or returns an empty dictionary
def make_writeable(self, filename):
    """
    Ensure ``filename`` can be written to by adding the owner-write bit.

    Useful if our source is read-only. Nothing is done when the file is
    already writable, or on Jython.
    """
    if sys.platform.startswith('java'):
        # On Jython there is no os.access()
        return
    if os.access(filename, os.W_OK):
        return
    current_mode = stat.S_IMODE(os.stat(filename).st_mode)
    os.chmod(filename, current_mode | stat.S_IWUSR)
def main(args, outs):
    """Run the preflight checks for a bcl2fastq run.

    Checks, in order: the run folder, RunInfo.xml, the library-path
    environment, the barcode whitelist, optionally the bcl2fastq executable,
    the open-file-handle limit, the output folders and the thread count.
    A failed check ends the stage through ``martian.exit``.

    Args:
        args: stage arguments (``run_path``, ``barcode_whitelist``,
            ``check_executables``, ``output_path``, ``interop_output_path``,
            ``max_bcl2fastq_threads``).
        outs: stage outputs; not used here.
    """
    hostname = socket.gethostname()

    # print(...) with a single string behaves the same on Python 2 and 3,
    # whereas the statement form is a syntax error on Python 3.
    print("Checking run folder...")
    tk_preflight.check_rta_complete(args.run_path)

    print("Checking RunInfo.xml...")
    tk_preflight.check_runinfo_xml(args.run_path)

    print("Checking system environment...")
    ok, msg = tk_preflight.check_ld_library_path()
    if not ok:
        martian.exit(msg)

    print("Checking barcode whitelist...")
    tk_preflight.check_barcode_whitelist(args.barcode_whitelist)

    if args.check_executables:
        print("Checking bcl2fastq...")
        (rta_version, rc_i2_read, bcl_params) = tk_bcl.get_rta_version(args.run_path)
        martian.log_info("RTA Version: %s" % rta_version)
        martian.log_info("BCL Params: %s" % str(bcl_params))

        (major_ver, full_ver) = tk_bcl.check_bcl2fastq(hostname, rta_version)
        martian.log_info("Running bcl2fastq mode: %s. Version: %s" % (major_ver, full_ver))

    ok, msg = tk_preflight.check_open_fh()
    if not ok:
        martian.exit(msg)

    if args.output_path is not None:
        tk_preflight.check_folder_or_create("--output-dir", args.output_path, hostname, permission=os.W_OK|os.X_OK)

    if args.interop_output_path is not None:
        # NOTE(review): unlike the --output-dir call above, hostname is not
        # passed here — confirm against check_folder_or_create's signature.
        tk_preflight.check_folder_or_create("--interop-dir", args.interop_output_path, permission=os.W_OK|os.X_OK)

    if args.max_bcl2fastq_threads < 1:
        msg = "Cannot run bcl2fastq with zero threads."
        martian.exit(msg)