Python io 模块,UnsupportedOperation() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用io.UnsupportedOperation()。
def super_len(o):
if hasattr(o, '__len__'):
return len(o)
if hasattr(o, 'len'):
return o.len
if hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
return os.fstat(fileno).st_size
if hasattr(o, 'getvalue'):
# e.g. BytesIO,cStringIO.StringI
return len(o.getvalue())
def chunks(self, chunk_size=None):
"""
Read the file and yield chunks of ``chunk_size`` bytes (defaults to
``UploadedFile.DEFAULT_CHUNK_SIZE``).
"""
if not chunk_size:
chunk_size = self.DEFAULT_CHUNK_SIZE
try:
self.seek(0)
except (AttributeError, UnsupportedOperation):
pass
while True:
data = self.read(chunk_size)
if not data:
break
yield data
def chunks(self, UnsupportedOperation):
pass
while True:
data = self.read(chunk_size)
if not data:
break
yield data
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def _is_daemon():
# The process group for a foreground process will match the
# process group of the controlling terminal. If those values do
# not match,or ioctl() fails on the stdout file handle,we assume
# the process is running in the background as a daemon.
# http://www.gnu.org/software/bash/manual/bashref.html#Job-Control-Basics
try:
is_daemon = os.getpgrp() != os.tcgetpgrp(sys.stdout.fileno())
except io.UnsupportedOperation:
# Could not get the fileno for stdout,so we must be a daemon.
is_daemon = True
except OSError as err:
if err.errno == errno.ENottY:
# Assume we are a daemon because there is no terminal.
is_daemon = True
else:
raise
return is_daemon
def chunks(self, UnsupportedOperation):
pass
while True:
data = self.read(chunk_size)
if not data:
break
yield data
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def chunks(self, UnsupportedOperation):
pass
while True:
data = self.read(chunk_size)
if not data:
break
yield data
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def super_len(o):
if hasattr(o,cStringIO.StringI
return len(o.getvalue())
def total_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def total_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def __init__(self, stdin=None):
self.stdin = stdin or sys.stdin
# The input object should be a TTY.
assert self.stdin.isatty()
# Test whether the given input object has a file descriptor.
# (Idle reports stdin to be a TTY,but fileno() is not implemented.)
try:
# This should not raise,but can return 0.
self.stdin.fileno()
except io.UnsupportedOperation:
if 'idlelib.run' in sys.modules:
raise io.UnsupportedOperation(
'Stdin is not a terminal. Running from Idle is not supported.')
else:
raise io.UnsupportedOperation('Stdin is not a terminal.')
def __init__(self,but can return 0.
self.stdin.fileno()
except io.UnsupportedOperation:
if 'idlelib.run' in sys.modules:
raise io.UnsupportedOperation(
'Stdin is not a terminal. Running from Idle is not supported.')
else:
raise io.UnsupportedOperation('Stdin is not a terminal.')
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def super_len(o):
if hasattr(o,cStringIO.StringI
return len(o.getvalue())
def super_len(o):
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
def chunks(self, UnsupportedOperation):
pass
while True:
data = self.read(chunk_size)
if not data:
break
yield data
def super_len(o):
if hasattr(o,cStringIO.StringI
return len(o.getvalue())
def deal_with_empty_file_stream(data):
"""???????????0???????????"""
if hasattr(data, 'fileno') and hasattr(data, 'tell'):
try:
fileno = data.fileno()
total_length = os.fstat(fileno).st_size
current_position = data.tell()
if total_length - current_position == 0:
return ""
except io.UnsupportedOperation:
return ""
return data
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, '__len__'):
total_length = len(o)
elif hasattr(o, 'len'):
total_length = o.len
elif hasattr(o,cStringIO.StringIO
total_length = len(o.getvalue())
elif hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
total_length = os.fstat(fileno).st_size
# Having used fstat to determine the file length,we need to
# confirm that this file was opened up in binary mode.
if 'b' not in o.mode:
warnings.warn((
"Requests has determined the content-length for this "
"request using the binary size of the file: however,the "
"file has been opened in text mode (i.e. without the 'b' "
"flag in the mode). This may lead to an incorrect "
"content-length. In Requests 3.0,support will be removed "
"for files in text mode."),
FileModeWarning
)
if hasattr(o, 'tell'):
current_position = o.tell()
return max(0, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def seek(self, position: int, whence: int = 0) -> None:
"""
Seek the file at the given position.
.. note:: The :exc:`io.UnsupportedOperation` will be raised if the underlying file-object is not
:meth:`.seekable`.
:param position: the position to seek on.
:param whence: optional whence argument.
"""
raise NotImplementedError('Seek operation is not supported by this object: %r' % self) # pragma: no cover
def read(self, size=None):
raise io.UnsupportedOperation('read')
def readinto(self, buf):
raise io.UnsupportedOperation('readinto')
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def total_len(o):
# Stolen from requests_toolbelt and modified
if hasattr(o,cStringIO.StringIO
return len(o.getvalue())
try:
current_pos = o.tell()
length = o.seek(0, 2)
o.seek(current_pos, 0)
return length
except IOError:
pass
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def test_writing_to_stdout_is_diverted_if_broken(self, sys):
def sample(**kwargs):
assert kwargs['stdin'] is None
assert kwargs['stdout'] is not None
assert kwargs['stdout'] is not sys.stdout
sys.stdout.fileno.side_effect = io.UnsupportedOperation
utils.wrap_subprocess_call(sample)(stdin=None, stdout=None)
def test_arbitrary_outputs_are_not_replaced_even_if_stdout_is_broken(self, sys):
output = io.StringIO()
def sample(**kwargs):
assert kwargs['stdin'] is None
assert kwargs['stdout'] is output
sys.stdout.fileno.side_effect = io.UnsupportedOperation
utils.wrap_subprocess_call(sample)(stdin=None, stdout=output)
def test_writing_to_broken_stderr_is_output_after_returning(self, sys):
def sample(**kwargs):
kwargs['stderr'].write('Output')
# The output isn't written until we return
sys.stderr.write.assert_not_called()
sys.stderr.fileno.side_effect = io.UnsupportedOperation
utils.wrap_subprocess_call(sample)(stderr=None)
sys.stderr.write.assert_called_once_with('Output')
def wrap_subprocess_call(func, wrap_stdout=True):
@functools.wraps(func)
def wrapper(*popenargs, **kwargs):
out = kwargs.get('stdout', None)
err = kwargs.get('stderr', None)
replay_out = False
replay_err = False
if out is None and wrap_stdout:
try:
sys.stdout.fileno()
except io.UnsupportedOperation:
kwargs['stdout'] = tempfile.NamedTemporaryFile()
replay_out = True
if err is None:
try:
sys.stderr.fileno()
except io.UnsupportedOperation:
kwargs['stderr'] = tempfile.NamedTemporaryFile()
replay_err = True
try:
return func(*popenargs, **kwargs)
finally:
if replay_out:
kwargs['stdout'].seek(0)
sys.stdout.write(kwargs['stdout'].read())
if replay_err:
kwargs['stderr'].seek(0)
sys.stderr.write(kwargs['stderr'].read())
return wrapper
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def open_stream(self, file_stream, **keywords):
"""open ods file stream"""
if not hasattr(file_stream, 'seek'):
# python 2
# Hei zipfile in odfpy would do a seek
# but stream from urlib cannot do seek
file_stream = BytesIO(file_stream.read())
try:
file_stream.seek(0)
except UnsupportedOperation:
# python 3
file_stream = BytesIO(file_stream.read())
BookReader.open_stream(self, **keywords)
self._load_from_memory()
def has_fileno(obj):
if not hasattr(obj, "fileno"):
return False
# check BytesIO case and maybe others
try:
obj.fileno()
except (AttributeError, IOError, io.UnsupportedOperation):
return False
return True
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, total_length - current_position)
def __read(self, n):
if not self._readable:
raise UnsupportedOperation('read')
while True:
try:
return _read(self._fileno, n)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._read_event)
def write(self, b):
if not self._writable:
raise UnsupportedOperation('write')
while True:
try:
return _write(self._fileno, b)
except (IOError, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._write_event)
def __read(self, OSError) as ex:
if ex.args[0] not in ignored_errors:
raise
self.hub.wait(self._read_event)