Python codecs module: iterdecode() example source code
The following code examples, extracted from open-source Python projects, illustrate how to use codecs.iterdecode().
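As a quick orientation first: codecs.iterdecode(iterable, encoding) wraps an iterable of byte chunks and lazily yields decoded text, which is why the samples below keep pairing it with csv.reader over binary streams. A minimal sketch (the file name is hypothetical):

import codecs
import csv

# Decode an iterable of byte chunks lazily; incomplete multi-byte
# sequences are buffered across chunk boundaries.
chunks = [b"caf\xc3", b"\xa9,2\n"]
print("".join(codecs.iterdecode(chunks, "utf-8")))  # 'café,2\n'

# The recurring pattern below: decode a binary file object on the fly
# so csv.reader receives text ('data.csv' is a hypothetical file).
with open("data.csv", "rb") as f:
    for row in csv.reader(codecs.iterdecode(f, "utf-8")):
        print(row)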
def parse_csv_file(thefile):
    """Parse a csv file, yielding rows as dictionaries.

    The csv file should have a header row.

    Args:
        thefile (file): File-like object

    Yields:
        dict: Dictionary with column header name as key and cell as value
    """
    reader = csv.reader(codecs.iterdecode(thefile, 'ISO-8859-1'))
    # read header
    colnames = next(reader)
    # data rows
    for row in reader:
        pdb = {}
        for k, v in zip(colnames, row):
            if v == '':  # use equality, not identity, for the empty-string check
                v = None
            pdb[k] = v
        yield pdb
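A possible invocation of the parser above, assuming a Latin-1 encoded CSV on disk (the path is hypothetical; the file must be opened in binary mode so iterdecode receives bytes):

import codecs
import csv

with open("entries.csv", "rb") as fh:  # hypothetical file
    for record in parse_csv_file(fh):
        print(record)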
def test_all(self):
    api = (
        "encode", "decode",
        "register", "CodecInfo", "Codec", "IncrementalEncoder",
        "IncrementalDecoder", "StreamReader", "StreamWriter", "lookup",
        "getencoder", "getdecoder", "getincrementalencoder",
        "getincrementaldecoder", "getreader", "getwriter",
        "register_error", "lookup_error",
        "strict_errors", "replace_errors", "ignore_errors",
        "xmlcharrefreplace_errors", "backslashreplace_errors",
        "open", "EncodedFile",
        "iterencode", "iterdecode",
        "BOM", "BOM_BE", "BOM_LE",
        "BOM_UTF8", "BOM_UTF16", "BOM_UTF16_BE", "BOM_UTF16_LE",
        "BOM_UTF32", "BOM_UTF32_BE", "BOM_UTF32_LE",
        "BOM32_BE", "BOM32_LE", "BOM64_BE", "BOM64_LE",  # Undocumented
        "StreamReaderWriter", "StreamRecoder",
    )
    self.assertEqual(sorted(api), sorted(codecs.__all__))
    for api in codecs.__all__:
        getattr(codecs, api)
def test_incremental_decode(self):
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org"), "idna")),
        "python.org"
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"python.org."), "idna")),
        "python.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode((bytes([c]) for c in b"xn--pythn-mua.org."), "idna")),
        "pyth\xf6n.org."
    )

    decoder = codecs.getincrementaldecoder("idna")()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg"), "")
    self.assertEqual(decoder.decode(b"", True), "org")

    decoder.reset()
    self.assertEqual(decoder.decode(b"xn--xam", ), "")
    self.assertEqual(decoder.decode(b"ple-9ta.o", ), "\xe4xample.")
    self.assertEqual(decoder.decode(b"rg."), "org.")
    self.assertEqual(decoder.decode(b"", True), "")
def test_incremental_decode(self):
    self.assertEqual(
        "".join(codecs.iterdecode("python.org", "idna")),
        u"python.org"
    )
    self.assertEqual(
        "".join(codecs.iterdecode("python.org.", "idna")),
        u"python.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode("xn--pythn-mua.org.", "idna")),
        u"pyth\xf6n.org."
    )
    self.assertEqual(
        "".join(codecs.iterdecode("xn--pythn-mua.org.", "idna")),
        u"pyth\xf6n.org."
    )

    decoder = codecs.getincrementaldecoder("idna")()
    self.assertEqual(decoder.decode("xn--xam", ), u"")
    self.assertEqual(decoder.decode("ple-9ta.o", ), u"\xe4xample.")
    self.assertEqual(decoder.decode(u"rg"), u"")
    self.assertEqual(decoder.decode(u"", True), u"org")

    decoder.reset()
    self.assertEqual(decoder.decode("xn--xam", ), u"")
    self.assertEqual(decoder.decode("ple-9ta.o", ), u"\xe4xample.")
    self.assertEqual(decoder.decode("rg."), u"org.")
    self.assertEqual(decoder.decode("", True), u"")
def test_basics_capi(self):
    from _testcapi import codec_incrementalencoder, codec_incrementaldecoder
    s = u"abc123"  # all codecs should be able to encode these
    for encoding in all_unicode_encodings:
        if encoding not in broken_incremental_coders:
            # check incremental decoder/encoder and iterencode()/iterdecode()
            try:
                cencoder = codec_incrementalencoder(encoding)
            except LookupError:  # no IncrementalEncoder
                pass
            else:
                # check C API
                encodedresult = ""
                for c in s:
                    encodedresult += cencoder.encode(c)
                encodedresult += cencoder.encode(u"", True)
                cdecoder = codec_incrementaldecoder(encoding)
                decodedresult = u""
                for c in encodedresult:
                    decodedresult += cdecoder.decode(c)
                decodedresult += cdecoder.decode("", True)
                self.assertEqual(decodedresult, s,
                                 "encoding=%r" % encoding)

            if encoding not in only_strict_mode:
                # check incremental decoder/encoder with errors argument
                try:
                    cencoder = codec_incrementalencoder(encoding, "ignore")
                except LookupError:  # no IncrementalEncoder
                    pass
                else:
                    encodedresult = "".join(cencoder.encode(c) for c in s)
                    cdecoder = codec_incrementaldecoder(encoding, "ignore")
                    decodedresult = u"".join(cdecoder.decode(c)
                                             for c in encodedresult)
                    self.assertEqual(decodedresult, s,
                                     "encoding=%r" % encoding)
def _fetch(self):
    feed = {}
    try:
        for base in self.bases:
            feed[base] = {}
            for quote in self.quotes:
                if quote == base:
                    continue
                ticker = "%s%s" % (quote, base)
                url = (
                    'http://www.google.com/finance/getprices'
                    '?i={period}&p={days}d&f=d,c&df=cpct&q={ticker}'
                ).format(ticker=ticker, period=self.period, days=self.days)
                response = requests.get(url=url, headers=_request_headers, timeout=self.timeout)
                reader = csv.reader(codecs.iterdecode(response.content.splitlines(), "utf-8"))
                prices = []
                for row in reader:
                    if re.match(r'^[a\d]', row[0]):  # raw string avoids an invalid escape
                        prices.append(float(row[1]))
                if hasattr(self, "quoteNames") and quote in self.quoteNames:
                    quote = self.quoteNames[quote]
                feed[base][quote] = {"price": sum(prices) / len(prices),
                                     "volume": 1.0}
    except Exception as e:
        raise Exception("\nError fetching results from {1}! ({0})".format(str(e), type(self).__name__))
    return feed
def wmo_importer(url='http://tgftp.nws.noaa.gov/data/nsd_bbsss.txt'):
    if PY2:
        delimiter = b';'
        data = urlopen(url)
    else:
        delimiter = ';'
        import codecs
        data = codecs.iterdecode(urlopen(url), 'utf-8')
    reader = csv.reader(data, delimiter=delimiter, quoting=csv.QUOTE_NONE)

    def geo_normalize(value):
        # recognize NSEW or undefined (which is interpreted as north)
        orientation = value[-1]
        sign = -1 if orientation in 'SW' else 1
        coords = value if orientation not in 'NEWS' else value[:-1]
        coords += '-0-0'  # ensure missing seconds or minutes are 0
        degrees, minutes, seconds = map(float, coords.split('-', 3)[:3])
        return sign * (degrees + (minutes / 60) + (seconds / 3600))

    not_airport = '----'
    for row in reader:
        name = row[0] + row[1] if row[2] == not_airport else row[2]
        yield name, geo_normalize(row[8]), geo_normalize(row[7])
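The PY2/PY3 branch above exists because Python 2's csv module consumes byte strings while Python 3's consumes text, so only the Python 3 path needs the on-the-fly decode. On Python 3 alone the branch collapses to a few lines; a minimal sketch (read_remote_csv is a hypothetical helper, not part of the original project):

import codecs
import csv
from urllib.request import urlopen

def read_remote_csv(url, encoding='utf-8', delimiter=';'):
    # urlopen() yields bytes, csv.reader() wants str;
    # codecs.iterdecode() bridges the two lazily.
    data = codecs.iterdecode(urlopen(url), encoding)
    return csv.reader(data, delimiter=delimiter, quoting=csv.QUOTE_NONE)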
def _add_csv_file_to_db(self, decoder):
    f = codecs.iterdecode(
        self.upload_file_form.cleaned_data['marketing_file'],
        decoder
    )
    reader = csv.reader(f)
    if not self.uploaded_file:
        new_file = UploadedFile(
            filename=self.upload_file_form.cleaned_data['marketing_file'].name,
            uploaded_by=self.request.user,
            num_columns=0,
        )
        new_file.save()
        self.uploaded_file = new_file
    is_first_row = True
    self.num_cols = None
    row_number = 0
    for row in reader:
        if not self.num_cols:
            self.num_cols = len(row)
        if self._csv_row_is_not_blank(row):
            self._add_csv_row_to_db(row, is_first_row, row_number)
            is_first_row = False
            row_number += 1
    if self.num_cols:
        self.uploaded_file.num_columns = self.num_cols
        self.uploaded_file.save()
def load_from_ktipp(self):
    """Load the blacklist from ktipp."""
    url = 'http://trick77.com/tools/latest_cc_blacklist.txt'
    response = urlopen(url)
    self._import_csv(codecs.iterdecode(response, 'utf-8'),
                     source_name='ktipp')
def get_csv_entries():
    if TEST:
        action = open(LOCAL_CSV)
    else:
        action = closing(urlopen(REMOTE_CSV, context=CONTEXT))
    with action as f:
        if not TEST and sys.version_info.major > 2:
            f = codecs.iterdecode(f, 'utf-8')  # needed for urlopen and py3
        for entry in csv.DictReader(f, fieldnames=FIELDS):
            yield entry
def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue(b"")
    r = codecs.getreader(self.encoding)(q)
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(bytes([c]))
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), "")
    self.assertEqual(r.bytebuffer, b"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # Check whether the reset method works properly
    d.reset()
    result = ""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(bytes([c]))
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode(b"", True), "")
    self.assertEqual(d.buffer, b"")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        "".join(codecs.iterdecode([bytes([c]) for c in encoded], self.encoding))
    )
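The final assertion above is the core iterdecode() contract: feeding the encoded bytes one at a time must reproduce the original string exactly. A standalone illustration of the same invariant, independent of the test class:

import codecs

text = "pyth\xf6n.org"
encoded = text.encode("utf-8")
# Byte-by-byte iterdecode must round-trip, even though the two-byte
# encoding of '\xf6' gets split across iterations.
assert "".join(codecs.iterdecode((bytes([b]) for b in encoded), "utf-8")) == text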
def check_partial(self, input, partialresults):
    # get a StreamReader for the encoding and feed the bytestring version
    # of input to the reader byte by byte. Read everything available from
    # the StreamReader and check that the results equal the appropriate
    # entries from partialresults.
    q = Queue()
    r = codecs.getreader(self.encoding)(q)
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        q.write(c)
        result += r.read()
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(r.read(), u"")
    self.assertEqual(r.bytebuffer, "")
    self.assertEqual(r.charbuffer, u"")

    # do the check again, this time using an incremental decoder
    d = codecs.getincrementaldecoder(self.encoding)()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # Check whether the reset method works properly
    d.reset()
    result = u""
    for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
        result += d.decode(c)
        self.assertEqual(result, partialresult)
    # check that there's nothing left in the buffers
    self.assertEqual(d.decode("", True), u"")
    self.assertEqual(d.buffer, "")

    # check iterdecode()
    encoded = input.encode(self.encoding)
    self.assertEqual(
        input,
        u"".join(codecs.iterdecode(encoded, self.encoding))
    )
def import_products(self, request, pk=None):
    """
    Create products on a project using CSV and ZIP files.
    """
    # Two files to import: CSV and ZIP of products
    # Parse CSV
    # Unzip designs
    # Go through CSV file, creating products
    # For each product parse the design
    # Return list of created projects + failures
    products_file = request.data.get('products_file')
    designs_file = request.data.get('designs_file')
    rejected = []
    completed = []
    if products_file:
        # Read the CSV file of products into a list
        decoded_file = codecs.iterdecode(products_file, 'utf-8-sig')
        try:
            products = [line for line in csv.DictReader(decoded_file, skipinitialspace=True)]
        except UnicodeDecodeError:
            return Response({'message': 'Please supply file in UTF-8 CSV format.'},
                            status=400)
        # Open the zip file for reading, assign the files within to a dict keyed by filename
        designs = {}
        if designs_file:
            with zipfile.ZipFile(designs_file, 'r') as dzip:
                for file_path in dzip.namelist():
                    filename = file_path.split('/')[-1]
                    with dzip.open(file_path, 'rU') as d:
                        designs[filename] = d.read()
        # Iterate through products, creating them and linking designs
        for p in products:
            # Replace the name of the design file with the actual contents
            if p.get('design', None):
                p['design'] = designs[p['design']].decode('utf-8-sig')
            p['project'] = self.get_object().id
            serializer = ProductSerializer(data=p)
            if serializer.is_valid():
                instance = serializer.save(created_by=request.user)
                items = []
                parser = DesignFileParser(instance.design)
                if instance.design_format == 'csv':
                    items, sbol = parser.parse_csv()
                elif instance.design_format == 'gb':
                    items, sbol = parser.parse_gb()
                for i in items:
                    instance.linked_inventory.add(i)
                completed.append(p)
            else:
                p['reason'] = serializer.errors
                rejected.append(p)
        return Response({'message': 'Import completed',
                         'completed': completed,
                         'rejected': rejected})
    else:
        return Response({'message': 'Please supply a product definition and file of designs'},
                        status=400)
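A note on the 'utf-8-sig' codec used twice above: it transparently strips the byte-order mark that Excel prepends to UTF-8 CSV exports, whereas plain 'utf-8' would leave U+FEFF glued to the first header name. A small demonstration (the byte strings are illustrative):

import codecs

chunks = [b'\xef\xbb\xbfname,qty\n', b'widget,2\n']
print("".join(codecs.iterdecode(chunks, 'utf-8-sig')))  # 'name,qty\nwidget,2\n'
print("".join(codecs.iterdecode(chunks, 'utf-8')))      # '\ufeffname,qty\nwidget,2\n'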
def write(table_name,
          table_schema_path,
          connection_string,
          input_file,
          db_schema,
          geometry_support,
          from_srid,
          skip_headers,
          indexes_fields):
    table_schema = get_table_schema(table_schema_path)

    ## Todo: csv settings? use Frictionless Data csv standard?
    ## Todo: support line-delimited json?
    with fopen(input_file) as file:
        if re.match(s3_regex, input_file) != None:
            rows = csv.reader(codecs.iterdecode(file, 'utf-8'))
        else:
            rows = csv.reader(file)

        if skip_headers:
            next(rows)

        if re.match(carto.carto_connection_string_regex, connection_string) != None:
            load_postgis = geometry_support == 'postgis'
            if indexes_fields != None:
                indexes_fields = indexes_fields.split(',')
            carto.load(db_schema, table_name, load_postgis, table_schema, connection_string, rows, indexes_fields)
        else:
            connection_string = get_connection_string(connection_string)
            engine, storage = create_storage_adaptor(connection_string, db_schema, geometry_support, from_srid=from_srid)

            ## Todo: truncate? carto does. Makes this idempotent
            if table_schema_path != None:
                table_schema = get_table_schema(table_schema_path)
                storage.describe(table_name, descriptor=table_schema)

            if geometry_support == None and engine.dialect.driver == 'psycopg2':
                copy_from(engine, rows)
            else:
                storage.write(table_name, rows)