Python os 模块:strerror() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.strerror()。
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``SIGALRM``: a handler raising
    ``TimeoutError(error_message)`` is installed for the duration of the
    call and the alarm is always cancelled afterwards.

    :param seconds: whole seconds passed to ``signal.alarm``.
    :param error_message: message carried by the raised ``TimeoutError``.
    :return: a decorator; the decorated function returns whatever the
        wrapped function returns.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            # signal.signal() returns the handler it replaces; keep it so
            # the process-wide SIGALRM handler is not leaked after the call.
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                # None means the old handler was not installed from Python
                # and cannot be passed back to signal.signal().
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls for *blk_device* once per second and formats it with ``mkfs``
    as soon as the path exists.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type handed to ``mkfs -t``.
    :param timeout: number of one-second polls before giving up.
    :raises IOError: with ``errno.ENOENT`` if the device has not appeared
        after *timeout* polls.
    """
    # ``os.errno`` was an undocumented alias that no longer exists on
    # Python 3.7+; import the real module locally.
    import errno

    count = 0
    e_noent = errno.ENOENT
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(e_noent, os.strerror(e_noent), blk_device)
        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # The loop has no ``break``, so reaching this point means the device
    # exists (the original expressed this with a ``while``/``else``).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
def make_filesystem(blk_device, blk_device])
def test_connection_refused(self):
    """Fetching from a refusing port yields a 599 carrying ECONNREFUSED."""
    cleanup_func, port = refusing_port()
    self.addCleanup(cleanup_func)
    with ExpectLog(gen_log, ".*", required=False):
        self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
        response = self.wait()
    self.assertEqual(599, response.code)

    if sys.platform == 'cygwin':
        # cygwin returns EPERM instead of ECONNREFUSED here, so the errno
        # and message checks below do not apply.
        return

    error_text = str(response.error)
    candidate_codes = [errno.ECONNREFUSED]
    if hasattr(errno, "WSAECONNREFUSED"):
        candidate_codes.append(errno.WSAECONNREFUSED)
    contains_errno = any(str(code) in error_text for code in candidate_codes)
    self.assertTrue(contains_errno, response.error)

    # This is usually "Connection refused".
    # On windows, strerror is broken and returns "Unknown error".
    expected_message = os.strerror(errno.ECONNREFUSED)
    self.assertTrue(expected_message in error_text,
                    response.error)
def _handle_connect(self):
    """Finish a pending connect once the socket reports its outcome.

    Reads ``SO_ERROR`` from the socket. On failure the error is stored on
    ``self.error`` and the stream is closed; on success the pending
    connect callback and/or future are completed and ``_connecting`` is
    cleared.
    """
    err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err:
        self.error = socket.error(err, os.strerror(err))
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        if self._connect_future is None:
            gen_log.warning("Connect error on fd %s: %s",
                            self.socket.fileno(), errno.errorcode[err])
        self.close()
        return

    pending_callback = self._connect_callback
    if pending_callback is not None:
        # Clear the slot before running so the callback fires only once.
        self._connect_callback = None
        self._run_callback(pending_callback)

    pending_future = self._connect_future
    if pending_future is not None:
        self._connect_future = None
        pending_future.set_result(self)

    self._connecting = False
def test_connection_refused(self):
cleanup_func,
response.error)
def test_connection_refused(self):
cleanup_func,
response.error)
def _handle_connect(self):
    """Finish a pending connect once the socket reports its outcome.

    Reads ``SO_ERROR`` from the socket. On failure the error is stored on
    ``self.error`` and the stream is closed; on success the pending
    connect callback and/or future are completed and ``_connecting`` is
    cleared.
    """
    # NOTE(review): this copy was truncated (the getsockopt call was fused
    # with the tail of the warning call); the error branch is restored to
    # match the visible remainder -- confirm against the upstream source.
    err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        self.error = socket.error(err, os.strerror(err))
        # Only warn when no future is waiting to observe the failure.
        if self._connect_future is None:
            gen_log.warning("Connect error on fd %s: %s",
                            self.socket.fileno(), errno.errorcode[err])
        self.close()
        return
    if self._connect_callback is not None:
        callback = self._connect_callback
        self._connect_callback = None
        self._run_callback(callback)
    if self._connect_future is not None:
        future = self._connect_future
        self._connect_future = None
        future.set_result(self)
    self._connecting = False
def _check(ret):
    """Return *ret* when it signals success, otherwise raise ``USBError``.

    *ret* may be ``None``, a number, or an object exposing ``.value``
    (which is unwrapped first). ``None`` and negative values are errors.
    """
    if ret is not None:
        if hasattr(ret, 'value'):
            ret = ret.value
        if not ret < 0:
            return ret

    errmsg = _lib.usb_strerror()
    # No error means that we need to get the error
    # message from the return code
    # Thanks to Nicholas Wheeler to point out the problem...
    # Also see issue #2860940
    if ret is not None and errmsg.lower() == 'no error':
        errmsg = os.strerror(-ret)
    raise USBError(errmsg, ret)
def formatExit(self, process):
    """Read the call result from *process* and return it as display text.

    The register name depends on the CPU flags (``eax``, ``rax`` or
    ``result``). The raw value is stored in ``self.result``; for
    non-pointer return types it is replaced by ``ulong2long`` of itself.

    Raises NotImplementedError when none of the CPU flags is set.
    """
    if CPU_I386:
        regname = "eax"
    elif CPU_X86_64:
        regname = "rax"
    elif CPU_PPC:
        regname = "result"
    else:
        raise NotImplementedError()

    self.result = process.getreg(regname)
    if self.restype.endswith("*"):
        return formatAddress(self.result)

    unsigned_result = self.result
    self.result = ulong2long(unsigned_result)
    if self.result < 0:
        # Negative results are shown with the strerror text of -result.
        return "%s (%s)" % (
            self.result, strerror(-self.result))
    if 0 <= self.result <= 9:
        return str(self.result)
    # Larger values additionally show the unconverted word in hex.
    return "%s (%s)" % (self.result, formatWordHex(unsigned_result))
def decorator(minutes=1, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *minutes*.

    When *minutes* is positive, a ``SIGALRM`` handler is armed for
    ``int(minutes * 60)`` seconds; on expiry it records a test note and
    raises ``TimeoutException(error_message)``. When *minutes* is not
    positive the call runs without a time limit.

    :param minutes: time limit in minutes (may be fractional).
    :param error_message: message used for the note and the exception.
    :return: a decorator; the decorated function returns whatever the
        wrapped function returns.
    """
    def dec(func):
        def _handle_timeout(signum, frame):
            msg = 'Timeout Error: %s' % (error_message)
            add_test_note(msg)
            raise TimeoutException(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            armed = minutes > 0
            previous_handler = None
            if armed:
                # Keep the replaced handler so it can be put back; leaving
                # ours installed would leak into unrelated code.
                previous_handler = signal.signal(signal.SIGALRM,
                                                 _handle_timeout)
                signal.alarm(int(minutes * 60))
            try:
                return func(*args, **kwargs)
            finally:
                # As before, the alarm is cancelled even when not armed.
                signal.alarm(0)
                if armed and previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return dec
def __init__(self, cloexec=True, nonblock=True):
    """Open an inotify descriptor and prepare the read buffer.

    :param cloexec: include ``self.CLOEXEC`` in the init flags.
    :param nonblock: include ``self.NONBLOCK`` in the init flags.
    :raises INotifyError: if the init call returns -1; the message is the
        strerror text for the current ctypes errno.
    """
    (self._init1, self._add_watch,
     self._rm_watch, self._read) = load_inotify()

    flags = 0
    for wanted, attribute in ((cloexec, 'CLOEXEC'), (nonblock, 'NONBLOCK')):
        if wanted:
            flags |= getattr(self, attribute)

    self._inotify_fd = self._init1(flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
def read(self, get_name=True):
    """Drain pending inotify data and dispatch each event.

    Reads via ``self._read`` until it returns 0 or fails with EAGAIN
    (EINTR is retried), then walks the bytes record by record using the
    ``self.hdr`` struct and calls ``self.process_event`` for each one.

    :param get_name: when true, pass the NUL-stripped name bytes of each
        record; otherwise pass ``None``.
    :raises OSError: for any read error other than EAGAIN or EINTR.
    """
    chunks = []
    while True:
        count = self._read(self._inotify_fd, self._buf, len(self._buf))
        if count == 0:
            break
        if count < 0:
            err = ctypes.get_errno()
            if err == errno.EINTR:
                continue  # Interrupted, try again
            if err == errno.EAGAIN:
                break  # No more data
            raise OSError(err, self.os.strerror(err))
        chunks.append(self._buf.raw[:count])

    data = b''.join(chunks)
    total = len(data)
    offset = 0
    while total - offset >= self.hdr.size:
        # NOTE(review): mask and cookie are unpacked but not forwarded;
        # the call below may have been truncated -- confirm against
        # process_event's signature.
        wd, mask, cookie, name_len = self.hdr.unpack_from(data, offset)
        offset += self.hdr.size
        if get_name:
            name = data[offset:offset + name_len].rstrip(b'\0')
        else:
            name = None
        offset += name_len
        self.process_event(wd, name)
def timeout(seconds=10, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
# special test case for running ssh commands at module level
def timeout(seconds=10, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def chdir(self, path):
    """
    Set the emulated "current directory" of this SFTP session. SFTP has
    no real notion of a working directory, so the value is kept on this
    object and later operations are resolved relative to it. Passing
    C{None} turns the emulation off again.

    Errors raised by C{self.stat} are not caught here and propagate to
    the caller.

    @param path: new current working directory
    @type path: str
    @raise SFTPError: with C{errno.ENOTDIR} if C{path} is not a directory
    @since: 1.4
    """
    if path is not None:
        mode = self.stat(path).st_mode
        if not stat.S_ISDIR(mode):
            raise SFTPError(errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path))
        self._cwd = self.normalize(path).encode('utf-8')
    else:
        self._cwd = None
def timeout(seconds=10, **kwargs)
finally:
signal.alarm(0)
return result
return wraps(func)(wrapper)
return decorator
def ccid_xfr_block(self, data, timeout=0.1):
    """Encapsulate host command *data* into an PC/SC Escape command to
    send to the device and extract the chip response if received
    within *timeout* seconds.

    :param data: command bytes appended after the 10-byte header.
    :param timeout: read timeout in seconds (passed on in milliseconds).
    :return: the response payload, i.e. everything after the 10-byte
        response header.
    :raises IOError: with ``errno.EIO`` if the response is missing, too
        short, has the wrong message type, or its length field does not
        match the received size.
    """
    # "<BI5B" describes 7 values (type, length, five single bytes); the
    # previous call supplied only 4 and raised struct.error.
    frame = struct.pack("<BI5B", 0x6F, len(data), 0, 0, 0, 0, 0) + data
    self.transport.write(bytearray(frame))
    frame = self.transport.read(int(timeout * 1000))
    if not frame or len(frame) < 10:
        log.error("insufficient data for decoding ccid response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if frame[0] != 0x80:
        log.error("expected a RDR_to_PC_DataBlock")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    # unpack_from reads the length field in place; the Python-2-only
    # buffer() builtin is not needed.
    if len(frame) != 10 + struct.unpack_from("<I", frame, 1)[0]:
        log.error("RDR_to_PC_DataBlock length mismatch")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    return frame[10:]
def command(self, cmd_code, cmd_data, timeout):
    """Send a host command and return the chip response.

    The command is framed as ``D4 <cmd_code> <cmd_data>``, prefixed with
    ``FF 00 <length>`` and sent through ``ccid_xfr_block``.

    :param cmd_code: command code; also the key into ``self.CMD`` used
        for the log line.
    :param cmd_data: command payload bytes.
    :param timeout: timeout handed to ``ccid_xfr_block``.
    :return: the response without its two leading and two trailing bytes.
    :raises IOError: with ``errno.EIO`` if the response is too short,
        does not start with ``D5 <cmd_code + 1>``, or does not end with
        the status bytes ``90 00``.
    """
    # hexlify() returns bytes on Python 3, so it cannot be concatenated
    # with str; decode it and let logging do the formatting.
    log.log(logging.DEBUG-1, "%s %s", self.CMD[cmd_code],
            hexlify(bytearray(cmd_data)).decode("ascii"))
    frame = bytearray([0xD4, cmd_code]) + bytearray(cmd_data)
    frame = bytearray([0xFF, 0x00, len(frame)]) + frame
    frame = self.ccid_xfr_block(frame, timeout)
    if not frame or len(frame) < 4:
        log.error("insufficient data for decoding chip response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if not (frame[0] == 0xD5 and frame[1] == cmd_code + 1):
        log.error("received invalid chip response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if not (frame[-2] == 0x90 and frame[-1] == 0x00):
        log.error("received pseudo apdu with error status")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    return frame[2:-2]
def read(self, timeout):
    """Read one frame from the serial port.

    Returns ``None`` when no tty is attached. Otherwise reads a 6-byte
    prefix; if it equals ``00 00 FF 00 FF 00`` it is returned as is,
    else the length byte (or the two-byte length that follows an ``FF``
    marker) determines how many more bytes are read.

    :param timeout: timeout in milliseconds; the tty timeout is set to
        at least 0.05 seconds.
    :return: the received frame as a ``bytearray``.
    :raises IOError: with ``errno.ETIMEDOUT`` if nothing was received.
    """
    if self.tty is None:
        return None

    self.tty.timeout = max(timeout/1E3, 0.05)
    frame = bytearray(self.tty.read(6))
    # bytearray() never yields None, so an emptiness check is sufficient.
    if len(frame) == 0:
        raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
    if frame.startswith(b"\x00\x00\xff\x00\xff\x00"):
        # str.encode("hex") exists only on Python 2; hexlify works on both.
        log.log(logging.DEBUG-1, "<<< %s", hexlify(frame).decode("ascii"))
        return frame
    LEN = frame[3]
    if LEN == 0xFF:
        frame += self.tty.read(3)
        LEN = frame[5] << 8 | frame[6]
    frame += self.tty.read(LEN + 1)
    log.log(logging.DEBUG-1, hexlify(frame).decode("ascii"))
    return frame
def read(self, timeout=0):
    """Read one frame from the USB bulk-in endpoint.

    Returns ``None`` when no input endpoint is set. libusb errors are
    translated to ``IOError``: timeout to ETIMEDOUT, missing device to
    ENODEV, anything else (and an empty read) to EIO.

    :param timeout: timeout value handed to ``bulkRead``.
    :return: the received frame as a ``bytearray``.
    """
    if self.usb_inp is None:
        return None

    try:
        endpoint = self.usb_inp.getAddress()
        received = self.usb_dev.bulkRead(endpoint, 300, timeout)
    except libusb.USBErrorTimeout:
        raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
    except libusb.USBError as error:
        log.error("%r", error)
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    if len(received) == 0:
        log.error("bulk read returned zero data")
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    frame = bytearray(received)
    log.log(logging.DEBUG-1, hexlify(frame))
    return frame
def __init__(self, chipset, logger):
    """Store the chipset and logger, then verify the chipset link.

    Runs ``chipset.diagnose('line')``; a ``Chipset.Error`` counts as a
    failed test.

    :param chipset: object providing ``diagnose``.
    :param logger: logger used to report a failed test.
    :raises IOError: with ``errno.EIO`` if the diagnose result is
        ``False``.
    """
    self.chipset = chipset
    self.log = logger

    try:
        link_ok = self.chipset.diagnose('line')
    except Chipset.Error:
        link_ok = False

    # Only an explicit False is a failure; other falsy results pass.
    if link_ok is False:
        self.log.error("chipset communication test failed")
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    # Disabled debugging aid kept from the original:
    # for line in self._print_ciu_register_page(0, 1, 2, 3):
    #     self.log.debug(line)
    # for addr in range(0, 0x03FF, 16):
    #     xram = self.chipset.read_register(*range(addr, addr+16))
    #     xram = ' '.join(["%02X" % x for x in xram])
    #     self.log.debug("0x%04X: %s", addr, xram)
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code, such as the
    ones found in socket.error.

    Attempts direct lookup against the win32 API (via ctypes and then
    pywin32 if available), then in the error table in the socket module,
    then finally defaults to C{os.strerror}. In terms of attributes the
    order is C{self.winError}, C{self.formatMessage}, C{self.errorTab}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    if self.winError is not None:
        # The message is the second element of the winError result.
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)

    table = self.errorTab
    message = None if table is None else table.get(errorcode)
    return os.strerror(errorcode) if message is None else message
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return self.winError(errorcode)[1]
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
async def _serve_one_listener(listener, handler_nursery, handler):
    """Accept connections from *listener* forever, spawning a handler each.

    Every accepted stream is handed to ``_run_handler`` via
    ``handler_nursery.start_soon``. An ``OSError`` whose errno is in
    ``ACCEPT_CAPACITY_ERRNOS`` is logged and retried after
    ``SLEEP_TIME`` seconds; any other ``OSError`` propagates.

    Declared ``async`` because the body uses ``async with`` and
    ``await``, which are a syntax error inside a plain ``def``.
    """
    async with listener:
        while True:
            try:
                stream = await listener.accept()
            except OSError as exc:
                if exc.errno not in ACCEPT_CAPACITY_ERRNOS:
                    raise
                LOGGER.error(
                    "accept returned %s (%s); retrying in %s seconds",
                    errno.errorcode[exc.errno],
                    os.strerror(exc.errno),
                    SLEEP_TIME,
                    exc_info=True
                )
                await trio.sleep(SLEEP_TIME)
            else:
                handler_nursery.start_soon(_run_handler, stream, handler)
def make_filesystem(blk_device, blk_device])
def formatError(self, errorcode):
    """
    Return the message string for a Windows error code.

    Tries C{self.winError} (whose whole result is converted with
    C{str}), then C{self.formatMessage}, then the C{self.errorTab}
    table, then finally defaulting to C{os.strerror}.

    @param errorcode: the Windows error code
    @type errorcode: C{int}
    @return: The error message string
    @rtype: C{str}
    """
    # The signature and docstring opening were truncated in this copy;
    # the ``errorcode`` parameter is the name the body uses throughout.
    if self.winError is not None:
        return str(self.winError(errorcode))
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)