Python codecs 模块，getencoder() 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 codecs.getencoder()。
def __init__(self, stream, fieldnames, encoding='utf-8', **kwds):
    """Initializer.

    Args:
      stream: Stream to write to.
      fieldnames: Fieldnames to pass to the DictWriter.
      encoding: Desired encoding.
      kwds: Additional arguments to pass to the DictWriter.
    """
    writer = codecs.getwriter(encoding)
    # The chain is kept as a short-circuiting ``or`` so that a later
    # ``encodings.<codec>`` attribute is only touched when the earlier
    # comparisons did not already match.
    if (writer is encodings.utf_8.StreamWriter or
            writer is encodings.ascii.StreamWriter or
            writer is encodings.latin_1.StreamWriter or
            writer is encodings.cp1252.StreamWriter):
        # The csv writer is attached directly to the caller's stream.
        self.no_recoding = True
        self.encoder = codecs.getencoder(encoding)
        # ``fieldnames`` is a required DictWriter argument; it must be
        # forwarded explicitly because it is not part of ``kwds``.
        self.writer = csv.DictWriter(stream, fieldnames, **kwds)
    else:
        # Rows are written as UTF-8 into an in-memory queue, and the target
        # stream is wrapped in the requested codec's StreamWriter.
        self.no_recoding = False
        self.encoder = codecs.getencoder('utf-8')
        self.queue = cStringIO.StringIO()
        self.writer = csv.DictWriter(self.queue, fieldnames, **kwds)
        self.stream = writer(stream)
def test_all(self):
api = (
"encode", "decode",
"register", "CodecInfo", "Codec", "IncrementalEncoder",
"IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
"getencoder", "getdecoder", "getincrementalencoder",
"getincrementaldecoder", "getreader", "getwriter",
"register_error", "lookup_error",
"strict_errors", "replace_errors", "ignore_errors",
"xmlcharrefreplace_errors", "backslashreplace_errors",
"open", "EncodedFile",
"iterencode", "iterdecode",
"BOM", "BOM_BE", "BOM_LE",
"BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
"BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
"BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE", # Undocumented
"StreamReaderWriter", "StreamRecoder",
)
self.assertEqual(sorted(api), sorted(codecs.__all__))
for api in codecs.__all__:
getattr(codecs, api)
def get_tlsa_records(resolver, name):
    """Extract all TLSA records for a given name.

    Args:
        resolver: Object whose ``resolve(name, rrtype=...)`` returns a
            ``(status, result)`` pair.
        name: Name to look up.

    Returns:
        A set of ``TLSARecord`` objects (empty when the result carries no
        data), or ``None`` when the resolver reports a non-zero status.
    """
    logging.debug("searching for TLSA record on %s", name)
    status, answer = resolver.resolve(name, rrtype=RR_TYPE_TLSA)
    if status != 0:
        # Log the resolver's error text instead of discarding it. ``None``
        # is still returned so existing callers see the same value.
        logging.error("TLSA lookup for %s failed: %s", name, ub_strerror(status))
        return None
    if answer.data is None:
        logging.warning("No TLSA record returned")
        return set()

    def _octet(item):
        # Indexing a byte string yields an int on Python 3 and a 1-char
        # str on Python 2; accept both.
        return item if isinstance(item, int) else ord(item)

    records = set()
    for record in answer.data.data:
        # The first three octets are passed as usage, selector and matching;
        # everything after them is passed through as the data field.
        usage = _octet(record[0])
        selector = _octet(record[1])
        matching = _octet(record[2])
        records.add(TLSARecord(usage, selector, matching, record[3:]))
    return records
def testInvalidUtf8(self):
    """Check that cpplint reports invalid UTF-8 exactly once per input."""
    def DoTest(self, raw_bytes, has_invalid_utf8):
        # Decode with 'replace' so that bad bytes become U+FFFD, then lint
        # the resulting lines.
        error_collector = ErrorCollector(self.assert_)
        cpplint.ProcessFileData(
            'foo.cc', 'cc',
            unicode(raw_bytes, 'utf8', 'replace').split('\n'),
            error_collector)
        # The warning appears only once.
        self.assertEquals(
            int(has_invalid_utf8),
            error_collector.Results().count(
                'Line contains invalid UTF-8'
                ' (or Unicode replacement character).'
                ' [readability/utf8] [5]'))

    DoTest(self, 'Hello world\n', False)
    # A well-formed three-byte sequence must not trigger the warning ...
    DoTest(self, '\xe9\x8e\xbd\n', False)
    # ... while the same lead byte followed by a plain 'x' is malformed.
    DoTest(self, '\xe9x\x8e\xbd\n', True)
    # This is the encoding of the replacement character itself (which
    # you can see by evaluating codecs.getencoder('utf8')(u'\ufffd')).
    DoTest(self, '\xef\xbf\xbd\n', True)
def test_all(self):
api = (
"encode",
)
self.assertCountEqual(api, codecs.__all__)
for api in codecs.__all__:
getattr(codecs, api)
def bind_processor(self, dialect):
    """Return a callable applied to bind parameter values, or ``None``.

    ``None`` is returned when neither this type nor the dialect asks for
    unicode conversion, or when the dialect handles unicode binds itself
    and byte-string warnings are switched off.
    """
    if not (self.convert_unicode or dialect.convert_unicode):
        return None

    if dialect.supports_unicode_binds and \
            self.convert_unicode != 'force':
        # The dialect accepts unicode directly; at most warn about values
        # that are byte strings.
        if not self._warn_on_bytestring:
            return None

        def process(value):
            if isinstance(value, util.binary_type):
                util.warn_limited(
                    "Unicode type received non-unicode "
                    "bind param value %r.",
                    (util.ellipses_string(value),))
            return value
        return process

    # Otherwise encode text values using the dialect's encoding.
    encoder = codecs.getencoder(dialect.encoding)
    warn_on_bytestring = self._warn_on_bytestring

    def process(value):
        if isinstance(value, util.text_type):
            return encoder(value, self.unicode_error)[0]
        if warn_on_bytestring and value is not None:
            util.warn_limited(
                "Unicode type received non-unicode bind "
                "param value %r.",
                (util.ellipses_string(value),))
        return value
    return process
def quote_value(self, value):
    """Render ``value`` as text suitable for inlining into SQLite SQL.

    Booleans become ``0``/``1``, numbers use ``str()``, strings are wrapped
    in single quotes with embedded quotes doubled, ``None`` becomes ``NULL``
    and byte-like values become ``X'..'`` hex literals. Any other type
    raises ``ValueError``.
    """
    # The backend "mostly works" without this function and there are use
    # cases for compiling Python without the sqlite3 libraries (e.g.
    # security hardening).
    import sqlite3
    try:
        value = sqlite3.adapt(value)
    except sqlite3.ProgrammingError:
        pass

    # Manual emulation of sqlite parameter quoting; the order of the
    # checks below is significant (bool is tested before the integer types).
    if isinstance(value, type(True)):
        return str(int(value))
    if isinstance(value, (Decimal, float)):
        return str(value)
    if isinstance(value, six.integer_types):
        return str(value)
    if isinstance(value, six.string_types):
        return "'%s'" % six.text_type(value).replace("\'", "\'\'")
    if value is None:
        return "NULL"
    if isinstance(value, (bytes, bytearray, six.memoryview)):
        # Bytes are only allowed for BLOB fields, encoded as string
        # literals containing hexadecimal data and preceded by a single "X"
        # character:
        # value = b'\x01\x02' => value_hex = b'0102' => return X'0102'
        raw = bytes(value)
        value_hex, _length = codecs.getencoder('hex_codec')(raw)
        # Use 'ascii' encoding for b'01' => '01', no need to use force_text here.
        return "X'%s'" % value_hex.decode('ascii')
    raise ValueError("Cannot quote parameter value %r of type %s" % (value, type(value)))
def _bytes_as_hex_str(cls, b):
    """Hex-encode ``b`` and decode the result using ``cls.DNS_CTYPE``."""
    hex_bytes = codecs.getencoder('hex_codec')(b)[0]
    return hex_bytes.decode(cls.DNS_CTYPE)
def iter(self, fields, files):
    """
    Generate the pieces of a multipart form body, in order.

    Text pieces are yielded as the ``(bytes, length)`` tuple returned by the
    UTF-8 encoder; file content is yielded as ``(filedata, len(filedata))``.

    :param fields: sequence of (name,value) elements for regular form fields
    :param files: sequence of (name,filename,contenttype,filedata) elements for data to be uploaded as files
    :return:
    """
    to_utf8 = codecs.getencoder('utf-8')

    for (name, value) in fields:
        name = self.u(name)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        yield to_utf8(self.u('Content-disposition: form-data; name="{}"\r\n').format(name))
        yield to_utf8('\r\n')
        # Numbers are converted to text before being encoded.
        text = str(value) if isinstance(value, (int, float)) else value
        yield to_utf8(self.u(text))
        yield to_utf8('\r\n')

    for (name, filename, contenttype, payload) in files:
        name = self.u(name)
        filename = self.u(filename)
        yield to_utf8('--{}\r\n'.format(self.boundary))
        yield to_utf8(self.u('Content-disposition: form-data; name="{}"; filename="{}"\r\n').format(name, filename))
        # Explicit type first, then a guess from the file name, then a
        # generic fallback.
        mime = contenttype or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        yield to_utf8('Content-Type: {}\r\n'.format(mime))
        yield to_utf8('Content-transfer-encoding: binary\r\n')
        yield to_utf8('\r\n')
        yield (payload, len(payload))
        yield to_utf8('\r\n')

    yield to_utf8('--{}--\r\n'.format(self.boundary))
# NOTE(review): incomplete snippet -- the signature below is not valid Python
# (`len(fd)` sits where the parameter list should be) and only the tail of a
# generator body remains. It appears to be a truncated copy of the complete
# `iter(self, fields, files)` found elsewhere in this file -- TODO confirm and
# restore before use.
def iter(self, len(fd))
yield encoder('\r\n')
yield encoder('--{}--\r\n'.format(self.boundary))
# NOTE(review): incomplete snippet -- the parameter list is unbalanced and the
# body starts in the middle of a branch. It appears to be a truncated copy of
# the complete `bind_processor(self, dialect)` found elsewhere in this file --
# TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
def hexencode(b):
    """Return the hexadecimal encoding of ``b``.

    On Python 2 the ``'hex'`` string codec is used; on Python 3 and later
    the ``hex_codec`` encoder from ``codecs`` is used instead.
    """
    if sys.version_info[0] >= 3:
        import codecs
        to_hex = codecs.getencoder('hex_codec')
        # The encoder returns (output, length consumed); only the output is
        # of interest here.
        return to_hex(b)[0]
    return b.encode('hex')
def _bytes_to_hex(bs):
    """Return the output of the ``'hex'`` codec encoder applied to ``bs``."""
    to_hex = codecs.getencoder('hex')
    # Drop the "length consumed" half of the encoder's result tuple.
    return to_hex(bs)[0]
# NOTE(review): incomplete snippet -- the signature below is not valid Python
# (`util.binary_type` is not a parameter name) and the inner
# `if isinstance(value, self.unicode_error)[0]` line is malformed, so parts of
# the body are evidently missing. It resembles the complete
# `bind_processor(self, dialect)` found elsewhere in this file -- TODO confirm
# and restore before use.
def bind_processor(self, util.binary_type):
util.warn("Unicode type received non-unicode"
"bind param value.")
return value
return process
else:
return None
else:
# Fallback path: encode values with the dialect's encoding.
encoder = codecs.getencoder(dialect.encoding)
warn_on_bytestring = self._warn_on_bytestring
def process(value):
if isinstance(value, self.unicode_error)[0]
elif warn_on_bytestring and value is not None:
util.warn("Unicode type received non-unicode bind "
"param value")
return value
return process
else:
return None
# NOTE(review): incomplete snippet -- the `def` line is not valid Python and
# the body begins with an `elif` that has no matching `if`. It resembles the
# tail of the complete `bind_processor(self, dialect)` found elsewhere in this
# file -- TODO confirm and restore before use.
def bind_processor(self, self.unicode_error)[0]
elif warn_on_bytestring and value is not None:
util.warn("Unicode type received non-unicode bind "
"param value")
return value
return process
else:
return None
def test_encode_length(self):
# Issue 3739
encoder = codecs.getencoder("unicode_internal")
self.assertEqual(encoder("a")[1], 1)
self.assertEqual(encoder("\xe9\u0142")[1], 2)
self.assertEqual(codecs.escape_encode(br'\x00')[1], 4)
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_getencoder(self):
self.assertRaises(TypeError, codecs.getencoder)
self.assertRaises(LookupError, codecs.getencoder, "__spam__")
def test_bad_encode_args(self):
    """Calling an encoder with no arguments must raise ``TypeError``."""
    for codec_name in all_unicode_encodings:
        encode = codecs.getencoder(codec_name)
        # assertRaises invokes ``encode`` with no arguments.
        self.assertRaises(TypeError, encode)
def test_basics(self):
    """Round-trip all 256 byte values through each bytes-transform codec."""
    payload = bytes(range(256))
    for codec_name in bytes_transform_encodings:
        # generic codecs interface
        encoded, consumed = codecs.getencoder(codec_name)(payload)
        self.assertEqual(consumed, len(payload))
        decoded, consumed = codecs.getdecoder(codec_name)(encoded)
        self.assertEqual(consumed, len(encoded))
        # Decoding the encoded data must give back the original bytes.
        self.assertEqual(decoded, payload)
# NOTE(review): incomplete snippet -- unbalanced parameter list and a body
# that starts mid-branch. It appears to be a truncated copy of the complete
# `bind_processor(self, dialect)` found elsewhere in this file -- TODO confirm
# and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
# NOTE(review): incomplete snippet -- the `def` line cannot be parsed and the
# statements below have lost their enclosing `if`. It appears to be a
# truncated copy of the complete `bind_processor(self, dialect)` found
# elsewhere in this file -- TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
def bind_processor(self, dialect):
    """Return a callable applied to bind parameter values, or ``None``.

    ``None`` is returned when no unicode conversion is requested, or when
    the dialect handles unicode binds itself and byte-string warnings are
    switched off.
    """
    if not (self.convert_unicode or dialect.convert_unicode):
        return None

    if dialect.supports_unicode_binds and \
            self.convert_unicode != 'force':
        if not self._warn_on_bytestring:
            return None

        # The "Py3K"/"Py2K" comments below are kept verbatim; they look
        # like markers for a source-conversion step -- TODO confirm.
        def process(value):
            # Py3K
            #if isinstance(value,bytes):
            # Py2K
            if isinstance(value, str):
            # end Py2K
                util.warn("Unicode type received non-unicode bind "
                          "param value.")
            return value
        return process

    # Otherwise encode unicode values using the dialect's encoding.
    encoder = codecs.getencoder(dialect.encoding)
    warn_on_bytestring = self._warn_on_bytestring

    def process(value):
        if isinstance(value, unicode):
            return encoder(value, self.unicode_error)[0]
        if warn_on_bytestring and value is not None:
            util.warn("Unicode type received non-unicode bind "
                      "param value")
        return value
    return process
def __init__(self, output_encoding='utf-8', input_encoding='utf-8',
             fallback_encoding='iso-8859-1'):
    """Look up the codec functions and create empty buffers.

    :param output_encoding: name of the codec whose encoder is stored
    :param input_encoding: name of the codec whose decoder is stored
    :param fallback_encoding: name of the codec stored as fallback decoder
    """
    find_decoder = codecs.getdecoder
    self.output_codec = codecs.getencoder(output_encoding)
    self.input_decoder = find_decoder(input_encoding)
    self.fallback_decoder = find_decoder(fallback_encoding)
    # Two distinct buffers, both initially empty.
    self._input_buffer, self._output_buffer = bytearray(), bytearray()
    self._closed = False
# NOTE(review): incomplete snippet -- the final `if isinstance(...)` line has
# unbalanced parentheses and no body, so the quoting branches are missing. A
# complete `quote_value(self, value)` exists elsewhere in this file -- TODO
# confirm which variant this was and restore before use.
def quote_value(self, value):
# The backend "mostly works" without this function and there are use
# cases for compiling Python without the sqlite3 libraries (e.g.
# security hardening).
import _sqlite3
try:
# Let the sqlite module adapt the value first; failures are ignored.
value = _sqlite3.adapt(value)
except _sqlite3.ProgrammingError:
pass
# Manual emulation of sqlite parameter quoting
if isinstance(value, type(value)))
# NOTE(review): incomplete snippet -- only a malformed `def` line and the
# closing `return`/`else` lines remain. It appears to be a truncated copy of
# the complete `bind_processor(self, dialect)` found elsewhere in this file --
# TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
def generate_session_id():
    """Generates a random session ID for Steam.

    Returns
    -------
    str
        Hexadecimal text encoding the 12 bytes obtained from
        ``Random.get_random_bytes``.
    """
    to_hex = codecs.getencoder('hex')
    raw = Random.get_random_bytes(12)
    # The encoder returns (hex bytes, length consumed); keep the hex bytes
    # and turn them into text.
    hex_bytes = to_hex(raw)[0]
    return hex_bytes.decode('utf-8')
def test_encode_length(self):
# Issue 3739
encoder = codecs.getencoder("unicode_internal")
self.assertEqual(encoder(u"a")[1], 1)
self.assertEqual(encoder(u"\xe9\u0142")[1], 2)
encoder = codecs.getencoder("string-escape")
self.assertEqual(encoder(r'\x00')[1], 4)
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_getencoder(self):
self.assertRaises(TypeError, "__spam__")
def test_bad_encode_args(self):
    """Every unicode encoder must raise ``TypeError`` when given no input."""
    for name in all_unicode_encodings:
        # The encoder itself is the callable handed to assertRaises, so it
        # is invoked without arguments.
        self.assertRaises(TypeError, codecs.getencoder(name))
def test_encode_length(self):
# Issue 3739
encoder = codecs.getencoder("unicode_internal")
self.assertEqual(encoder(u"a")[1], 4)
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_getencoder(self):
self.assertRaises(TypeError, "__spam__")
def test_bad_encode_args(self):
    """Encoders called without arguments are expected to raise TypeError."""
    encoders = (codecs.getencoder(name) for name in all_unicode_encodings)
    # The generator looks up each encoder right before it is checked, so
    # lookups and assertions alternate.
    for encode in encoders:
        self.assertRaises(TypeError, encode)
# NOTE(review): incomplete snippet -- the loop body `(bytes, encoding))` has
# an unbalanced parenthesis and `s` is never used, so the original assertions
# are evidently missing -- TODO restore from the upstream test before use.
def test_basics(self):
s = "abc123"
for encoding in all_string_encodings:
(bytes, encoding))
# NOTE(review): incomplete snippet -- both `for` headers below are malformed
# (`for (key, int) or isinstance(...)`, `for (key, filename))`) and the
# docstring mentions a `fields` argument that the signature lacks, so parts of
# the signature and body are evidently missing -- TODO restore before use.
def iter(self, files):
"""
fields is a sequence of (name,value) elements for regular form fields.
files is a sequence of (name,file-type) elements for data to be uploaded as files
Yield body's chunk as bytes
"""
encoder = codecs.getencoder('utf-8')
for (key, int) or isinstance(value, float):
value = str(value)
yield encoder(self.u(value))
yield encoder('\r\n')
for (key, filename))
yield encoder('Content-Type: {}\r\n'.format(mimetypes.guess_type(filename)[0] or 'application/octet-stream'))
yield encoder('\r\n')
# The file object is read in full and yielded together with its length.
with fd:
buff = fd.read()
yield (buff, len(buff))
yield encoder('\r\n')
yield encoder('--{}--\r\n'.format(self.boundary))
# NOTE(review): incomplete snippet -- malformed signature followed by the
# orphaned end of a nested function. It appears to be a truncated copy of the
# complete `bind_processor(self, dialect)` found elsewhere in this file --
# TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
# NOTE(review): incomplete snippet -- the signature does not parse and the
# `else:` below has no visible `if`. It appears to be a truncated copy of the
# complete `bind_processor(self, dialect)` found elsewhere in this file --
# TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
# NOTE(review): incomplete snippet -- the final `if isinstance(...)` line has
# unbalanced parentheses and no body, and no import of `_sqlite3` is visible
# in this block. A complete `quote_value(self, value)` exists elsewhere in
# this file -- TODO confirm which variant this was and restore before use.
def quote_value(self, value):
try:
# Let the sqlite module adapt the value first; failures are ignored.
value = _sqlite3.adapt(value)
except _sqlite3.ProgrammingError:
pass
# Manual emulation of sqlite parameter quoting
if isinstance(value, type(value)))
# NOTE(review): incomplete snippet -- the `def` line is unparsable and nothing
# defines `value` or `process` in what remains. It appears to be a truncated
# copy of the complete `bind_processor(self, dialect)` found elsewhere in this
# file -- TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
def test_encode_length(self):
    """Check encoder "length consumed" values under ``check_warnings``."""
    expected_warning = ('unicode_internal codec has been '
                        'deprecated', DeprecationWarning)
    with support.check_warnings(expected_warning):
        # Issue 3739
        internal_encode = codecs.getencoder("unicode_internal")
        for text, expected_length in (("a", 1), ("\xe9\u0142", 2)):
            self.assertEqual(internal_encode(text)[1], expected_length)
        escaped = codecs.escape_encode(br'\x00')
        self.assertEqual(escaped[1], 4)
# From http://www.gnu.org/software/libidn/draft-josefsson-idn-test-vectors.html
def test_getencoder(self):
self.assertRaises(TypeError, "__spam__")
def test_bad_encode_args(self):
    """Encoders called without arguments are expected to raise TypeError."""
    for codec_name in all_unicode_encodings:
        encode = codecs.getencoder(codec_name)
        # unicode-internal has been deprecated, hence the warnings guard
        # around the assertion.
        with support.check_warnings():
            self.assertRaises(TypeError, encode)
def test_basics(self):
    """Round-trip all 256 byte values through each bytes-transform codec."""
    binput = bytes(range(256))
    for encoding in bytes_transform_encodings:
        # generic codecs interface
        # Encode, then check that the whole input was consumed.
        (o, size) = codecs.getencoder(encoding)(binput)
        self.assertEqual(size, len(binput))
        # Decode the result and check it reproduces the input exactly.
        (i, size) = codecs.getdecoder(encoding)(o)
        self.assertEqual(size, len(o))
        self.assertEqual(i, binput)
# NOTE(review): incomplete snippet -- the parameter list is cut off and the
# remaining lines belong to branches that are no longer present. It appears
# to be a truncated copy of the complete `bind_processor(self, dialect)` found
# elsewhere in this file -- TODO confirm and restore before use.
def bind_processor(self,))
return value
return process
else:
return None
# NOTE(review): incomplete snippet -- the final `if isinstance(...)` line has
# unbalanced parentheses and no body, so the quoting branches are missing. A
# complete `quote_value(self, value)` exists elsewhere in this file -- TODO
# confirm which variant this was and restore before use.
def quote_value(self, value):
# The backend "mostly works" without this function and there are use
# cases for compiling Python without the sqlite3 libraries (e.g.
# security hardening).
try:
# The import sits inside the try so that a missing sqlite3 module is
# tolerated just like an adaptation failure.
import sqlite3
value = sqlite3.adapt(value)
except ImportError:
pass
except sqlite3.ProgrammingError:
pass
# Manual emulation of sqlite parameter quoting
if isinstance(value, type(value)))
def __repr__(self):
    """Return ``<TLSA usage selector matching payload-as-hex>``."""
    to_hex = codecs.getencoder('hex')
    fields = (
        self._usage,
        self._selector,
        self._matching,
        # The encoder yields (hex bytes, length); decode the bytes to text.
        to_hex(self._payload)[0].decode(),
    )
    return '<TLSA %d %d %d %s>' % fields
def codepage(self, codepage):
    """Switch the table to ``codepage`` and write the header to disk.

    Raises ``TypeError`` when ``codepage`` is not a ``CodePage`` and
    ``DbfError`` when the table status is not ``READ_WRITE``.
    """
    if not isinstance(codepage, CodePage):
        raise TypeError("codepage should be a CodePage,not a %r" % type(codepage))
    meta = self._Meta
    if meta.status != READ_WRITE:
        raise DbfError('%s not in read/write mode,unable to change codepage' % meta.filename)
    # Record the new code in the header, then swap both codec functions to
    # the ones registered under the code page's name.
    meta.header.codepage(codepage.code)
    meta.decoder = codecs.getdecoder(codepage.name)
    meta.encoder = codecs.getencoder(codepage.name)
    # Only the header needs to be rewritten.
    self._update_disk(headeronly=True)