Python encodings 模块，idna() 实例源码
我们从 Python 开源项目中提取了以下 37 个代码示例，用于说明如何使用 encodings.idna()。
def test_nameprep(self):
    """Check ``encodings.idna.nameprep`` against every pair in ``nameprep_tests``.

    Each entry is ``(orig, prepped)`` given as UTF-8 bytes.  ``orig is None``
    marks a skipped vector; ``prepped is None`` means the input must be
    rejected with ``UnicodeError``.
    """
    from encodings.idna import nameprep

    for pos, (orig, prepped) in enumerate(nameprep_tests):
        if orig is None:
            # Skipped
            continue
        # The Unicode strings are given in UTF-8
        orig = str(orig, "utf-8", "surrogatepass")
        if prepped is None:
            # Input contains prohibited characters
            self.assertRaises(UnicodeError, nameprep, orig)
        else:
            # Decode with an explicit encoding; "surrogatepass" is the error
            # handler, not a codec name.
            prepped = str(prepped, "utf-8", "surrogatepass")
            try:
                self.assertEqual(nameprep(orig), prepped)
            except Exception as e:
                # Re-raise with the 1-based vector number for easier triage.
                raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e)))
def test_nameprep(self):
    """Check ``encodings.idna.nameprep`` against every pair in ``nameprep_tests``.

    Each entry is ``(orig, prepped)`` given as UTF-8 bytes.  ``orig is None``
    marks a skipped vector; ``prepped is None`` means the input must be
    rejected with ``UnicodeError``.
    """
    from encodings.idna import nameprep

    for pos, (orig, prepped) in enumerate(nameprep_tests):
        if orig is None:
            # Skipped
            continue
        # The Unicode strings are given in UTF-8
        orig = str(orig, "utf-8", "surrogatepass")
        if prepped is None:
            # Input contains prohibited characters
            self.assertRaises(UnicodeError, nameprep, orig)
        else:
            prepped = str(prepped, "utf-8", "surrogatepass")
            try:
                self.assertEqual(nameprep(orig), prepped)
            except Exception as e:
                # Re-raise with the 1-based vector number for easier triage.
                raise support.TestFailed("Test 3.%d: %s" % (pos+1, str(e)))
def __call__(self, value):
    """Validate ``value`` as an e-mail address.

    Returns ``(value, None)`` when the address is accepted and
    ``(value, translated_error_message)`` otherwise.  The part after the
    last ``@`` is matched against ``self.domain_regex``; if that fails, the
    domain is retried in its IDNA (ASCII) form.  ``self.banned`` and
    ``self.forced``, when set, are matched against the original domain.
    """
    is_text = isinstance(value, (basestring, unicodeT))
    if not is_text or not value or '@' not in value:
        return (value, translate(self.error_message))

    local_part, domain = value.rsplit('@', 1)
    try:
        body_hit = self.body_regex.match(local_part)
        domain_hit = self.domain_regex.match(domain)
        if not domain_hit:
            # check for Internationalized Domain Names
            # see https://docs.python.org/2/library/codecs.html#module-encodings.idna
            ascii_domain = to_unicode(domain).encode('idna').decode('ascii')
            domain_hit = self.domain_regex.match(ascii_domain)
        accepted = body_hit is not None and domain_hit is not None
    except (TypeError, UnicodeError):
        # Value may not be a string where we can look for matches.
        # Example: we're calling ANY_OF formatter and IS_EMAIL is asked to validate a date.
        accepted = None

    if accepted:
        not_banned = not self.banned or not self.banned.match(domain)
        if not_banned and (not self.forced or self.forced.match(domain)):
            return (value, None)
    return (value, translate(self.error_message))
def _write_SOCKS5_address(self, addr, file):
    """
    Write ``addr`` to ``file`` in SOCKS5 address format and return it.

    Args:
        addr: ``(host, port)`` tuple for the destination.
        file: writable binary file object that receives the packed bytes.

    Returns:
        ``(host, port)``; ``host`` is replaced by the dotted-quad address
        when the name was resolved locally.

    Raises:
        struct.error: if the IDNA-encoded host name is longer than 255
            bytes and therefore cannot fit the one-byte length prefix.
    """
    host, port = addr
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only the remote-DNS flag is needed here.
    _, _, _, rdns, _, _ = self.proxy

    if ":" in host:
        # IPv6 literal (address type 0x04)
        addr_bytes = socket.inet_pton(socket.AF_INET6, host)
        file.write(b"\x04" + addr_bytes)
    elif check_ip_valid(host):
        # IPv4 literal (address type 0x01)
        addr_bytes = socket.inet_pton(socket.AF_INET, host)
        file.write(b"\x01" + addr_bytes)
    elif rdns:
        # Resolve remotely: domain name (address type 0x03) with a single
        # length octet.  struct.pack("B", ...) always yields exactly one
        # byte, whereas chr(n).encode() yields two for n >= 128.
        host_bytes = host.encode('idna')
        file.write(b"\x03" + struct.pack("B", len(host_bytes)) + host_bytes)
    else:
        # Resolve locally
        addr_bytes = socket.inet_aton(socket.gethostbyname(host))
        file.write(b"\x01" + addr_bytes)
        host = socket.inet_ntoa(addr_bytes)

    file.write(struct.pack(">H", port))
    return host, port
def test_builtin_decode(self):
    """Decoding bytes with the "idna" codec yields the Unicode host name.

    Covers plain ASCII and ACE ("xn--") labels, each with and without a
    trailing dot.
    """
    self.assertEqual(str(b"python.org", "idna"), "python.org")
    self.assertEqual(str(b"python.org.", "idna"), "python.org.")
    self.assertEqual(str(b"xn--pythn-mua.org", "idna"), "pyth\xf6n.org")
    self.assertEqual(str(b"xn--pythn-mua.org.", "idna"), "pyth\xf6n.org.")
def test_builtin_encode(self):
    """Encoding with the "idna" codec yields the ASCII form of each host name."""
    expectations = (
        ("python.org", b"python.org"),
        ("python.org.", b"python.org."),
        ("pyth\xf6n.org", b"xn--pythn-mua.org"),
        ("pyth\xf6n.org.", b"xn--pythn-mua.org."),
    )
    for hostname, ace in expectations:
        self.assertEqual(hostname.encode("idna"), ace)
def test_stream(self):
    """After all bytes are consumed, an "idna" stream reader returns ""."""
    make_reader = codecs.getreader("idna")
    stream = make_reader(io.BytesIO(b"abc"))
    # Drain the three available bytes; the next read must be empty.
    stream.read(3)
    self.assertEqual(stream.read(), "")
def test_incremental_decode(self):
    """Exercise the incremental "idna" decoder one byte / one chunk at a time."""
    # Byte-by-byte decoding through codecs.iterdecode.
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")),
        "python.org"
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")),
        "python.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )

    # Chunked decoding: a label is only emitted once its terminating dot
    # (or the final flush) has been seen.
    decoder = codecs.getincrementaldecoder("idna")()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg"), "")
    self.assertEqual(decoder.decode(b"", True), "org")

    decoder.reset()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg."), "org.")
    self.assertEqual(decoder.decode(b"", True), "")
def test_errors(self):
    """Only supports "strict" error handler"""
    "python.org".encode("idna", "strict")
    b"python.org".decode("idna", "strict")
    for errors in ("ignore", "replace", "backslashreplace",
                   "surrogateescape"):
        self.assertRaises(Exception, "python.org".encode, "idna", errors)
        # The codec name must be passed explicitly; otherwise ``errors``
        # is taken as the encoding and the check passes for the wrong reason.
        self.assertRaises(Exception,
                          b"python.org".decode, "idna", errors)
def test_basics_capi(self):
    """Round-trip "abc123" through the incremental coders fetched via the C API.

    Encodings listed in ``broken_unicode_with_stateful`` are skipped
    entirely; "idna" and "mbcs" are skipped for the variant that passes an
    ``errors`` argument.
    """
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = "abc123"  # all codecs should be able to encode these
    for encoding in all_unicode_encodings:
        if encoding not in broken_unicode_with_stateful:
            # check incremental decoder/encoder (fetched via the C API)
            try:
                cencoder = codec_incrementalencoder(encoding)
            except LookupError:  # no IncrementalEncoder
                pass
            else:
                # check C API
                encodedresult = b""
                for c in s:
                    encodedresult += cencoder.encode(c)
                encodedresult += cencoder.encode("", True)
                cdecoder = codec_incrementaldecoder(encoding)
                decodedresult = ""
                for c in encodedresult:
                    decodedresult += cdecoder.decode(bytes([c]))
                decodedresult += cdecoder.decode(b"", True)
                self.assertEqual(decodedresult, s,
                                 "encoding=%r" % encoding)

            if encoding not in ("idna", "mbcs"):
                # check incremental decoder/encoder with errors argument
                try:
                    cencoder = codec_incrementalencoder(encoding, "ignore")
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    encodedresult = b"".join(cencoder.encode(c) for c in s)
                    cdecoder = codec_incrementaldecoder(encoding, "ignore")
                    decodedresult = "".join(cdecoder.decode(bytes([c]))
                                            for c in encodedresult)
                    # Compare against the input text; the third argument
                    # is only the failure message.
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)
def test_seek(self):
    """Seeking a stream reader back to the start must replay the whole text.

    "idna" and every encoding in ``broken_unicode_with_stateful`` are skipped.
    """
    # all codecs should be able to encode these
    text = "%s\n%s\n" % (100*"abc123", 100*"def456")
    for encoding in all_unicode_encodings:
        # FIXME: See SF bug #1163178 (reason "idna" is excluded)
        if encoding == "idna" or encoding in broken_unicode_with_stateful:
            continue
        make_reader = codecs.getreader(encoding)
        reader = make_reader(io.BytesIO(text.encode(encoding)))
        for _ in range(5):
            # Test that calling seek resets the internal codec state and buffers
            reader.seek(0, 0)
            replayed = reader.read()
            self.assertEqual(text, replayed)
def test_bad_decode_args(self):
    """Decoders must raise TypeError when called without an argument.

    For every encoding except "idna" and "punycode", the decoder must also
    raise TypeError when given the integer 42.
    """
    skip_int_check = ("idna", "punycode")
    for codec_name in all_unicode_encodings:
        decode = codecs.getdecoder(codec_name)
        self.assertRaises(TypeError, decode)
        if codec_name in skip_int_check:
            continue
        self.assertRaises(TypeError, decode, 42)
def test_builtin_decode(self):
    """Decoding bytes with the "idna" codec yields the Unicode host name.

    Covers plain ASCII and ACE ("xn--") labels, each with and without a
    trailing dot.
    """
    self.assertEqual(str(b"python.org", "idna"), "python.org")
    self.assertEqual(str(b"python.org.", "idna"), "python.org.")
    self.assertEqual(str(b"xn--pythn-mua.org", "idna"), "pyth\xf6n.org")
    self.assertEqual(str(b"xn--pythn-mua.org.", "idna"), "pyth\xf6n.org.")
def test_builtin_encode(self):
    """Encoding with the "idna" codec yields the ASCII form of each host name.

    Covers plain ASCII and non-ASCII labels, each with and without a
    trailing dot.
    """
    self.assertEqual("python.org".encode("idna"), b"python.org")
    self.assertEqual("python.org.".encode("idna"), b"python.org.")
    self.assertEqual("pyth\xf6n.org".encode("idna"), b"xn--pythn-mua.org")
    self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
def test_stream(self):
    """Reading past the end of an "idna" stream reader returns ""."""
    reader_factory = codecs.getreader("idna")
    idna_reader = reader_factory(io.BytesIO(b"abc"))
    # Consume everything that is available first.
    idna_reader.read(3)
    leftover = idna_reader.read()
    self.assertEqual(leftover, "")
def test_incremental_decode(self):
    """Exercise the incremental "idna" decoder one byte / one chunk at a time."""
    # Byte-by-byte decoding through codecs.iterdecode.
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")),
        "python.org"
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")),
        "python.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )

    # Chunked decoding: a label is only emitted once its terminating dot
    # (or the final flush) has been seen.
    decoder = codecs.getincrementaldecoder("idna")()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg"), "")
    self.assertEqual(decoder.decode(b"", True), "org")

    decoder.reset()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg."), "org.")
    self.assertEqual(decoder.decode(b"", True), "")
def test_basics_capi(self):
    """Round-trip "abc123" through the incremental coders fetched via the C API.

    Encodings listed in ``broken_incremental_coders`` are skipped entirely;
    "idna" and "mbcs" are skipped for the variant that passes an ``errors``
    argument.
    """
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = "abc123"  # all codecs should be able to encode these
    for encoding in all_unicode_encodings:
        if encoding not in broken_incremental_coders:
            # check incremental decoder/encoder (fetched via the C API)
            try:
                cencoder = codec_incrementalencoder(encoding)
            except LookupError:  # no IncrementalEncoder
                pass
            else:
                # check C API
                encodedresult = b""
                for c in s:
                    encodedresult += cencoder.encode(c)
                # Second argument is the ``final`` flag that flushes the encoder.
                encodedresult += cencoder.encode("", True)
                cdecoder = codec_incrementaldecoder(encoding)
                decodedresult = ""
                for c in encodedresult:
                    decodedresult += cdecoder.decode(bytes([c]))
                decodedresult += cdecoder.decode(b"", True)
                self.assertEqual(decodedresult, s,
                                 "encoding=%r" % encoding)

            if encoding not in ("idna", "mbcs"):
                # check incremental decoder/encoder with errors argument
                try:
                    cencoder = codec_incrementalencoder(encoding, "ignore")
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    encodedresult = b"".join(cencoder.encode(c) for c in s)
                    cdecoder = codec_incrementaldecoder(encoding, "ignore")
                    decodedresult = "".join(cdecoder.decode(bytes([c]))
                                            for c in encodedresult)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)
def test_seek(self):
    """Seeking a stream reader back to the start must replay the whole text.

    "idna" and every encoding in ``broken_unicode_with_streams`` are skipped.
    """
    # all codecs should be able to encode these
    s = "%s\n%s\n" % (100*"abc123", 100*"def456")
    for encoding in all_unicode_encodings:
        if encoding == "idna":  # FIXME: See SF bug #1163178
            continue
        if encoding in broken_unicode_with_streams:
            continue
        reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
        for t in range(5):
            # Test that calling seek resets the internal codec state and buffers
            reader.seek(0, 0)
            data = reader.read()
            self.assertEqual(s, data)
def test_bad_decode_args(self):
    """Decoders must raise TypeError when called without an argument.

    For every encoding except "idna" and "punycode", the decoder must also
    raise TypeError when given the integer 42.
    """
    for encoding in all_unicode_encodings:
        decoder = codecs.getdecoder(encoding)
        # The callable under test is the decoder itself, called with no
        # argument and then with a non-bytes argument.
        self.assertRaises(TypeError, decoder)
        if encoding not in ("idna", "punycode"):
            self.assertRaises(TypeError, decoder, 42)
def unicode_to_ascii_authority(authority):
    """
    Follows the steps in RFC 3490, Section 4 to convert a unicode authority
    string into its ASCII equivalent.
    For example, u'www.Alliancefran\xe7aise.nu' will be converted into
    'www.xn--alliancefranaise-npb.nu'

    Args:
        authority: unicode string, the URL authority component to convert,
            e.g. u'www.Alliancefran\xe7aise.nu'

    Returns:
        string: the US-ASCII character equivalent to the inputed authority,
            e.g. 'www.xn--alliancefranaise-npb.nu'

    Raises:
        Exception: if the function is not able to convert the inputed
            authority

    @author: Jonathan Benn
    """
    import encodings.idna

    # RFC 3490, Section 4, Step 1
    # The encodings.idna Python module assumes that AllowUnassigned == True

    # RFC 3490, Section 4, Step 2
    labels = label_split_regex.split(authority)

    # RFC 3490, Section 4, Step 3
    # The encodings.idna Python module assumes that UseSTD3ASCIIRules == False

    # RFC 3490, Section 4, Step 4
    # We use the ToASCII operation because we are about to put the authority
    # into an IDN-unaware slot.
    # encodings.idna.ToASCII does not accept an empty string, but it is
    # necessary for us to allow for empty labels so that we don't modify
    # the URL; empty labels are therefore passed through unchanged.
    asciiLabels = [
        to_native(encodings.idna.ToASCII(label)) if label else ''
        for label in labels
    ]

    # RFC 3490, Section 4, Step 5
    # Re-join with U+002E FULL STOP; str.join avoids needing the
    # Python 2-only reduce/unichr builtins.
    return str('.'.join(asciiLabels))
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Args:
        dest_addr: destination host, either a dotted-quad IPv4 address or
            a DNS name.
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the reply does not start with a null byte.
        SOCKS4Error: if the server reports a status other than 0x5A.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only rdns and username are needed here.
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        remote_resolve = False
        try:
            addr_bytes = socket.inet_aton(dest_addr)
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            if rdns:
                addr_bytes = b"\x00\x00\x00\x01"
                remote_resolve = True
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        resp = self._readall(reader, 8)
        if resp[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(resp[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
        # NOTE(review): with remote resolution addr_bytes is the 0.0.0.1
        # placeholder, so the two branches below look swapped -- confirm
        # against callers before changing.
        if remote_resolve:
            self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
        else:
            self.proxy_peername = dest_addr, dest_port
    finally:
        reader.close()
        writer.close()
def _negotiate_HTTP(self, dest_addr, dest_port):
    """
    Negotiates a connection through an HTTP server.
    NOTE: This currently only supports HTTP CONNECT-style proxies.

    Args:
        dest_addr: destination host name or IP address (str).
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the connection closes early or the reply is
            not a well-formed HTTP status line.
        HTTPError: if the status code is not an integer or is not 200.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only the remote-DNS flag is needed here.
    _, _, _, rdns, _, _ = self.proxy

    # If we need to resolve locally, we do this now
    addr = dest_addr if rdns else socket.gethostbyname(dest_addr)

    self.sendall(b"CONNECT " + addr.encode('idna') + b":" + str(dest_port).encode() +
                 b" HTTP/1.1\r\n" + b"Host: " + dest_addr.encode('idna') + b"\r\n\r\n")

    # We just need the first line to check if the connection was successful
    fobj = self.makefile()
    try:
        status_line = fobj.readline()
    finally:
        # Close the file wrapper even if reading the status line fails.
        fobj.close()

    if not status_line:
        raise GeneralProxyError("Connection closed unexpectedly")

    try:
        proto, status_code, status_msg = status_line.split(" ", 2)
    except ValueError:
        raise GeneralProxyError("HTTP proxy server sent invalid response")

    if not proto.startswith("HTTP/"):
        raise GeneralProxyError("Proxy server does not appear to be an HTTP proxy")

    try:
        status_code = int(status_code)
    except ValueError:
        raise HTTPError("HTTP proxy server did not return a valid HTTP status")

    if status_code != 200:
        error = "{0}: {1}".format(status_code, status_msg)
        if status_code in (400, 403, 405):
            # It's likely that the HTTP proxy server does not support the CONNECT tunneling method
            error += ("\n[*] Note: The HTTP proxy server may not be supported by PySocks"
                      " (must be a CONNECT tunnel proxy)")
        raise HTTPError(error)

    self.proxy_sockname = (b"0.0.0.0", 0)
    self.proxy_peername = addr, dest_port
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Args:
        dest_addr: destination host, either a dotted-quad IPv4 address or
            a DNS name.
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the reply does not start with a null byte.
        SOCKS4Error: if the server reports a status other than 0x5A.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only rdns and username are needed here.
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        remote_resolve = False
        try:
            addr_bytes = socket.inet_aton(dest_addr)
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            if rdns:
                addr_bytes = b"\x00\x00\x00\x01"
                remote_resolve = True
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        resp = self._readall(reader, 8)
        if resp[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(resp[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
        if remote_resolve:
            self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
        else:
            self.proxy_peername = dest_addr, dest_port
    finally:
        reader.close()
        writer.close()
def valid_url(self, value):
    """Validate ``value`` as a URL and return its parsed parts.

    Args:
        value: the URL string to check.

    Returns:
        The ``groupdict()`` of ``self.URL_REGEX`` on success (with
        ``'hostn_enc'`` added for named hosts, holding the ASCII/IDNA host
        without a trailing dot), or ``False`` when the URL is rejected.
    """
    match = self.URL_REGEX.match(value)
    if not match:
        return False
    url = match.groupdict()

    if url['scheme'].lower() not in self.schemes:
        return False

    if url['host6']:
        if IPv6Type.valid_ip(url['host6']):
            return url
        else:
            return False

    if url['host4']:
        return url

    try:
        hostname = url['hostn'].encode('ascii').decode('ascii')
    except UnicodeError:
        # Not plain ASCII: fall back to the IDNA (ACE) form.
        try:
            hostname = url['hostn'].encode('idna').decode('ascii')
        except UnicodeError:
            return False

    # endswith() is safe on an empty host name, unlike hostname[-1];
    # an empty name is then rejected by the label-length check below.
    if hostname.endswith('.'):
        hostname = hostname[:-1]
    if len(hostname) > 253:
        return False

    labels = hostname.split('.')
    for label in labels:
        if not 0 < len(label) < 64:
            return False
        if '-' in (label[0], label[-1]):
            return False

    if self.fqdn:
        if len(labels) == 1 \
                or not self.TLD_REGEX.match(labels[-1]):
            return False

    url['hostn_enc'] = hostname
    return url
def valid_url(self, value):
    """Validate ``value`` as a URL and return its parsed parts.

    Args:
        value: the URL string to check.

    Returns:
        The ``groupdict()`` of ``self.URL_REGEX`` on success (with
        ``'hostn_enc'`` added for named hosts, holding the ASCII/IDNA host
        without a trailing dot), or ``False`` when the URL is rejected.
    """
    match = self.URL_REGEX.match(value)
    if not match:
        return False
    url = match.groupdict()

    if url['scheme'].lower() not in self.schemes:
        return False

    if url['host6']:
        if IPv6Type.valid_ip(url['host6']):
            return url
        else:
            return False

    if url['host4']:
        return url

    try:
        hostname = url['hostn'].encode('ascii').decode('ascii')
    except UnicodeError:
        # Not plain ASCII: fall back to the IDNA (ACE) form.
        try:
            hostname = url['hostn'].encode('idna').decode('ascii')
        except UnicodeError:
            return False

    # endswith() is safe on an empty host name, unlike hostname[-1].
    if hostname.endswith('.'):
        hostname = hostname[:-1]
    if len(hostname) > 253:
        return False

    labels = hostname.split('.')
    for label in labels:
        if not 0 < len(label) < 64:
            return False
        if '-' in (label[0], label[-1]):
            return False

    if self.fqdn:
        if len(labels) == 1 \
                or not self.TLD_REGEX.match(labels[-1]):
            return False

    url['hostn_enc'] = hostname
    return url
def valid_url(self, value):
    """Validate ``value`` as a URL and return its parsed parts.

    Args:
        value: the URL string to check.

    Returns:
        The ``groupdict()`` of ``self.URL_REGEX`` on success (with
        ``'hostn_enc'`` added for named hosts, holding the ASCII/IDNA host
        without a trailing dot), or ``False`` when the URL is rejected.
    """
    match = self.URL_REGEX.match(value)
    if not match:
        return False
    url = match.groupdict()

    if url['scheme'].lower() not in self.schemes:
        return False

    if url['host6']:
        if IPv6Type.valid_ip(url['host6']):
            return url
        else:
            return False

    if url['host4']:
        return url

    try:
        hostname = url['hostn'].encode('ascii').decode('ascii')
    except UnicodeError:
        # Not plain ASCII: fall back to the IDNA (ACE) form.
        try:
            hostname = url['hostn'].encode('idna').decode('ascii')
        except UnicodeError:
            return False

    # endswith() is safe on an empty host name, unlike hostname[-1].
    if hostname.endswith('.'):
        hostname = hostname[:-1]
    if len(hostname) > 253:
        return False

    labels = hostname.split('.')
    for label in labels:
        if not 0 < len(label) < 64:
            return False
        if '-' in (label[0], label[-1]):
            return False

    if self.fqdn:
        if len(labels) == 1 \
                or not self.TLD_REGEX.match(labels[-1]):
            return False

    url['hostn_enc'] = hostname
    return url
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Args:
        dest_addr: destination host, either a dotted-quad IPv4 address or
            a DNS name.
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the reply does not start with a null byte.
        SOCKS4Error: if the server reports a status other than 0x5A.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only rdns and username are needed here.
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        remote_resolve = False
        try:
            addr_bytes = socket.inet_aton(dest_addr)
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            if rdns:
                addr_bytes = b"\x00\x00\x00\x01"
                remote_resolve = True
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        resp = self._readall(reader, 8)
        if resp[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(resp[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
        if remote_resolve:
            self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
        else:
            self.proxy_peername = dest_addr, dest_port
    finally:
        reader.close()
        writer.close()
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Args:
        dest_addr: destination host, either a dotted-quad IPv4 address or
            a DNS name.
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the reply does not start with a null byte.
        SOCKS4Error: if the server reports a status other than 0x5A.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only rdns and username are needed here.
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        remote_resolve = False
        try:
            addr_bytes = socket.inet_aton(dest_addr)
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            if rdns:
                addr_bytes = b"\x00\x00\x00\x01"
                remote_resolve = True
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        resp = self._readall(reader, 8)
        if resp[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(resp[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
        if remote_resolve:
            self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
        else:
            self.proxy_peername = dest_addr, dest_port
    finally:
        reader.close()
        writer.close()
def _negotiate_SOCKS4(self, dest_addr, dest_port):
    """
    Negotiates a connection through a SOCKS4 server.

    Args:
        dest_addr: destination host, either a dotted-quad IPv4 address or
            a DNS name.
        dest_port: destination TCP port (int).

    Raises:
        GeneralProxyError: if the reply does not start with a null byte.
        SOCKS4Error: if the server reports a status other than 0x5A.

    On success sets ``self.proxy_sockname`` and ``self.proxy_peername``.
    """
    # Assumes the PySocks layout of self.proxy:
    # (proxy_type, addr, port, rdns, username, password) -- confirm against
    # set_proxy(). Only rdns and username are needed here.
    _, _, _, rdns, username, _ = self.proxy

    writer = self.makefile("wb")
    reader = self.makefile("rb", 0)  # buffering=0 renamed in Python 3
    try:
        # Check if the destination address provided is an IP address
        remote_resolve = False
        try:
            addr_bytes = socket.inet_aton(dest_addr)
        except socket.error:
            # It's a DNS name. Check where it should be resolved.
            if rdns:
                addr_bytes = b"\x00\x00\x00\x01"
                remote_resolve = True
            else:
                addr_bytes = socket.inet_aton(socket.gethostbyname(dest_addr))

        # Construct the request packet
        writer.write(struct.pack(">BBH", 0x04, 0x01, dest_port))
        writer.write(addr_bytes)

        # The username parameter is considered userid for SOCKS4
        if username:
            writer.write(username)
        writer.write(b"\x00")

        # DNS name if remote resolving is required
        # NOTE: This is actually an extension to the SOCKS4 protocol
        # called SOCKS4A and may not be supported in all cases.
        if remote_resolve:
            writer.write(dest_addr.encode('idna') + b"\x00")
        writer.flush()

        # Get the response from the server
        resp = self._readall(reader, 8)
        if resp[0:1] != b"\x00":
            # Bad data
            raise GeneralProxyError("SOCKS4 proxy server sent invalid data")

        status = ord(resp[1:2])
        if status != 0x5A:
            # Connection failed: server returned an error
            error = SOCKS4_ERRORS.get(status, "Unknown error")
            raise SOCKS4Error("{0:#04x}: {1}".format(status, error))

        # Get the bound address/port
        self.proxy_sockname = (socket.inet_ntoa(resp[4:]), struct.unpack(">H", resp[2:4])[0])
        if remote_resolve:
            self.proxy_peername = socket.inet_ntoa(addr_bytes), dest_port
        else:
            self.proxy_peername = dest_addr, dest_port
    finally:
        reader.close()
        writer.close()