Python codecs 模块,StreamReaderWriter() 实例源码
我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用codecs.StreamReaderWriter()。
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr,
codecs.getreader('utf8'),
codecs.getwriter('utf8'),
'replace')
_cpplint_state.ResetErrorCounts()
for filename in filenames:
ProcessFile(filename, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def test_all(self):
api = (
"encode", "decode",
"register", "CodecInfo", "Codec", "IncrementalEncoder",
"IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
"getencoder", "getdecoder", "getincrementalencoder",
"getincrementaldecoder", "getreader", "getwriter",
"register_error", "lookup_error",
"strict_errors", "replace_errors", "ignore_errors",
"xmlcharrefreplace_errors", "backslashreplace_errors",
"open", "EncodedFile",
"iterencode", "iterdecode",
"BOM", "BOM_BE", "BOM_LE",
"BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
"BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
"BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented
"StreamReaderWriter", "StreamRecoder",
)
self.assertEqual(sorted(api), sorted(codecs.__all__))
for api in codecs.__all__:
getattr(codecs, api)
def read_with_encoding(self, filename, document, codec_info, encoding):
f = None
try:
f = codecs.StreamReaderWriter(open(filename, 'rb'), codec_info[2],
codec_info[3], 'strict')
lines = f.readlines()
lines = dedent_lines(lines, self.options.get('dedent'))
return lines
except (IOError, OSError):
return [document.reporter.warning(
'Include file %r not found or reading it Failed' % filename,
line=self.lineno)]
except UnicodeError:
return [document.reporter.warning(
'Encoding %r used for reading included file %r seems to '
'be wrong,try giving an :encoding: option' %
(encoding, filename))]
finally:
if f is not None:
f.close()
def __init__(
self,
file_path: str,
mode: Optional[str] = 'a',
encoding: Optional[str] = 'utf8',
errors: Optional[str] = 'strict',
buffering: Optional[int] = 1,
name: Optional[str] = None,
level: Optional[LogLevel] = None
):
"""Instantiates a new ``FileHandler``
:param file_path: the path (full or relative) to the log file
:param mode: the file mode
:param encoding: the file encoding
:param errors: how should errors be handled
:param buffering: should the line be buffered
:param name: the name of the handler
:param level: the minimum level of verbosity/priority of the messages this will log
"""
self.fh: StreamReaderWriter = codecs.open(
file_path, mode=mode, encoding=encoding, errors=errors,
buffering=buffering)
super().__init__(name=name, level=level)
self.encoding: str = encoding
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def test_all(self):
api = (
"encode",
)
self.assertCountEqual(api, codecs.__all__)
for api in codecs.__all__:
getattr(codecs, api)
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr, _cpplint_state.verbose_level)
_cpplint_state.PrintErrorCounts()
sys.exit(_cpplint_state.error_count > 0)
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, info.streamreader,
info.streamwriter, 'strict') as srw:
self.assertEqual(srw.read(), "\xfc")
def test_streamreaderwriter(self):
f = StringIO.StringIO("\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, u"\xfc")
def test_streamreaderwriter(self):
f = StringIO.StringIO("\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, u"\xfc")
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, "\xfc")
def read_with_encoding(self, encoding):
global cache
f = None
try:
if not self.arguments[0] in cache:
f = codecs.StreamReaderWriter(urllib2.urlopen(self.arguments[0]),
codec_info[3], 'strict')
lines = f.readlines()
cache[self.arguments[0]] = lines
else:
lines = cache[self.arguments[0]]
lines = dedent_lines(lines, OSError, urllib2.URLError):
return [document.reporter.warning(
'Include file %r not found or reading it Failed' % self.arguments[0], self.arguments[0]))]
finally:
if f is not None:
f.close()
def test_streamreaderwriter(self):
f = StringIO.StringIO("\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, u"\xfc")
def test_open(self):
self.addCleanup(support.unlink, support.TESTFN)
for mode in ('w', 'r', 'r+', 'w+', 'a', 'a+'):
with self.subTest(mode), \
codecs.open(support.TESTFN, mode, 'ascii') as file:
self.assertisinstance(file, codecs.StreamReaderWriter)
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, "\xfc")
def _gettextwriter(out, encoding):
if out is None:
import sys
return sys.stdout
if isinstance(out, io.TextIOBase):
# use a text writer as is
return out
if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
# use a codecs stream writer as is
return out
# wrap a binary writer with TextIOWrapper
if isinstance(out, io.RawIOBase):
# Keep the original file open when the TextIOWrapper is
# destroyed
class _wrapper:
__class__ = out.__class__
def __getattr__(self, name):
return getattr(out, name)
buffer = _wrapper()
buffer.close = lambda: None
else:
# This is to handle passed objects that aren't in the
# IOBase hierarchy,but just have a write method
buffer = io.BufferedioBase()
buffer.writable = lambda: True
buffer.write = out.write
try:
# TextIOWrapper uses this methods to determine
# if BOM (for UTF-16,etc) should be added
buffer.seekable = out.seekable
buffer.tell = out.tell
except AttributeError:
pass
return io.TextIOWrapper(buffer,
errors='xmlcharrefreplace',
newline='\n',
write_through=True)
def test_streamreaderwriter(self):
f = StringIO.StringIO("\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, u"\xfc")
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + str(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def test_streamreaderwriter(self):
f = io.BytesIO(b"\xc3\xbc")
info = codecs.lookup("utf-8")
with codecs.StreamReaderWriter(f, "\xfc")
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def readline(self):
"Read a line from elyxer.file"
self.current = self.file.readline()
if not isinstance(self.file, codecs.StreamReaderWriter):
self.current = self.current.decode('utf-8')
if len(self.current) == 0:
self.depleted = True
self.current = self.current.rstrip('\n\r')
self.linenumber += 1
self.mustread = False
Trace.prefix = 'Line ' + unicode(self.linenumber) + ': '
if self.linenumber % 1000 == 0:
Trace.message('Parsing')
def main():
filenames = ParseArguments(sys.argv[1:])
# Change stderr to write with replacement characters so we don't die
# if we try to print something containing non-ASCII characters.
sys.stderr = codecs.StreamReaderWriter(sys.stderr,
'replace')
_cpplint_state.ResetErrorCounts()
f = open(_output_file, 'w')
title = 'path,dir,'
for i in range(1, 16):
title += 'PFM%d' % i + ','
for i in range(1, 21):
title += 'PRM%d' % i + ', 26):
title += 'PLM%d' % i + ','
f.write(title)
for filename in filenames:
#added by Robert
global _current_filenmae
_current_filenmae = filename
print 'current file name: ' + filename
global _stat
_stat = _Stat()
#added by Robert End
ProcessFile(filename, _cpplint_state.verbose_level, [ExtraCheckLine])
#added by Roebrt
global _output_file
_stat.WriteFile(filename, f)
#added by Robert end
f.close()
_cpplint_state.PrintErrorCounts()
_stat.PrintValidFiles() #added by Robert
sys.exit(_cpplint_state.error_count > 0)