Python os 模块,open() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.open()。
def _validate_lockfile(self):
    """Check existence and validity of lockfile.

    If the lockfile exists, but contains an invalid PID
    or the PID of a non-existent process, ``self.release()``
    is called. A lockfile that cannot be read is left alone.
    """
    try:
        with open(self.lockfile) as fp:
            s = fp.read()
    except Exception:
        # Missing or unreadable lockfile: nothing to validate.
        return

    try:
        pid = int(s)
    except ValueError:
        # Contents are not a number, so they cannot name a live owner.
        return self.release()

    # Imported at call time, as in the original snippet.
    from background import _process_exists
    if not _process_exists(pid):
        self.release()
def atomic_writer(file_path, mode):
    """Atomic file writer.

    :param file_path: path of file to write to.
    :type file_path: ``unicode``
    :param mode: same as for :func:`open`
    :type mode: string

    .. versionadded:: 1.12

    Generator intended for use as a context manager: it yields a file
    object opened on ``file_path + '.aw.temp'`` and renames that
    temporary file onto ``file_path`` only if the caller's block
    finishes without raising.

    NOTE(review): no ``@contextmanager`` decorator is visible in this
    snippet -- confirm it is applied where the function is defined.
    """
    temp_suffix = '.aw.temp'
    temp_file_path = file_path + temp_suffix

    with open(temp_file_path, mode) as file_obj:
        try:
            yield file_obj
            # Only reached when the caller's block did not raise.
            os.rename(temp_file_path, file_path)
        finally:
            # After a successful rename the temp path is already gone;
            # after a failure this discards the partial file.
            try:
                os.remove(temp_file_path)
            except (OSError, IOError):
                pass
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: path of the PID file to create.
    :raises OSError: if the file already exists or cannot be created
        (it is opened with ``O_CREAT | O_EXCL``).
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character. For
    #   example, if crond was process number 25, /var/run/crond.pid
    #   would contain three characters: two, five, and newline.
    #
    # ``with`` closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def read_text_file(filename):
    """Return the contents of *filename*.

    Try to decode the file contents with utf-8, the preferred system
    encoding (e.g., cp1252 on some Windows machines), and latin1, in
    that order. Decoding a byte string with latin1 will never raise an
    error. In the worst case, the returned string will contain some
    garbage characters.
    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    encodings = ['utf-8', locale.getpreferredencoding(False), 'latin1']
    for enc in encodings:
        try:
            data = data.decode(enc)
        except UnicodeDecodeError:
            continue
        break

    # Internal invariant, not input validation: latin1 should have worked.
    assert not isinstance(data, bytes)
    return data
def run_script(self, script_name, namespace):
    """Execute the metadata script *script_name* inside *namespace*.

    The script is looked up as ``'scripts/' + script_name``. If a file
    exists at the resolved path it is read and executed from disk;
    otherwise the metadata text is executed and registered in
    ``linecache`` under that path.

    :param script_name: name of the script below ``scripts/``.
    :param namespace: dict used as the execution namespace; its
        ``__file__`` entry is set to the resolved script path.
    :raises ResolutionError: if no such script is present.
    """
    script = 'scripts/' + script_name
    # NOTE(review): ``has_Metadata`` / ``get_Metadata`` are kept exactly
    # as spelled in this snippet -- confirm the casing against the class.
    if not self.has_Metadata(script):
        raise ResolutionError("No script named %r" % script_name)

    # Normalise all line endings to '\n' before compiling.
    script_text = self.get_Metadata(script).replace('\r\n', '\n')
    script_text = script_text.replace('\r', '\n')
    script_filename = self._fn(self.egg_info, script)
    namespace['__file__'] = script_filename

    if os.path.exists(script_filename):
        with open(script_filename) as script_file:
            source = script_file.read()
        code = compile(source, script_filename, 'exec')
        exec(code, namespace, namespace)
    else:
        from linecache import cache
        cache[script_filename] = (
            len(script_text), 0, script_text.split('\n'), script_filename
        )
        # compile() requires (source, filename, mode); the filename
        # argument was missing here.
        script_code = compile(script_text, script_filename, 'exec')
        exec(script_code, namespace)
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: path of the PID file to create.
    :raises OSError: if the file already exists or cannot be created
        (it is opened with ``O_CREAT | O_EXCL``).
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    # The ``os.open`` line was truncated in this copy of the snippet;
    # it is completed here from the values prepared just above.
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # The PID is written in decimal followed by a newline character.
    # ``with`` closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def bz2open(cls, name, mode="r", fileobj=None, compresslevel=9, **kwargs):
    """Open bzip2 compressed tar archive name for reading or writing.

    Appending is not allowed.

    :param name: archive path, used when *fileobj* is not given.
    :param mode: ``'r'`` or ``'w'``.
    :param fileobj: optional already-open file object to wrap.
    :param compresslevel: passed to ``bz2.BZ2File`` when it is opened here.
    :raises ValueError: if *mode* is not exactly ``'r'`` or ``'w'``.
    :raises CompressionError: if the ``bz2`` module cannot be imported.
    :raises ReadError: if opening the archive raises IOError or EOFError.
    """
    # Exact membership test: the former ``mode not in "rw"`` substring
    # check let the empty string through.
    if mode not in ("r", "w"):
        raise ValueError("mode must be 'r' or 'w'.")

    try:
        import bz2
    except ImportError:
        raise CompressionError("bz2 module is not available")

    if fileobj is not None:
        fileobj = _BZ2Proxy(fileobj, mode)
    else:
        fileobj = bz2.BZ2File(name, mode, compresslevel=compresslevel)

    try:
        # NOTE(review): method name and argument list are kept exactly as
        # they appear in this snippet -- confirm against the class.
        t = cls.Taropen(name, fileobj, **kwargs)
    except (IOError, EOFError):
        raise ReadError("not a bzip2 file")
    t._extfileobj = False
    return t
# All *open() methods are registered here.
def _mkstemp_inner(dir, pre, suf, flags):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to ``TMP_MAX`` candidate names and returns
    ``(fd, absolute_path)`` for the first one that can be opened with
    *flags* and mode ``0o600``.

    :raises OSError: for any ``os.open`` failure other than EEXIST.
    :raises IOError: with errno EEXIST if every candidate already exists.
    """
    names = _get_candidate_names()

    # Spelled so it runs on both Python 2.6+ and Python 3 (was xrange,
    # .next(), 0600, ``except X, e`` and ``raise X, args``).
    for seq in range(TMP_MAX):
        name = next(names)
        candidate = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(candidate, flags, 0o600)
            _set_cloexec(fd)
            return (fd, _os.path.abspath(candidate))
        except OSError as e:
            if e.errno == _errno.EEXIST:
                continue  # try again
            raise

    raise IOError(_errno.EEXIST, "No usable temporary file name found")
# User visible interfaces.
def get_message(self, key):
    """Return a Message representation or raise a KeyError.

    The file is parsed with ``self._factory`` when one is set, otherwise
    with ``MaildirMessage``. The message's subdir, info (the part of the
    file name after ``self.colon``, if present) and date (the file's
    mtime) are then filled in.
    """
    subpath = self._lookup(key)
    full_path = os.path.join(self._path, subpath)

    # ``with`` closes the file once the message has been built.
    with open(full_path, 'r') as f:
        if self._factory:
            msg = self._factory(f)
        else:
            msg = MaildirMessage(f)

    subdir, name = os.path.split(subpath)
    msg.set_subdir(subdir)
    if self.colon in name:
        msg.set_info(name.split(self.colon)[-1])
    msg.set_date(os.path.getmtime(full_path))
    return msg
def __setitem__(self, key, message):
    """Replace the keyed message; raise KeyError if it doesn't exist."""
    path = os.path.join(self._path, str(key))
    try:
        f = open(path, 'rb+')
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise KeyError('No message with key: %s' % key)
        else:
            raise
    try:
        if self._locked:
            _lock_file(f)
        try:
            # Truncate through a second descriptor, then write the new
            # content through ``f``. The flag is ``os.O_TRUNC``; the
            # mixed-case spelling does not exist in ``os``.
            os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
            self._dump_message(message, f)
            if isinstance(message, MHMessage):
                self._dump_sequences(message, key)
        finally:
            if self._locked:
                _unlock_file(f)
    finally:
        _sync_close(f)
def get_terminal_size():
    """Return the terminal size as a ``(columns, lines)`` tuple of ints.

    Sources are tried in order: a ``TIOCGWINSZ`` ioctl on file
    descriptors 0, 1 and 2; the same ioctl on the controlling terminal;
    the ``LINES`` / ``COLUMNS`` environment variables; and finally a
    default of 80 columns by 25 lines.
    """
    def ioctl_GWINSZ(fd):
        # Returns (rows, cols), or None when the query is not possible
        # (not a terminal, or the modules are unavailable).
        try:
            import fcntl
            import termios
            import struct
            return struct.unpack(
                'hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, b'1234'))
        except Exception:
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)

    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
        except Exception:
            # No controlling terminal (or no os.ctermid): fall through.
            pass

    if not cr:
        try:
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            # Variables unset or not numeric.
            cr = (25, 80)

    return int(cr[1]), int(cr[0])
def __save(self):
    """Persist the state to ``self.__path``, or mark it dirty.

    When ``self.__asynchronous`` is 0 the state dict is pickled to
    ``<path>.new``, flushed and fsync'ed, and then moved over the real
    path with ``os.replace``. Otherwise only the dirty flag is set.

    :raises ParseError: if writing or replacing the file raises OSError.
    """
    if self.__asynchronous == 0:
        state = {
            "version" : _BobState.CUR_VERSION,
            "byNameDirs" : self.__byNameDirs,
            "results" : self.__results,
            "inputs" : self.__inputs,
            "jenkins" : self.__jenkins,
            "dirstates" : self.__dirstates,
            "buildState" : self.__buildState,
        }
        tmpFile = self.__path+".new"
        try:
            with open(tmpFile, "wb") as f:
                pickle.dump(state, f)
                # Push the data to disk before it replaces the old file.
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmpFile, self.__path)
        except OSError as e:
            raise ParseError("Error saving workspace state: " + str(e))
        self.__dirty = False
    else:
        self.__dirty = True
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle
    :type file_obj: ``file`` object
    """
    # protocol=-1 selects the highest pickle protocol available.
    return pickle.dump(obj, file_obj, protocol=-1)
# Set up default manager and register built-in serializers
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    # The constant is ``os.O_NOFOLLOW``; with the lower-case spelling the
    # hasattr() test is always false and only the fallback branch runs.
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o nofollow
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
def writeKeyToFile(key, filename):
    """Write **key** to **filename**, with ``0400`` permissions.

    If **filename** doesn't exist, it will be created. If it does exist
    already, and is writable by the owner of the current process, then it will
    be truncated to zero-length and overwritten.

    :param bytes key: A key (or some other private data) to write to
        **filename**.
    :param str filename: The path of the file to write to.
    :raises: Any exceptions which may occur.
    """
    logging.info("Writing key to file: %r", filename)
    # ``os.O_BINARY`` is the Windows binary-mode flag and is simply
    # absent elsewhere, hence getattr() with a 0 default.
    flags = os.O_WRONLY | os.O_TRUNC | os.O_CREAT | getattr(os, "O_BINARY", 0)
    # ``flags`` was computed but never passed: os.open() takes
    # (path, flags, mode).
    fd = os.open(filename, flags, 0o400)
    try:
        os.write(fd, key)
        os.fsync(fd)
    finally:
        os.close(fd)
def write_pid_to_pidfile(pidfile_path):
    """Write the PID in the named PID file.

    Get the numeric process ID ("PID") of the current process
    and write it to the named file as a line of text.

    :param pidfile_path: path of the PID file to create.
    :raises OSError: if the file already exists or cannot be created
        (it is opened with ``O_CREAT | O_EXCL``).
    """
    open_flags = (os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    open_mode = 0o644
    # The ``os.open`` line was truncated in this copy of the snippet;
    # it is completed here from the values prepared just above.
    pidfile_fd = os.open(pidfile_path, open_flags, open_mode)

    # The PID is written in decimal followed by a newline character.
    # ``with`` closes the file even if the write fails.
    with os.fdopen(pidfile_fd, 'w') as pidfile:
        pidfile.write("%s\n" % os.getpid())
def do_magic(self):
    """Try to take the single-instance lock at ``LOCK_PATH``.

    Sets ``self.is_running = True`` when the lock cannot be taken.

    * When ``OS_WIN`` is true: remove any existing lock file and
      re-create it exclusively; an EACCES error is treated as "already
      running".
    * Otherwise: open the lock file and take a non-blocking exclusive
      ``fcntl.lockf`` lock on it.
    """
    import errno

    if OS_WIN:
        try:
            if os.path.exists(LOCK_PATH):
                os.unlink(LOCK_PATH)
            self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except EnvironmentError as err:
            if err.errno == errno.EACCES:  # errno 13
                self.is_running = True
            else:
                raise
    else:
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError as err:
            # NOTE(review): assumes ``self.fh`` was initialised to None
            # before this call, so that a failed open() (fh still None)
            # can be told apart from a failed lockf() -- confirm.
            if self.fh is not None:
                self.is_running = True
            else:
                raise
def wipe(self):
    """Zero-fill the two filter bitmaps under ``/dev/shm``.

    ``/dev/shm/kafl_filter0`` is sized from
    ``self.config.config_values['BITMAP_SHM_SIZE']``;
    ``/dev/shm/kafl_tfilter`` is sized ``0x1000000`` bytes.
    """
    def _zero_fill(path, size):
        # Create/resize the file, map it shared and overwrite it with zeros.
        fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(fd, size)
            shm = mmap.mmap(fd, size, mmap.MAP_SHARED,
                            mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                # One slice assignment instead of a per-byte loop; a bytes
                # value works on both Python 2 and Python 3.
                shm[:] = b'\x00' * size
            finally:
                shm.close()
        finally:
            os.close(fd)

    _zero_fill("/dev/shm/kafl_filter0",
               self.config.config_values['BITMAP_SHM_SIZE'])
    # NOTE(review): the os.open()/mmap() calls for this second file were
    # garbled in the snippet (the size sat in the flags position and
    # MAP_SHARED was missing); they now mirror the first file -- confirm.
    _zero_fill("/dev/shm/kafl_tfilter", 0x1000000)
def load(cls, file_obj):
    """Load serialized object from open JSON file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from JSON file
    :rtype: object
    """
    return json.load(file_obj)
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object
    """
    # SECURITY: unpickling can execute arbitrary code; only use this on
    # files the application wrote itself.
    return cPickle.load(file_obj)
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object
    """
    # SECURITY: unpickling can execute arbitrary code; only use this on
    # files the application wrote itself.
    return pickle.load(file_obj)
def acquire(self, blocking=True):
    """Acquire the lock if possible.

    If the lock is in use and ``blocking`` is ``False``, return
    ``False``.

    Otherwise, check every `self.delay` seconds until it acquires
    lock or exceeds `self.timeout` and raises an `~AcquisitionError`.

    :returns: ``True`` once the lockfile has been created.
    """
    start = time.time()
    while True:
        # Give a stale lockfile the chance to be cleared first.
        self._validate_lockfile()

        try:
            # O_EXCL makes creation fail if the lockfile already exists.
            fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(fd, 'w') as lock_fp:
                lock_fp.write('{0}'.format(os.getpid()))
            break
        except OSError as err:
            if err.errno != errno.EEXIST:  # pragma: no cover
                raise

            if self.timeout and (time.time() - start) >= self.timeout:
                raise AcquisitionError('Lock acquisition timed out.')
            if not blocking:
                return False
            time.sleep(self.delay)

    self._locked = True
    return True
def _load(self):
    """Load cached settings from JSON file `self._filepath`.

    The loaded mapping is merged into ``self`` with ``update()`` and a
    deep copy is stored in ``self._original``. ``self._nosave`` is True
    while loading and is reset to False afterwards, even if loading
    fails.
    """
    self._nosave = True
    try:
        with open(self._filepath, 'rb') as file_obj:
            # json.load() no longer accepts ``encoding`` (removed in
            # Python 3.9); UTF-8 input is handled without it.
            d = dict(json.load(file_obj).items())
        self.update(d)
        self._original = deepcopy(d)
    finally:
        self._nosave = False
def debugging(self):
    """Whether Alfred's debugger is open.

    The answer is computed once from ``self.alfred_env`` and cached in
    ``self._debugging``.

    :returns: ``True`` if Alfred's debugger is open.
    :rtype: ``bool``
    """
    if self._debugging is None:
        # True only when the 'debug' entry equals the integer 1.
        self._debugging = self.alfred_env.get('debug') == 1
    return self._debugging
def cached_data(self, name, data_func=None, max_age=60):
    """Return cached data if younger than ``max_age`` seconds.

    Retrieve data from cache or re-generate and re-cache data if
    stale/non-existent. If ``max_age`` is 0, return cached data no
    matter how old.

    :param name: name of datastore
    :param data_func: function to (re-)generate data.
    :type data_func: ``callable``
    :param max_age: maximum age of cached data in seconds
    :type max_age: ``int``
    :returns: cached data, return value of ``data_func`` or ``None``
        if ``data_func`` is not set

    NOTE(review): ``name`` was missing from the signature in this
    snippet although the body and docstring use it; it is restored as
    the first parameter, matching the docstring order -- confirm
    against callers.
    """
    serializer = manager.serializer(self.cache_serializer)
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))
    age = self.cached_data_age(name)

    if (age < max_age or max_age == 0) and os.path.exists(cache_path):
        with open(cache_path, 'rb') as file_obj:
            self.logger.debug('Loading cached data from : %s',
                              cache_path)
            return serializer.load(file_obj)

    if not data_func:
        return None

    data = data_func()
    self.cache_data(name, data)
    return data
def open_log(self):
    """Open :attr:`logfile` in default app (usually Console.app)."""
    subprocess.call(['open', self.logfile])
def open_cachedir(self):
    """Open the workflow's :attr:`cachedir` in Finder."""
    subprocess.call(['open', self.cachedir])
def open_workflowdir(self):
    """Open the workflow's :attr:`workflowdir` in Finder."""
    subprocess.call(['open', self.workflowdir])
def open_terminal(self):
    """Open a Terminal window at workflow's :attr:`workflowdir`."""
    subprocess.call(['open', '-a', 'Terminal', self.workflowdir])
def open_help(self):
    """Open :attr:`help_url` in default browser.

    :returns: a short status message.
    """
    subprocess.call(['open', self.help_url])
    return 'opening workflow help URL in browser'
####################################################################
# Helper methods
####################################################################
def read_pid_from_pidfile(pidfile_path):
    """Read the PID recorded in the named PID file.

    Read and return the numeric PID recorded as text in the named
    PID file. If the PID file cannot be read, or if the content is
    not a valid PID, return ``None``.
    """
    try:
        pidfile = open(pidfile_path, 'r')
    except IOError:
        return None

    # According to the FHS 2.3 section on PID files in /var/run:
    #
    #   The file must consist of the process identifier in
    #   ASCII-encoded decimal, followed by a newline character.
    #
    #   Programs that read PID files should be somewhat flexible
    #   in what they accept; i.e., they should ignore extra
    #   whitespace, leading zeroes, absence of the trailing
    #   newline, or additional lines in the PID file.
    #
    # ``with`` closes the file even if reading fails.
    with pidfile:
        line = pidfile.readline().strip()

    try:
        return int(line)
    except ValueError:
        return None
def get_resource_stream(self, manager, resource_name):
    """Return *resource_name* as a file object opened in binary mode.

    The path is resolved with ``self._fn(self.module_path, resource_name)``.
    *manager* is not used here. The caller must close the returned file.
    """
    return open(self._fn(self.module_path, resource_name), 'rb')
def _get(self, path):
    """Return the full contents of the file at *path* as bytes."""
    with open(path, 'rb') as stream:
        return stream.read()
def _is_current(self, file_path, zip_path):
    """
    Return True if the file_path is current for this zip_path

    "Current" means that the file exists, its size and mtime equal the
    values recorded for the zip entry, and its bytes equal the data the
    loader returns for *zip_path*.
    """
    timestamp, size = self._get_date_and_size(self.zipinfo[zip_path])
    if not os.path.isfile(file_path):
        return False
    stat = os.stat(file_path)
    if stat.st_size != size or stat.st_mtime != timestamp:
        return False
    # check that the contents match
    zip_contents = self.loader.get_data(zip_path)
    with open(file_path, 'rb') as f:
        file_contents = f.read()
    return zip_contents == file_contents
def _mkstemp(*args, **kw):
    """Call ``tempfile.mkstemp`` with ``os.open`` swapped for ``os_open``.

    ``os.open`` is put back afterwards, whether or not ``mkstemp`` raised.
    Arguments and return value are those of ``tempfile.mkstemp``.
    """
    old_open = os.open
    try:
        # temporarily bypass sandboxing
        os.open = os_open
        return tempfile.mkstemp(*args, **kw)
    finally:
        # and then put it back
        os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
def _secure_open_write(filename, fmode):
    """Create *filename* afresh with mode *fmode* and open it for writing.

    Any existing file at that path is removed first, and the new file is
    created with ``O_CREAT | O_EXCL``.

    :param filename: path of the file to create.
    :param fmode: permission bits for the new file.
    :returns: a binary file object opened for writing.
    :raises OSError: if the file cannot be created.
    """
    # We only want to write to this file, so open it in write only mode
    flags = os.O_WRONLY

    # os.O_CREAT | os.O_EXCL will fail if the file already exists, so we only
    # will open *new* files.
    # We specify this because we want to ensure that the mode we pass is the
    # mode of the file.
    flags |= os.O_CREAT | os.O_EXCL

    # Do not follow symlinks to prevent someone from making a symlink that
    # we follow and insecurely open a cache file.
    # (The constant is spelled ``O_NOFOLLOW``; the lower-case name never
    # exists, so this protection was never applied.)
    if hasattr(os, "O_NOFOLLOW"):
        flags |= os.O_NOFOLLOW

    # On Windows we'll mark this file as binary
    if hasattr(os, "O_BINARY"):
        flags |= os.O_BINARY

    # Before we open our file, we want to delete any existing file that is
    # there
    try:
        os.remove(filename)
    except (IOError, OSError):
        # The file must not exist already, so we can just skip ahead to opening
        pass

    # Open our file, the use of os.O_CREAT | os.O_EXCL will ensure that if a
    # race condition happens between the os.remove and this line, that an
    # error will be raised. Because we utilize a lockfile this should only
    # happen if someone is attempting to attack us.
    # ``flags`` must be passed here: os.open() takes (path, flags, mode).
    fd = os.open(filename, flags, fmode)
    try:
        return os.fdopen(fd, "wb")
    except:
        # An error occurred wrapping our FD in a file object; close the
        # descriptor and re-raise.
        os.close(fd)
        raise
def get(self, key):
    """Return the bytes stored for *key*, or ``None`` if no file exists.

    The file path is derived from *key* with ``self._fn``.
    """
    import errno

    name = self._fn(key)
    # Open directly instead of checking os.path.exists() first, so a file
    # deleted between the check and the open cannot raise.
    try:
        with open(name, 'rb') as fh:
            return fh.read()
    except (IOError, OSError) as err:
        if err.errno != errno.ENOENT:
            raise
        return None
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :raises OSError: When path is a symlink or can't be read.
    """
    # The ``hasattr`` test and the ``os.open`` call were fused into one
    # line in this copy of the snippet; both are written out again here.
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o nofollow
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid