Python urllib.error 模块,HTTPError.code 属性实例源码
我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 urllib.error.HTTPError 的 code 属性。
def execute_get(self, operation, headers=None):
    """Issue a GET request for ``operation`` and return the decoded body.

    The URL is ``self._host`` followed by ``operation.get_url()`` and the
    request is sent through ``self._opener``.

    Args:
        operation: Object whose ``get_url()`` returns the path appended to
            the host.
        headers: Optional mapping of extra request headers. It is copied,
            so the caller's mapping is never modified. ``Content-Type`` is
            always set to ``application/json;charset=utf-8``.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        ConnectionError: If the opener raises ``urllib.error.HTTPError``.
            The message carries the status code, reason and response body,
            and the original error is attached as ``__cause__``.
    """
    url = self._host + operation.get_url()

    # Work on a private copy: a shared ``{}`` default (or the caller's own
    # dict) must not accumulate the Content-Type entry across calls.
    request_headers = dict(headers) if headers else {}
    request_headers['Content-Type'] = 'application/json;charset=utf-8'
    request = urllib.request.Request(url, headers=request_headers)

    try:
        response = self._opener.open(request)
    except urllib.error.HTTPError as error:
        error_body = error.read().decode('utf-8')
        new_error_string = ('HTTP error ' +
                            str(error.code) + ' ' +
                            error.reason + ': ' +
                            error_body)
        # Chain the original error so the traceback keeps the HTTP details.
        raise ConnectionError(new_error_string) from error

    return response.read().decode('utf-8')
def is_operation_supported(self, operation=None, headers=None):
    """Query the ``/graph/operations/<name>`` endpoint for ``operation``.

    The URL is ``self._host + '/graph/operations/'`` followed by
    ``operation.get_operation()``; the request goes through
    ``self._opener``.

    Args:
        operation: Object whose ``get_operation()`` returns the operation
            name appended to the URL. Although it defaults to ``None``,
            a real object is required because the method is called on it.
        headers: Optional mapping of extra request headers. It is copied,
            so the caller's mapping is never modified. ``Content-Type`` is
            always set to ``application/json;charset=utf-8``.

    Returns:
        The raw response body decoded as UTF-8.

    Raises:
        ConnectionError: If the opener raises ``urllib.error.HTTPError``.
            The message carries the status code, reason and response body,
            and the original error is attached as ``__cause__``.
    """
    url = self._host + '/graph/operations/' + operation.get_operation()

    # Work on a private copy: a shared ``{}`` default (or the caller's own
    # dict) must not accumulate the Content-Type entry across calls.
    request_headers = dict(headers) if headers else {}
    request_headers['Content-Type'] = 'application/json;charset=utf-8'
    request = urllib.request.Request(url, headers=request_headers)

    try:
        response = self._opener.open(request)
    except urllib.error.HTTPError as error:
        error_body = error.read().decode('utf-8')
        new_error_string = ('HTTP error ' +
                            str(error.code) + ' ' +
                            error.reason + ': ' +
                            error_body)
        # Chain the original error so the traceback keeps the HTTP details.
        raise ConnectionError(new_error_string) from error

    return response.read().decode('utf-8')
def main():
    """Read the search term and location from the command line and run the query.

    Both options fall back to module-level defaults. If ``query_api``
    raises ``urllib.error.HTTPError`` the program exits with a message
    containing the HTTP status code.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-q', '--term',
        dest='term',
        type=str,
        default=DEFAULT_TERM,
        help='Search term (default: %(default)s)',
    )
    arg_parser.add_argument(
        '-l', '--location',
        dest='location',
        type=str,
        default=DEFAULT_LOCATION,
        help='Search location (default: %(default)s)',
    )
    cli_args = arg_parser.parse_args()

    try:
        query_api(cli_args.term, cli_args.location)
    except urllib.error.HTTPError as http_err:
        abort_message = 'Encountered HTTP error {0}. Abort program.'.format(
            http_err.code)
        sys.exit(abort_message)
def getDetailedBinaryLog(self, module, starttime, stoptime, filename):
    """Download a detailed binary log and stream it into ``filename``.

    The URL is built from the ``self.getDetailed_url`` template, filling
    ``addr``, ``start`` and ``stop`` with ``module``, ``starttime`` and
    ``stoptime``. The body is copied to disk in 64 KiB chunks.

    Args:
        module: Value substituted for ``addr`` in the URL template.
        starttime: Value substituted for ``start`` in the URL template.
        stoptime: Value substituted for ``stop`` in the URL template.
        filename: Path of the file to create and write in binary mode.

    Raises:
        j.exceptions.RuntimeError: If the response code is not 200.
    """
    url = self.getDetailed_url % {
        "addr": module, "start": starttime, "stop": stoptime}

    # Only the OpenerDirector branch is given a timeout; other opener types
    # are called with the URL alone (presumably they do not accept the
    # keyword -- confirm against the opener actually used).
    if isinstance(self.urlopener, urllib.request.OpenerDirector):
        url_f = self.urlopener.open(url, timeout=30)
    else:
        url_f = self.urlopener.open(url)

    # Close the response on every path, including failures while reading
    # from the server or writing the file.
    try:
        if url_f.code != 200:
            raise j.exceptions.RuntimeError(
                "Server fault code %s" % (url_f.code))
        with open(filename, 'wb') as fp:
            while True:
                chunk = url_f.read(65536)
                if not chunk:
                    break
                fp.write(chunk)
    finally:
        url_f.close()
# NOTE(review): this fragment appears to have lost its line breaks and some
# call arguments during extraction (several statements share one line, and
# names such as `req`, `code` and `msg` are used where the visible signature
# does not define them). The code is left untouched; restore it from the
# upstream project before relying on it.
#
# Redirect hook for HTTP 301 on SmartRedirectHandler (class header not visible
# here): calls preProcessingRedirection, delegates to the parent class's
# http_error_301, hands the result to postProcessingRedirection and returns it.
def http_error_301(self, req, fp, code, msg, headers):
self.preProcessingRedirection(req, headers) result = super(SmartRedirectHandler, self).http_error_301(req, headers) self.postProcessingRedirection(result)
# The line below ends http_error_301 and then carries http_error_302, which
# follows the same pre-process / delegate / post-process pattern for HTTP 302.
return result def http_error_302(self, headers): self.preProcessingRedirection(req, headers) result = super(SmartRedirectHandler, self).http_error_302(req, headers) self.postProcessingRedirection(result) return result
# preProcessingRedirection scans headers._headers for a 'Location' entry and
# adds the network location of that URL to `req` as a 'Host' header.
def preProcessingRedirection(self, headers): location = '' for i in headers._headers: if i[0] == 'Location': location = i[1].strip() req.add_header('Host',urlparse(location).netloc)
# Calls printAnswer with the code and "<msg> <location>", then printHeaders for
# 'Set-Cookie'; both helpers are defined outside this view.
printAnswer(code, str(msg) + " " + location) printHeaders(headers._headers,'Set-Cookie')
# requestC unpacks an [answer, code] pair from requestB(opener, method); the
# rest of its body is not visible here.
def requestC(opener,url, headers, data, method = 'POST'): [answer, code] = requestB(opener, method)
def test_includes_code_and_msg(self):
    # The raised UCSM_XML_API_Error must carry the message as its first
    # positional argument and the numeric code on its ``code`` attribute.
    expected_message = 'bad'
    expected_code = 4224

    def _trigger():
        raise UCSM_XML_API_Error(expected_message, expected_code)

    # Relies on assertRaises returning the caught exception instance.
    caught = self.assertRaises(UCSM_XML_API_Error, _trigger)

    self.assertEqual(expected_message, caught.args[0])
    self.assertEqual(expected_code, caught.code)
def _send_data(self, url):
    """Fetch ``url`` through ``self.urlopener`` and return the response body.

    Args:
        url: The URL passed to the opener's ``open``.

    Returns:
        Whatever the response's ``read()`` returns.

    Raises:
        j.exceptions.RuntimeError: If the response code is not 200; the
            message includes the code and the body that was read.
    """
    # Opener selection mirrors getDetailedBinaryLog: only an OpenerDirector
    # is given the 30-second timeout, any other opener gets the URL alone.
    if isinstance(self.urlopener, urllib.request.OpenerDirector):
        url_f = self.urlopener.open(url, timeout=30)
    else:
        url_f = self.urlopener.open(url)

    # Close the response even when reading the body fails.
    try:
        data = url_f.read()
    finally:
        url_f.close()

    if url_f.code != 200:
        raise j.exceptions.RuntimeError(
            "Server fault code %s message %s" % (url_f.code, data))
    return data
def execute_operation_chain(self, operation_chain, headers=None):
    """
    This method queries Gaffer with the provided operation chain.

    The chain is serialised to JSON and POSTed to
    ``self._host + '/graph/operations/execute'`` through ``self._opener``.

    Args:
        operation_chain: Either an object exposing ``to_json()`` or an
            already JSON-serialisable structure.
        headers: Optional mapping of extra request headers. It is copied,
            so the caller's mapping is never modified. ``Content-Type`` is
            always set to ``application/json;charset=utf-8``.

    Returns:
        The result of ``g.JsonConverter.from_json`` applied to the parsed
        JSON response, or to ``None`` when the response body is empty.

    Raises:
        ConnectionError: If the opener raises ``urllib.error.HTTPError``.
            The message carries the status code, reason and response body,
            and the original error is attached as ``__cause__``.
    """
    # Construct the full URL path to the Gaffer server
    url = self._host + '/graph/operations/execute'

    if hasattr(operation_chain, "to_json"):
        op_chain_json_obj = operation_chain.to_json()
    else:
        op_chain_json_obj = operation_chain

    # Query Gaffer
    if self._verbose:
        print('\nQuery operations:\n' +
              json.dumps(op_chain_json_obj, indent=4) + '\n')

    # Convert the query dictionary into JSON and post the query to Gaffer
    json_body = bytes(json.dumps(op_chain_json_obj), 'ascii')

    # Work on a private copy: a shared ``{}`` default (or the caller's own
    # dict) must not accumulate the Content-Type entry across calls.
    request_headers = dict(headers) if headers else {}
    request_headers['Content-Type'] = 'application/json;charset=utf-8'
    request = urllib.request.Request(
        url, headers=request_headers, data=json_body)

    try:
        response = self._opener.open(request)
    except urllib.error.HTTPError as error:
        error_body = error.read().decode('utf-8')
        new_error_string = ('HTTP error ' +
                            str(error.code) + ' ' +
                            error.reason + ': ' +
                            error_body)
        # Chain the original error so the traceback keeps the HTTP details.
        raise ConnectionError(new_error_string) from error

    response_text = response.read().decode('utf-8')

    if self._verbose:
        print('Query response: ' + response_text)

    # Truthiness replaces the old ``is not ''`` identity test, which only
    # worked by virtue of string interning.
    if response_text:
        result = json.loads(response_text)
    else:
        result = None

    return g.JsonConverter.from_json(result)