Python codecs 模块,getincrementalencoder() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用codecs.getincrementalencoder()。
def test_stateful(self):
# jisx0213 encoder is stateful for a few codepoints. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertEqual(encoder.encode('', b'\xa9\xdc')
self.assertEqual(encoder.encode('', b'')
def test_all(self):
api = (
"encode", "decode",
"register", "CodecInfo", "Codec", "IncrementalEncoder",
"IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
"getencoder", "getdecoder", "getincrementalencoder",
"getincrementaldecoder", "getreader", "getwriter",
"register_error", "lookup_error",
"strict_errors", "replace_errors", "ignore_errors",
"xmlcharrefreplace_errors", "backslashreplace_errors",
"open", "EncodedFile",
"iterencode", "iterdecode",
"BOM", "BOM_BE", "BOM_LE",
"BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
"BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
"BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented
"StreamReaderWriter", "StreamRecoder",
)
self.assertEqual(sorted(api), sorted(codecs.__all__))
for api in codecs.__all__:
getattr(codecs, api)
def test_stateful(self):
# jisx0213 encoder is stateful for a few code points. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode(u'\u00e6\u0300'), '\xab\xc4')
self.assertEqual(encoder.encode(u'\u00e6'), '')
self.assertEqual(encoder.encode(u'\u0300'), '\xab\xc4')
self.assertEqual(encoder.encode(u'\u00e6', '\xa9\xdc')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode(u'\u0300'), '\xab\xdc')
self.assertEqual(encoder.encode(u'\u00e6'), '')
self.assertEqual(encoder.encode('', '\xa9\xdc')
self.assertEqual(encoder.encode('', '')
def test_stateful(self):
# jisx0213 encoder is stateful for a few code points. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode(u'\u00e6\u0300'), '')
def test_stateful(self):
# jisx0213 encoder is stateful for a few codepoints. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'')
def test_stateful(self):
# jisx0213 encoder is stateful for a few codepoints. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode(u'\u00e6\u0300'), '')
def test_all(self):
api = (
"encode",
)
self.assertCountEqual(api, codecs.__all__)
for api in codecs.__all__:
getattr(codecs, api)
def test_stateful(self):
# jisx0213 encoder is stateful for a few codepoints. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode(u'\u00e6\u0300'), '')
def test_stateful(self):
# jisx0213 encoder is stateful for a few codepoints. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'')
def __init__(self, f, dialect=csv.excel, encoding="utf-8-sig", **kwds):
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def _get_encoder(self):
make_encoder = codecs.getincrementalencoder(self._encoding)
self._encoder = make_encoder(self._errors)
return self._encoder
def __init__(self, encoding="utf-8", **kwds):
"""Init method."""
# Redirect output to a queue
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def _get_encoder(self):
make_encoder = codecs.getincrementalencoder(self._encoding)
self._encoder = make_encoder(self._errors)
return self._encoder
def __init__(self, encoding, **kwds):
# Redirect output to a queue
import cStringIO
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, **kwds)
self.stream = f
self.encoding = encoding
self.encoder = codecs.getincrementalencoder(self.encoding)()
def set_tx_encoding(self, errors='replace'):
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def __init__(self, **kwargs):
"""
Instantiates the UnicodeWriter instance
:param f: File like object to write CSV data to
:param dialect: The dialect for the CSV
:param encoding: The CSV encoding
:param kwargs: Keyword args
"""
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, **kwargs)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def set_tx_encoding(self, errors='replace'):
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def __init__(self, fieldnames, newfile=True, **kwds):
# Redirect output to a queue
self.queue = cStringIO.StringIO()
self.writer = csv.DictWriter(self.queue, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
if newfile:
self.writebom()
def __init__(self, **kwds):
# Redirect output to a queue
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def set_tx_encoding(self, errors='replace'):
"""set encoding for transmitted data"""
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def check_state_handling_encode(self, u, s):
for i in range(len(u)+1):
d = codecs.getincrementalencoder(encoding)()
part1 = d.encode(u[:i])
state = d.getstate()
d = codecs.getincrementalencoder(encoding)()
d.setstate(state)
part2 = d.encode(u[i:], True)
self.assertEqual(s, part1+part2)
def test_incremental_encode(self):
self.assertEqual(
b"".join(codecs.iterencode("python.org", "idna")),
b"python.org"
)
self.assertEqual(
b"".join(codecs.iterencode("python.org.",
b"python.org."
)
self.assertEqual(
b"".join(codecs.iterencode("pyth\xf6n.org.",
b"xn--pythn-mua.org."
)
self.assertEqual(
b"".join(codecs.iterencode("pyth\xf6n.org.",
b"xn--pythn-mua.org."
)
encoder = codecs.getincrementalencoder("idna")()
self.assertEqual(encoder.encode("\xe4x"), b"")
self.assertEqual(encoder.encode("ample.org"), b"xn--xample-9ta.")
self.assertEqual(encoder.encode("", b"org")
encoder.reset()
self.assertEqual(encoder.encode("\xe4x"), b"")
self.assertEqual(encoder.encode("ample.org."), b"xn--xample-9ta.org.")
self.assertEqual(encoder.encode("", b"")
def check_newline_decoding(self, decoder, encoding):
result = []
if encoding is not None:
encoder = codecs.getincrementalencoder(encoding)()
def _decode_bytewise(s):
# Decode one byte at a time
for b in encoder.encode(s):
result.append(decoder.decode(bytes([b])))
else:
encoder = None
def _decode_bytewise(s):
# Decode one char at a time
for c in s:
result.append(decoder.decode(c))
self.assertEqual(decoder.newlines, None)
_decode_bytewise("abc\n\r")
self.assertEqual(decoder.newlines, '\n')
_decode_bytewise("\nabc")
self.assertEqual(decoder.newlines, ('\n', '\r\n'))
_decode_bytewise("abc\r")
self.assertEqual(decoder.newlines, '\r\n'))
_decode_bytewise("abc")
self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
_decode_bytewise("abc\r")
self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
decoder.reset()
input = "abc"
if encoder is not None:
encoder.reset()
input = encoder.encode(input)
self.assertEqual(decoder.decode(input), "abc")
self.assertEqual(decoder.newlines, None)
def test_stateless(self):
# cp949 encoder isn't stateful at all.
encoder = codecs.getincrementalencoder('cp949')()
self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('\u2606\u223c\u2606',
b'\xa1\xd9\xa1\xad\xa1\xd9')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('', False), b'')
self.assertEqual(encoder.reset(), None)
def test_issue5640(self):
encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
self.assertEqual(encoder.encode('\xff'), b'\\xff')
self.assertEqual(encoder.encode('\n'), b'\n')
def test_incrementalencoder(self):
encoder = codecs.getincrementalencoder(self.encoding)()
output = b''.join(
encoder.encode(char)
for char in self.text)
self.assertEqual(output, self.expected)
def test_incrementalencoder_final(self):
encoder = codecs.getincrementalencoder(self.encoding)()
last_index = len(self.text) - 1
output = b''.join(
encoder.encode(char, index == last_index)
for index, char in enumerate(self.text))
self.assertEqual(output, self.expected_reset)
def _get_encoder(self):
make_encoder = codecs.getincrementalencoder(self._encoding)
self._encoder = make_encoder(self._errors)
return self._encoder
def set_tx_encoding(self, errors='replace'):
"""set encoding for transmitted data"""
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def set_tx_encoding(self, errors='replace'):
self.output_encoding = encoding
self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)
def __init__(self, errors='replace', **kwds):
# Redirect output to a queue
self.queue = cStringIO.StringIO()
self.writer = csv.writer(self.queue, **kwds)
self.stream = f
encoder_cls = codecs.getincrementalencoder(encoding)
self.encoder = encoder_cls(errors=errors)
def test_incremental_encode(self):
self.assertEqual(
"".join(codecs.iterencode(u"python.org",
"python.org"
)
self.assertEqual(
"".join(codecs.iterencode(u"python.org.",
"python.org."
)
self.assertEqual(
"".join(codecs.iterencode(u"pyth\xf6n.org.",
"xn--pythn-mua.org."
)
self.assertEqual(
"".join(codecs.iterencode(u"pyth\xf6n.org.",
"xn--pythn-mua.org."
)
encoder = codecs.getincrementalencoder("idna")()
self.assertEqual(encoder.encode(u"\xe4x"), "")
self.assertEqual(encoder.encode(u"ample.org"), "xn--xample-9ta.")
self.assertEqual(encoder.encode(u"", "org")
encoder.reset()
self.assertEqual(encoder.encode(u"\xe4x"), "")
self.assertEqual(encoder.encode(u"ample.org."), "xn--xample-9ta.org.")
self.assertEqual(encoder.encode(u"", "")
def check_newline_decoding(self, encoding):
result = []
if encoding is not None:
encoder = codecs.getincrementalencoder(encoding)()
def _decode_bytewise(s):
# Decode one byte at a time
for b in encoder.encode(s):
result.append(decoder.decode(b))
else:
encoder = None
def _decode_bytewise(s):
# Decode one char at a time
for c in s:
result.append(decoder.decode(c))
self.assertEqual(decoder.newlines, None)
def test_stateless(self):
# cp949 encoder isn't stateful at all.
encoder = codecs.getincrementalencoder('cp949')()
self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'),
'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode(u'\u2606\u223c\u2606',
'\xa1\xd9\xa1\xad\xa1\xd9')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode(u'', '')
self.assertEqual(encoder.encode(u'', '')
self.assertEqual(encoder.reset(), None)
def test_issue5640(self):
encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
self.assertEqual(encoder.encode(u'\xff'), b'\\xff')
self.assertEqual(encoder.encode(u'\n'), b'\n')
def test_incrementalencoder(self):
encoder = codecs.getincrementalencoder(self.encoding)()
output = b''.join(
encoder.encode(char)
for char in self.text)
self.assertEqual(output, self.expected)
def test_incrementalencoder_final(self):
encoder = codecs.getincrementalencoder(self.encoding)()
last_index = len(self.text) - 1
output = b''.join(
encoder.encode(char, self.expected_reset)
def _get_encoder(self):
make_encoder = codecs.getincrementalencoder(self._encoding)
self._encoder = make_encoder(self._errors)
return self._encoder
def test_incremental_encode(self):
self.assertEqual(
"".join(codecs.iterencode(u"python.org", "")
def test_stateless(self):
# cp949 encoder isn't stateful at all.
encoder = codecs.getincrementalencoder('cp949')()
self.assertEqual(encoder.encode(u'\ud30c\uc774\uc36c \ub9c8\uc744'), None)
def test_issue5640(self):
encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
self.assertEqual(encoder.encode(u'\xff'), b'\n')
def test_incrementalencoder(self):
encoder = codecs.getincrementalencoder(self.encoding)()
output = b''.join(
encoder.encode(char)
for char in self.text)
self.assertEqual(output, self.expected)
def test_incrementalencoder_final(self):
encoder = codecs.getincrementalencoder(self.encoding)()
last_index = len(self.text) - 1
output = b''.join(
encoder.encode(char, self.expected_reset)
def _get_encoder(self):
make_encoder = codecs.getincrementalencoder(self._encoding)
self._encoder = make_encoder(self._errors)
return self._encoder
def __init__(self, **kwds):
# Redirect output to a queue
self.queue = io.StringIO()
self.writer = csv.writer(self.queue, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def __init__(self, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()