Python os 模块：O_RDWR 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.O_RDWR。
def test_usable_template(self):
    """Check that ``tempfile.gettempprefix()`` yields a usable file-name prefix.

    A scratch directory is created without using the prefix, then a file
    named ``prefix + 'xxxxxx.xxx'`` is created inside it.  A failing
    ``os.open`` is reported through ``self.failOnException``; the scratch
    directory is removed in every case.
    """
    name = tempfile.gettempprefix() + "xxxxxx.xxx"
    scratch_dir = tempfile.mkdtemp(prefix="")
    try:
        path = os.path.join(scratch_dir, name)
        try:
            fd = os.open(path, os.O_RDWR | os.O_CREAT)
        except Exception:
            # Not a bare ``except:`` so KeyboardInterrupt/SystemExit still
            # abort the run instead of being reported as a test failure.
            self.failOnException("os.open")
        else:
            # Only touch the descriptor when the open actually succeeded.
            os.close(fd)
            os.unlink(path)
    finally:
        os.rmdir(scratch_dir)
# NOTE(review): truncated duplicate of `test_usable_template`. In this listing
# the second comment line is fused with the tail of the `os.open(...)` call and
# the statements in between are missing, so the fragment is not runnable as
# shown.
def test_usable_template(self):
# gettempprefix returns a usable prefix string
# Create a temp directory, os.O_RDWR | os.O_CREAT)
except:
self.failOnException("os.open")
os.close(fd)
os.unlink(p)
finally:
os.rmdir(d)
def do_magic(self):
    """Try to take the single-instance lock stored at ``LOCK_PATH``.

    When ``OS_WIN`` is true, an existing lock file is deleted and then
    re-created exclusively; an ``EACCES`` error from either step sets
    ``self.is_running``.  Otherwise the file is opened for writing and a
    non-blocking exclusive ``fcntl.lockf`` lock is requested; a failure sets
    ``self.is_running`` as long as ``self.fh`` is not None.

    :raises EnvironmentError: for any error not interpreted as "already
        running" by the rules above.
    """
    import errno

    if OS_WIN:
        try:
            if os.path.exists(LOCK_PATH):
                os.unlink(LOCK_PATH)
            self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except EnvironmentError as err:
            # Named constant instead of the bare number 13.
            if err.errno != errno.EACCES:
                raise
            self.is_running = True
        return

    try:
        self.fh = open(LOCK_PATH, 'w')
        fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except EnvironmentError:
        if self.fh is None:
            raise
        self.is_running = True
def wipe(self):
    """Zero-fill the two filter bitmap files under ``/dev/shm``.

    ``/dev/shm/kafl_filter0`` is sized from
    ``self.config.config_values['BITMAP_SHM_SIZE']`` and
    ``/dev/shm/kafl_tfilter`` is fixed at ``0x1000000`` bytes.

    NOTE(review): in the listing the second region was opened with
    ``0x1000000`` as the ``os.open`` flags and mapped without ``MAP_SHARED``
    (the protection bits sat in mmap's ``flags`` position).  It is rebuilt
    here to mirror the first region -- confirm against the upstream project.
    """
    def zero_fill(path, size):
        # Create/resize the file, map it shared and overwrite it with zeros.
        fd = os.open(path, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        try:
            os.ftruncate(fd, size)
            shm = mmap.mmap(fd, size, mmap.MAP_SHARED,
                            mmap.PROT_WRITE | mmap.PROT_READ)
            try:
                # One slice assignment instead of a per-byte Python loop.
                shm[:] = b'\x00' * size
            finally:
                shm.close()
        finally:
            os.close(fd)

    zero_fill("/dev/shm/kafl_filter0",
              self.config.config_values['BITMAP_SHM_SIZE'])
    zero_fill("/dev/shm/kafl_tfilter", 0x1000000)
def __set_binary(self, filename, binaryfile, max_size):
    """Copy ``binaryfile`` into a zero-filled mapping of ``filename``.

    :param filename: file to create (if needed), resize to ``max_size``
        bytes and map shared.
    :param binaryfile: path of the file whose bytes are copied to the start
        of the mapping.
    :param max_size: size of the mapping in bytes; ``mmap.write`` fails if
        the binary does not fit.
    """
    shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
    try:
        os.ftruncate(shm_fd, max_size)
        # MAP_SHARED must occupy the ``flags`` position; the protection bits
        # are the fourth positional argument.
        shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED,
                        mmap.PROT_WRITE | mmap.PROT_READ)
        try:
            shm.write(b'\x00' * max_size)
            shm.seek(0)
            with open(binaryfile, "rb") as src:
                chunk = src.read(1024)
                # Truthiness ends the loop for both str (Py2) and bytes (Py3).
                while chunk:
                    shm.write(chunk)
                    chunk = src.read(1024)
        finally:
            shm.close()
    finally:
        os.close(shm_fd)
def init(self):
    """Connect the control socket and map the bitmap and payload files.

    Retries ``connect`` on ``self.control_filename`` (without sleeping) until
    it succeeds, then creates/resizes ``self.bitmap_filename`` to
    ``self.bitmap_size`` bytes and ``self.payload_filename`` to 128 KiB and
    maps both read/write.

    :return: always ``True``.
    """
    self.control = socket.socket(socket.AF_UNIX)
    while True:
        try:
            self.control.connect(self.control_filename)
        except socket_error:
            continue
        break

    open_flags = os.O_RDWR | os.O_SYNC | os.O_CREAT
    protection = mmap.PROT_WRITE | mmap.PROT_READ
    payload_size = 128 << 10

    self.kafl_shm_f = os.open(self.bitmap_filename, open_flags)
    self.fs_shm_f = os.open(self.payload_filename, open_flags)
    os.ftruncate(self.kafl_shm_f, self.bitmap_size)
    os.ftruncate(self.fs_shm_f, payload_size)

    # Both mappings pass MAP_SHARED as ``flags``; previously the bitmap
    # mapping passed the protection bits in that position instead.
    self.kafl_shm = mmap.mmap(self.kafl_shm_f, self.bitmap_size,
                              mmap.MAP_SHARED, protection)
    self.fs_shm = mmap.mmap(self.fs_shm_f, payload_size,
                            mmap.MAP_SHARED, protection)
    return True
def writef_win32(f, data, m='w', encoding='ISO8859-1'):
    """Write ``data`` to the path ``f`` through a non-inheritable descriptor.

    :param f: path of the file to create or truncate.
    :param data: content to write; on Python 3 it is encoded with
        ``encoding`` when ``m`` is not already a binary mode.
    :param m: mode string handed to ``os.fdopen``.
    :param encoding: codec used for the Python 3 text-to-bytes conversion.
    :raises IOError: when the file cannot be opened.
    """
    if sys.hexversion > 0x3000000 and 'b' not in m:
        data = data.encode(encoding)
        m += 'b'

    flags = os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT
    if 'b' in m:
        flags |= os.O_BINARY
    if '+' in m:
        flags |= os.O_RDWR

    try:
        fd = os.open(f, flags)
    except OSError:
        raise IOError('Cannot write to %r' % f)

    with os.fdopen(fd, m) as handle:
        handle.write(data)
# NOTE(review): truncated duplicate of `writef_win32`. The signature has lost
# its trailing colon and most parameters, and everything up to the final
# write/close is missing from this listing, so the fragment is not runnable as
# shown.
def writef_win32(f, m)
try:
f.write(data)
finally:
f.close()
def daemonize():
    """Detach the current process with a double fork.

    Each ``os.fork()`` parent exits via ``os._exit(0)``; the surviving child
    starts a new session, sets umask ``0o077`` and points descriptors 0-2 at
    ``/dev/null``.  ``EBADF`` from ``os.dup2`` is ignored; other errors
    propagate.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():    # launch child and...
        os._exit(0)  # kill off parent
    os.setsid()
    if os.fork():    # launch child and...
        os._exit(0)  # kill off parent again.
    # ``0o077`` and ``except ... as`` parse on Python 2.6+ and Python 3.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    os.close(null)
def createTempFile(self):
    """Open a uniquely named file in the mailbox's ``tmp`` directory.

    A fresh name from ``_generateMaildirName()`` is tried on every attempt.
    On success the descriptor is stored in ``self.fh`` and the path in
    ``self.tmpname``.  After more than 500 ``OSError`` failures,
    ``self.defer`` is errbacked with a ``RuntimeError`` and cleared.

    :return: always ``None``.
    """
    attr = (os.O_RDWR | os.O_CREAT | os.O_EXCL
            | getattr(os, "O_NOINHERIT", 0)
            | getattr(os, "O_NOFOLLOW", 0))
    tries = 0
    self.fh = -1
    while True:
        self.tmpname = os.path.join(self.mbox.path, "tmp", _generateMaildirName())
        try:
            # ``0o600`` is the octal spelling accepted by Python 2.6+ and 3.
            self.fh = self.osopen(self.tmpname, attr, 0o600)
            return None
        except OSError:
            tries += 1
            if tries > 500:
                self.defer.errback(RuntimeError("Could not create tmp file for %s" % self.mbox.path))
                self.defer = None
                return None
def slave_open(tty_name):
    """slave_open(tty_name) -> slave_fd
    Open the pty slave and acquire the controlling terminal, returning
    opened filedescriptor.
    Deprecated, use openpty() instead."""
    result = os.open(tty_name, os.O_RDWR)
    try:
        from fcntl import ioctl, I_PUSH
    except ImportError:
        # No STREAMS support exposed by fcntl: nothing more to do.
        return result
    try:
        ioctl(result, I_PUSH, "ptem")
        # The request code was missing here; without it ioctl() raises a
        # TypeError that the IOError handler below does not catch.
        ioctl(result, I_PUSH, "ldterm")
    except IOError:
        pass
    return result
def getInputDevices(self):
    """Probe every entry of ``/dev/input/`` and register the named ones.

    Each node is opened non-blocking and queried with ``EVIOCGNAME``; names
    containing ``"Keyboard"`` are normalised to ``'keyboard'``.  Devices with
    a non-empty name are stored in ``self.Devices``; open/ioctl errors are
    printed and the device is skipped.
    """
    for evdev in os.listdir("/dev/input/"):
        try:
            name_buf = "\0" * 512
            self.fd = os.open("/dev/input/" + evdev, os.O_RDWR | os.O_NONBLOCK)
            try:
                self.name = ioctl(self.fd, EVIOCGNAME(256), name_buf)
            finally:
                # Close the node even when the ioctl fails (previously leaked).
                os.close(self.fd)
            self.name = self.name[:self.name.find("\0")]
            if str(self.name).find("Keyboard") != -1:
                self.name = 'keyboard'
        except (IOError, OSError) as err:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('[iInputDevices] getInputDevices <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >')
            self.name = None

        if self.name:
            self.Devices[evdev] = {'name': self.name, 'type': self.getInputDeviceType(self.name), 'enabled': False, 'configuredName': None}
            if boxtype.startswith('et'):
                # load default remote control "delay" and "repeat" values for ETxxxx ("QuickFix Scrollspeed Menues" proposed by Xtrend Support)
                self.setDefaults(evdev)
def create_file(self, name, excl=False, mode="wb", **kwargs):
    """Create a file called ``name`` inside this storage.

    :param name: the name for the new file.
    :param excl: if True, create the file with ``O_CREAT | O_EXCL`` so the
        call fails when it already exists.
    :param mode: the mode flags with which to open the file. The default is
        ``"wb"``.
    :return: a :class:`whoosh.filedb.structfile.StructFile` instance.
    :raises ReadOnlyError: if the storage is read-only.
    """
    if self.readonly:
        raise ReadOnlyError

    path = self._fpath(name)
    if not excl:
        fileobj = open(path, mode)
    else:
        # O_BINARY only exists on some platforms; default to no extra bits.
        open_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR | getattr(os, "O_BINARY", 0)
        fileobj = os.fdopen(os.open(path, open_flags), mode)

    return StructFile(fileobj, name=name, **kwargs)
def generate(self):
    """Return the source text of the daemonize stub.

    The returned script imports ``pupy`` and, on POSIX, double-forks, starts
    a new session, sets umask ``0o22`` and redirects descriptors 0-2 to
    ``/dev/null``.

    NOTE(review): the script body had lost its indentation in the listing and
    used ``errno`` without importing it; the block structure below is a
    reconstruction -- confirm against the upstream project.
    """
    return textwrap.dedent("""
    import pupy,os,errno
    if os.name == 'posix':
        pupy.infos['daemonize']=True
        if os.fork():   # launch child and...
            os._exit(0) # kill off parent
        os.setsid()
        if os.fork():   # launch child and...
            os._exit(0) # kill off parent again.
        os.umask(0o22)   # Don't allow others to write
        null=os.open('/dev/null',os.O_RDWR)
        for i in range(3):
            try:
                os.dup2(null,i)
            except OSError as e:
                if e.errno != errno.EBADF:
                    raise
        os.close(null)
    """)
def _convert_pflags(self, pflags):
    """Translate an SFTP open() flag mask into ``os.open()`` flags.

    READ and WRITE together select ``O_RDWR``, WRITE alone ``O_WRONLY``, and
    anything else ``O_RDONLY``; APPEND, CREATE, TRUNC and EXCL each add their
    ``os`` counterpart.
    """
    if pflags & SFTP_FLAG_WRITE:
        flags = os.O_RDWR if pflags & SFTP_FLAG_READ else os.O_WRONLY
    else:
        flags = os.O_RDONLY

    optional_bits = (
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    )
    for sftp_bit, os_bit in optional_bits:
        if pflags & sftp_bit:
            flags |= os_bit
    return flags
def acquire(self):
    """Acquire the lock, if possible.

    The lock file is created exclusively.  While it already exists the
    attempt is repeated every ``self.delay`` seconds until it succeeds or
    ``self.timeout`` seconds have elapsed, in which case
    ``FileLockTimeoutException`` is raised.  Any ``OSError`` other than
    ``EEXIST`` propagates.
    """
    began = time.time()
    acquired = False
    while not acquired:
        try:
            self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
            if time.time() - began >= self.timeout:
                raise FileLockTimeoutException("%d seconds passed." % self.timeout)
            time.sleep(self.delay)
        else:
            acquired = True
    self.is_locked = True
def safe_open(path, mode="w", chmod=None, buffering=None):
    """Safely open a file.

    The file is created with ``O_CREAT | O_EXCL``, so the call fails if
    ``path`` already exists.

    :param str path: Path to a file.
    :param str mode: Same as `mode` for `open`.
    :param int chmod: Same as `mode` for `os.open`, uses Python defaults
        if ``None``.
    :param int buffering: Same as `bufsize` for `os.fdopen`, uses Python
        defaults if ``None``.
    :return: the file object produced by ``os.fdopen``.
    """
    # pylint: disable=star-args
    open_args = () if chmod is None else (chmod,)
    fdopen_args = () if buffering is None else (buffering,)
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args)
    try:
        return os.fdopen(fd, mode, *fdopen_args)
    except Exception:
        # Do not leak the descriptor when fdopen rejects its arguments.
        try:
            os.close(fd)
        except OSError:
            pass  # fdopen may already have closed it
        raise
def tty_create_child(*args):
    """Fork a child attached to a fresh pty and exec ``args`` in it.

    Echo is disabled on both pty ends before forking.  The child duplicates
    the slave end onto descriptors 0-2, closes the non-standard descriptors,
    starts a new session, re-opens its terminal by name (presumably to make
    it the controlling tty -- confirm) and executes ``args``.

    :return: ``(pid, master_fd)`` in the parent.
    """
    master_fd, slave_fd = os.openpty()
    for pty_fd in (master_fd, slave_fd):
        disable_echo(pty_fd)

    pid = os.fork()
    if pid == 0:
        mitogen.core.set_block(slave_fd)
        for std_fd in (0, 1, 2):
            os.dup2(slave_fd, std_fd)
        close_nonstandard_fds()
        os.setsid()
        os.close(os.open(os.ttyname(1), os.O_RDWR))
        os.execvp(args[0], args)

    os.close(slave_fd)
    LOG.debug('tty_create_child() child %d fd %d,parent %d,cmd: %s',
              pid, master_fd, os.getpid(), Argv(args))
    return pid, master_fd
def redirect_stream(system_stream, target_stream):
    """ Redirect a system stream to a specified file.

    :param system_stream: A file object representing a standard I/O
        stream.
    :param target_stream: The target file object for the redirected
        stream, or ``None`` to specify the null device.
    :return: ``None``.

    `system_stream` is a standard system stream such as
    ``sys.stdout``. `target_stream` is an open file object that
    should replace the corresponding system stream object.

    If `target_stream` is ``None``, the operating system's null device is
    opened, duplicated onto the system stream's descriptor, and the
    temporary descriptor is closed again.
    """
    system_fd = system_stream.fileno()
    if target_stream is not None:
        os.dup2(target_stream.fileno(), system_fd)
        return

    null_fd = os.open(os.devnull, os.O_RDWR)
    try:
        os.dup2(null_fd, system_fd)
    finally:
        # Keep the descriptor only if it *is* the system stream's descriptor.
        if null_fd != system_fd:
            os.close(null_fd)
# NOTE(review): truncated duplicate of `generate`. Inside the string literal
# the `import` line is fused with the tail of an `except` clause and the
# statements in between are missing from this listing.
def generate(self):
return textwrap.dedent("""
import pupy,e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def _open_endpoint_fd(self, ep):
    '''
    Open the endpoint file ``ep<number><in|out>`` under ``self.gadgetfs_dir``
    read/write and non-blocking, and store the descriptor in ``ep.fd``.

    :param ep: USBEndpoint object
    :raises Exception: if no endpoint file found, or failed to open
    .. todo:: detect transfer-type specific endpoint files
    '''
    num = ep.number
    s_dir = 'out' if ep.direction == USBEndpoint.direction_out else 'in'
    filename = 'ep%d%s' % (num, s_dir)
    path = os.path.join(self.gadgetfs_dir, filename)
    fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)
    self.debug('Opened endpoint %d' % (num))
    # Four placeholders need four values; ``filename`` was missing, which
    # made this line raise TypeError.
    self.debug('ep: %d dir: %s file: %s fd: %d' % (num, s_dir, filename, fd))
    ep.fd = fd
def with_multi_lock(tag, n, unlock_after_with=True):
    """Yield whether one of ``n`` lock files for ``tag`` could be locked.

    Candidate files are ``/tmp/<tag>.lock`` followed by
    ``/tmp/<tag>.lock<i>`` for ``i`` in ``1..n-1``.  The generator yields
    ``True`` for the first file on which ``trylock`` succeeds, or ``False``
    when none could be locked.  (Presumably wrapped by
    ``contextlib.contextmanager`` at the definition site -- confirm.)

    :param tag: base name of the lock files.
    :param n: number of lock slots to try.
    :param unlock_after_with: when True, the acquired descriptor is closed
        once the generator resumes; when False it is left open.
    """
    for i in range(n):
        suffix = str(i) if i else ''
        lock_file_path = os.path.join('/tmp/', tag + '.lock' + suffix)
        # ``0o660`` is the octal spelling accepted by Python 2.6+ and 3.
        fd = os.open(lock_file_path, os.O_CREAT | os.O_RDWR, 0o660)
        acquired = False
        try:
            acquired = trylock(fd)
            if acquired:
                yield True
                break
        finally:
            # Descriptors of slots that were not acquired are always closed;
            # they were leaked before when unlock_after_with was False.
            if unlock_after_with or not acquired:
                os.close(fd)
    else:
        yield False
# NOTE(review): truncated variant of `getInputDevices`. The `os.open(...)`
# call is fused with the ioctl buffer argument and the `except` clause has
# unbalanced parentheses, so the fragment is not runnable as shown.
def getInputDevices(self):
devices = os.listdir("/dev/input/")
for evdev in devices:
try:
buffer = "\0"*512
self.fd = os.open("/dev/input/" + evdev, buffer)
self.name = self.name[:self.name.find("\0")]
os.close(self.fd)
except (IOError, err:
print '[iInputDevices] getInputDevices ' + evdev + ' <ERROR: ioctl(EVIOCGNAME): ' + str(err) + ' >'
self.name = None
if self.name:
self.Devices[evdev] = {'name': self.name, 'configuredName': None }
def _open_terminal():
    """Open pty master and return (master_fd, tty_name).
    SGI and generic BSD version, for when openpty() fails.

    If the ``sgi`` module is importable its ``_getpty`` is used; otherwise
    the ``/dev/pty??`` device names are tried in order.  Raises ``os.error``
    when no device can be opened.
    """
    try:
        import sgi
    except ImportError:
        pass
    else:
        try:
            tty_name, master_fd = sgi._getpty(os.O_RDWR, 0o666, 0)
        except IOError as msg:
            raise os.error(msg)
        return master_fd, tty_name

    suffixes = (x + y for x in 'pqrstuvwxyzPQRST' for y in '0123456789abcdef')
    for suffix in suffixes:
        try:
            fd = os.open('/dev/pty' + suffix, os.O_RDWR)
        except os.error:
            continue
        return (fd, '/dev/tty' + suffix)
    raise os.error('out of pty devices')
# NOTE(review): truncated variant of `daemonize`. The `os.open(...)` call is
# fused with the loop variable and the `for`/`try` lines that the `except`
# clause belongs to are missing from this listing.
def daemonize():
# See http://www.steve.org.uk/Reference/Unix/faq_2.html#SEC16
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
# some argue that this umask should be 0,but that's annoying.
os.umask(0o077)
null = os.open('/dev/null', i)
except OSError as e:
if e.errno != errno.EBADF:
raise
os.close(null)
def __init__(self, torrent, savedir):
    """Set up piece bookkeeping and open the output file.

    :param torrent: object providing ``info.pieces`` and ``name``.
    :param savedir: directory for the download; ``'.'`` means the torrent
        name is used as a relative path.
    """
    self.torrent = torrent
    self.total_pieces = len(self.torrent.info.pieces)
    self.peers = {}
    # TODO: Come up with different data structure to store
    # states of different pieces and blocks. Probably dict or set?
    self.pending_blocks = []
    self.ongoing_pieces = []
    self.have_pieces = []
    self.missing_pieces = self.make_pieces()
    # NOTE(review): originally labelled "Seconds", but 300 * 1000 looks like
    # a millisecond value -- confirm the unit against the code that reads it.
    self.max_pending_time = 300 * 1000
    self.progress_bar = Bar('Downloading', max=self.total_pieces)

    name = (self.torrent.name if savedir == '.'
            else os.path.join(savedir, self.torrent.name))
    self.fd = os.open(name, os.O_RDWR | os.O_CREAT)
def post(self, *args, **kwargs):
    """Accept and log the push endpoint data.

    The request body must parse as JSON.  It is then written verbatim to
    ``self._settings.storage`` and ``"Ok"`` is sent back.  Any exception is
    logged and answered with status 400 and its ``repr``.
    """
    try:
        # make sure it's valid JSON, even if you don't do anything with it.
        body = json.loads(self.request.body)
        # ``0o644`` is the octal spelling accepted by Python 2.6+ and 3.
        out = os.open(self._settings.storage,
                      os.O_RDWR | os.O_CREAT,
                      0o644)
        try:
            self.log.info("Writing body: {}".format(body))
            os.write(out, self.request.body)
        finally:
            # Release the descriptor even when the write fails.
            os.close(out)
        self.write("Ok")
        self.log.info("Done!")
    except Exception as x:
        self.log.error(repr(x))
        self.set_status(400)
        self.write(repr(x) + "\n\n")
    # NOTE(review): placed after the try/except so both paths finish the
    # request; the listing had lost the indentation -- confirm upstream.
    self.finish()
def _convert_pflags(self, pflags):
    """Map SFTP-style open() flags onto Python's ``os.open()`` flags.

    The access mode comes from the READ/WRITE bits (both -> ``O_RDWR``,
    WRITE only -> ``O_WRONLY``, otherwise ``O_RDONLY``); the APPEND, CREATE,
    TRUNC and EXCL bits are then OR-ed in individually.
    """
    wants_write = pflags & SFTP_FLAG_WRITE
    if wants_write and (pflags & SFTP_FLAG_READ):
        flags = os.O_RDWR
    elif wants_write:
        flags = os.O_WRONLY
    else:
        flags = os.O_RDONLY

    translation = [
        (SFTP_FLAG_APPEND, os.O_APPEND),
        (SFTP_FLAG_CREATE, os.O_CREAT),
        (SFTP_FLAG_TRUNC, os.O_TRUNC),
        (SFTP_FLAG_EXCL, os.O_EXCL),
    ]
    for sftp_flag, os_flag in translation:
        if pflags & sftp_flag:
            flags |= os_flag
    return flags
# NOTE(review): truncated Python 2 variant of `_open_terminal`. The docstring
# is never closed and is fused with the arguments of a later call; the loop
# over the pty device names is missing from this listing.
def _open_terminal():
"""Open pty master and return (master_fd, 0666, 0)
except IOError, msg:
raise os.error, msg
return master_fd, '/dev/tty' + x + y)
raise os.error, 'out of pty devices'
# NOTE(review): truncated duplicate of `slave_open`. The docstring is fused
# with the tail of an `ioctl(...)` call and the open/import logic is missing
# from this listing.
def slave_open(tty_name):
"""slave_open(tty_name) -> slave_fd
Open the pty slave and acquire the controlling terminal, "ldterm")
except IOError:
pass
return result
# NOTE(review): truncated duplicate of `test_usable_template`. A comment line
# is fused with the tail of the `os.open(...)` call and the statements in
# between are missing from this listing.
def test_usable_template(self):
# gettempprefix returns a usable prefix string
# Create a temp directory, os.O_RDWR | os.O_CREAT)
except:
self.failOnException("os.open")
os.close(fd)
os.unlink(p)
finally:
os.rmdir(d)
# NOTE(review): truncated duplicate of `_open_terminal`. Only the start of
# the docstring, fused with the final error message, survives in this
# listing.
def _open_terminal():
"""Open pty master and return (master_fd, 'out of pty devices'
# NOTE(review): truncated duplicate of `slave_open`. The docstring is fused
# with the tail of an `ioctl(...)` call and the open/import logic is missing
# from this listing.
def slave_open(tty_name):
"""slave_open(tty_name) -> slave_fd
Open the pty slave and acquire the controlling terminal, "ldterm")
except IOError:
pass
return result
def get_dev_bpf():
    """Open the first available BPF device.

    Tries ``/dev/bpf0`` through ``/dev/bpf7`` in order.

    :return: ``(fd, index)`` -- the opened descriptor and the number of the
        device it belongs to.
    :raises Scapy_Exception: if none of the devices can be opened.
    """
    # Get the first available BPF handle
    for bpf in range(8):
        try:
            fd = os.open("/dev/bpf%i" % bpf, os.O_RDWR)
        except OSError:
            # Unavailable: try the next device.  The exception value was
            # unused, and ``except X, e`` is a syntax error on Python 3.
            continue
        return (fd, bpf)
    raise Scapy_Exception("No /dev/bpf handle is available !")
# NOTE(review): truncated duplicate of `daemonize`. The `os.open(...)` call is
# fused with the tail of an `except` clause and the redirect loop is missing
# from this listing.
def daemonize():
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(077)
null=os.open('/dev/null', e:
if e.errno != errno.EBADF:
raise
os.close(null)
# NOTE(review): truncated duplicate of `createTempFile`. The flag expression
# is fused with the mode argument of the open call and the retry loop header
# is missing from this listing.
def createTempFile(self):
attr = (os.O_RDWR | os.O_CREAT | os.O_EXCL
| getattr(os, 0600)
return None
except OSError:
tries += 1
if tries > 500:
self.defer.errback(RuntimeError("Could not create tmp file for %s" % self.mbox.path))
self.defer = None
return None
# NOTE(review): truncated duplicate of `slave_open`. The docstring is fused
# with the tail of an `ioctl(...)` call and the open/import logic is missing
# from this listing.
def slave_open(tty_name):
"""slave_open(tty_name) -> slave_fd
Open the pty slave and acquire the controlling terminal, "ldterm")
except IOError:
pass
return result
def flags(self):
    '''
    Build the ``os.open()`` flag mask for this mode object.

    Adapted from http://hg.python.org/cpython/file/84cf25da86e8/Lib/_pyio.py#l154
    See also open(2) which explains the modes
    os.O_BINARY and os.O_TEXT are only available on Windows.
    '''
    result = 0
    if self.reading and not self.updating:
        result |= os.O_RDONLY
    if self.writing and not self.updating:
        result |= os.O_WRONLY
    if self.creating_exclusively and not self.updating:
        result |= os.O_EXCL
    if self.updating:
        result |= os.O_RDWR
    if self.appending:
        result |= os.O_APPEND
    if self.writing or self.creating_exclusively:
        result |= os.O_CREAT
    if self.writing:
        result |= os.O_TRUNC
    # The two platform-specific bits are only touched when they exist.
    if self.binary and hasattr(os, 'O_BINARY'):
        result |= os.O_BINARY
    if self.text and hasattr(os, 'O_TEXT'):
        result |= os.O_TEXT
    return result
def bind_ns_files(self, pid, namespaces=None, ns_bind_dir=None):
    """Bind-mount the namespace files of process ``pid`` into a directory.

    For every name in ``namespaces`` except ``"mount"`` whose namespace
    object (from ``self.get_namespace``) is available, an empty target file
    is created in ``ns_bind_dir`` if missing and ``/proc/<pid>/ns/<entry>``
    is bind-mounted onto it through ``self.mount``.

    :param pid: process id whose ``/proc/<pid>/ns`` entries are bound.
    :param namespaces: iterable of namespace names; nothing happens if None.
    :param ns_bind_dir: target directory, created if missing; nothing
        happens if None.
    :raises RuntimeError: if ``ns_bind_dir`` is not readable and writable.
    """
    if ns_bind_dir is None or namespaces is None:
        return

    if not os.path.exists(ns_bind_dir):
        os.mkdir(ns_bind_dir)

    if not os.access(ns_bind_dir, os.R_OK | os.W_OK):
        # The message used the undefined name ``bind_ns_dir``, which turned
        # this error into a NameError.
        raise RuntimeError("cannot access %s" % ns_bind_dir)

    path = "/proc/%d/ns" % pid
    for ns in namespaces:
        if ns == "mount":
            continue
        ns_obj = self.get_namespace(ns)
        if not ns_obj.available:
            continue
        entry = ns_obj.entry
        source = "%s/%s" % (path, entry)
        target = "%s/%s" % (ns_bind_dir.rstrip("/"), entry)
        if not os.path.exists(target):
            # A bind mount needs an existing file to mount onto.
            os.close(os.open(target, os.O_CREAT | os.O_RDWR))
        self.mount(source=source, target=target, mount_type="bind")
def acquire(self, blocking=True):
    """Acquire the lock if possible.

    The lock file is created exclusively and the current pid is written to
    it.  If the file already exists and ``blocking`` is ``False``, return
    ``False``.  Otherwise retry every `self.delay` seconds until the lock
    is acquired or `self.timeout` is exceeded, which raises an
    `~AcquisitionError`.

    :return: ``True`` once the lock is held, ``False`` for a failed
        non-blocking attempt.
    """
    started = time.time()
    while True:
        self._validate_lockfile()
        try:
            raw_fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            with os.fdopen(raw_fd, 'w') as lock_handle:
                lock_handle.write('{0}'.format(os.getpid()))
            break
        except OSError as err:
            if err.errno != errno.EEXIST:  # pragma: no cover
                raise
            timed_out = self.timeout and (time.time() - started) >= self.timeout
            if timed_out:
                raise AcquisitionError('Lock acquisition timed out.')
            if not blocking:
                return False
            time.sleep(self.delay)
    self._locked = True
    return True
def daemonize(double_fork=True):
    '''Puts process in the background using usual UNIX best practices.

    Sets umask ``0o22``, changes to ``/``, optionally forks once, starts a
    new session, forks again, calls ``close_open_files()`` and points
    stdin/stdout/stderr at the null device.

    :param double_fork: when False the first fork is skipped.
    :raises Exception: when the umask change or a fork fails.

    NOTE(review): the listing had lost its indentation and parts of the last
    lines; ``os.setsid()`` is placed outside the ``double_fork`` branch and
    the stdout/stderr redirects mirror the stdin one -- confirm upstream.
    '''
    try:
        os.umask(0o22)
    except Exception as e:
        raise Exception("Unable to change file creation mask: %s" % e)

    os.chdir('/')

    # First fork
    if double_fork:
        try:
            pid = os.fork()
            if pid > 0:
                os._exit(0)
        except OSError as e:
            # OSError exposes ``strerror``; ``strerr`` does not exist.
            raise Exception("Error on first fork: [%d] %s" % (e.errno, e.strerror,))

    os.setsid()

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            os._exit(0)
    except OSError as e:
        # Two placeholders need two values.
        raise Exception("Error on second fork: [%d] %s" % (e.errno, e.strerror,))

    close_open_files()
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
    os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
def write_pid(filename):
    '''Atomically writes a pid file, or fails if the file already exists.

    :param filename: path of the pid file to create.
    :raises OSError: ``EEXIST`` if the file already exists, or any other
        error from creating or writing it.
    '''
    # ``0o644`` is the permission mode, not the open flags: the flags must
    # request exclusive creation for the documented behaviour.
    fd = os.open(filename, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o644)
    try:
        # os.write needs bytes on Python 3.
        os.write(fd, str(os.getpid()).encode('ascii'))
    finally:
        os.close(fd)
def _create_carefully(path):
    """Create a file if it doesn't exist and open for reading and writing.

    :param path: file to create.
    :return: the file opened in ``'rb+'`` mode.
    :raises OSError: ``EEXIST`` when ``path`` already exists.

    NOTE(review): the listing passed the permission bits (``0666``) as the
    open flags; exclusive-create flags are restored here -- confirm against
    the upstream implementation.
    """
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, 0o666)
    try:
        return open(path, 'rb+')
    finally:
        # The descriptor was only needed to create the file.
        os.close(fd)
# NOTE(review): truncated duplicate of `acquire`. The signature is fused with
# the docstring text and the `os.open(...)` call with a `with` statement, so
# the fragment is not runnable as shown.
def acquire(self,check every `self.delay` seconds until it acquires
lock or exceeds `self.timeout` and raises an exception.
"""
start = time.time()
while True:
try:
fd = os.open(self.lockfile, 'w') as fd:
fd.write('{0}'.format(os.getpid()))
break
except OSError as err:
if err.errno != errno.EEXIST: # pragma: no cover
raise
if self.timeout and (time.time() - start) >= self.timeout:
raise AcquisitionError('Lock acquisition timed out.')
if not blocking:
return False
time.sleep(self.delay)
self._locked = True
return True
# NOTE(review): truncated duplicate of `acquire`. The signature is fused with
# the tail of a `with os.fdopen(...)` statement and everything before the pid
# write is missing from this listing.
def acquire(self, 'w') as fd:
fd.write('{0}'.format(os.getpid()))
break
except OSError as err:
if err.errno != errno.EEXIST: # pragma: no cover
raise
if self.timeout and (time.time() - start) >= self.timeout:
raise AcquisitionError('Lock acquisition timed out.')
if not blocking:
return False
time.sleep(self.delay)
self._locked = True
return True
def lock(self):
    '''
    Creates and holds on to the lock file with exclusive access.
    Returns True if lock successful, False if it is not, and raises
    an exception upon operating system errors encountered creating the
    lock file.
    '''
    try:
        #
        # Create or else open and truncate lock file, in read-write mode.
        #
        # A crashed app might not delete the lock file, so the
        # os.O_CREAT | os.O_EXCL combination that guarantees
        # atomic create isn't useful here.  That is, we don't want to
        # fail locking just because the file exists.
        #
        # Could use os.O_EXLOCK, but that doesn't exist yet in my Python
        #
        self.lockfd = os.open(self.lockfile,
                              os.O_TRUNC | os.O_CREAT | os.O_RDWR)
        # Acquire exclusive lock on the file, but don't block waiting for it
        fcntl.flock(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        # Writing to file is pointless, nobody can see it
        # (bytes literal: os.write rejects str on Python 3)
        os.write(self.lockfd, b"My Lockfile")
        return True
    except (OSError, IOError) as e:
        # Lock cannot be acquired is okay, everything else reraise exception
        # NOTE(review): on this path self.lockfd stays open without holding
        # the lock -- confirm whether the caller is expected to close it.
        if e.errno in (errno.EACCES, errno.EAGAIN):
            return False
        raise
def __init__(self, mutex_name):
    """Try to acquire a file-based mutex named ``mutex_name``.

    A file of that name in the temp directory is removed (best effort) and
    re-created exclusively.  On success ``self._acquired`` is True and
    ``self._release_mutex`` closes and deletes the file; the same callback
    is registered through ``weakref.ref`` so it runs when the instance is
    collected.  On failure ``self._acquired`` is False and
    ``self._release_mutex`` is ``NULL``.
    """
    check_valid_mutex_name(mutex_name)
    filename = os.path.join(tempfile.gettempdir(), mutex_name)
    try:
        os.unlink(filename)
    except Exception:
        pass  # Best effort: the file may not exist or may still be in use.

    try:
        handle = os.open(filename, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        try:
            try:
                pid = str(os.getpid())
            except Exception:
                pid = 'unable to get pid'
            # os.write needs bytes on Python 3; writing ``str`` raised a
            # TypeError that was silently swallowed below.
            os.write(handle, pid.encode('ascii'))
        except Exception:
            pass  # Ignore this as it's pretty much optional
    except Exception:
        self._release_mutex = NULL
        self._acquired = False
    else:
        def release_mutex(*args, **kwargs):
            # Note: can't use self here!
            if not getattr(release_mutex, 'called', False):
                release_mutex.called = True
                try:
                    os.close(handle)
                except Exception:
                    traceback.print_exc()
                try:
                    # Removing is optional as we'll try to remove on startup anyways (but
                    # let's do it to keep the filesystem cleaner).
                    os.unlink(filename)
                except Exception:
                    pass

        # Don't use __del__: this approach doesn't have as many pitfalls.
        self._ref = weakref.ref(self, release_mutex)

        self._release_mutex = release_mutex
        self._acquired = True