Python httplib 模块,HTTP 实例源码
我们从Python开源项目中,提取了以下47个代码示例,用于说明如何使用httplib.HTTP。
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
void = fp.read()
fp.close()
# In case the server sent a relative URL,join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP,HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error',
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def post_multipart(host, selector, fields, files):
"""
Post fields and files to an http host as multipart/form-data.
fields is a sequence of (name,value) elements for regular form fields.
files is a sequence of (name,filename,value) elements for data to be uploaded as files
Return the server's response page.
"""
content_type, body = encode_multipart_formdata(fields, files)
h = httplib.HTTP(host)
h.putrequest('POST', selector)
h.putheader('content-type', content_type)
h.putheader('content-length', str(len(body)))
h.endheaders()
h.send(body)
errcode, headers = h.getreply()
return h.file.read()
def encode_multipart_formdata(fields, files):
"""
fields is a sequence of (name,value) elements for data to be uploaded as files
Return (content_type,body) ready for httplib.HTTP instance
"""
BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'
CRLF = '\r\n'
L = []
for (key, value) in fields:
L.append('--' + BOUNDARY)
L.append('Content-disposition: form-data; name="%s"' % key)
L.append('')
L.append(value)
for (key, filename, value) in files:
L.append('--' + BOUNDARY)
L.append('Content-disposition: form-data; name="%s"; filename="%s"' % (key, filename))
L.append('Content-Type: %s' % get_content_type(filename))
L.append('')
L.append(value)
L.append('--' + BOUNDARY + '--')
L.append('')
body = CRLF.join(L)
content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
return content_type, body
def redirect_internal(self, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL,
headers)
return self.open(newurl)
def make_connection(self, host):
# create a HTTPS connection object from a host descriptor
host, extra_headers, x509 = self.get_host_info(host)
http = HTTPTLSConnection(host, None,
self.username, self.password,
self.sharedKey,
self.certChain, self.privateKey,
self.checker.cryptoID,
self.checker.protocol,
self.checker.x509Fingerprint,
self.checker.x509TrustList,
self.checker.x509CommonName,
self.settings)
http2 = httplib.HTTP()
http2._setup(http)
return http2
def post_multipart(host, headers = h.getreply()
return h.file.read()
def redirect_internal(self,
headers)
return self.open(newurl)
def post_multipart(url, files):
parts = urlparse.urlparse(url)
scheme = parts[0]
host = parts[1]
selector = parts[2]
content_type, files)
if scheme == 'http':
h = httplib.HTTP(host)
elif scheme == 'https':
h = httplib.HTTPS(host)
else:
raise ValueError('unkNown scheme: ' + scheme)
h.putrequest('POST', headers = h.getreply()
return h.file.read()
def _send_message(self , xmlName):
self._set_message(xmlName)
librarylogger.info(u'?????????')
webservice = httplib.HTTP(self._host)
webservice.putrequest("POST" , self._postName)
webservice.putheader("Host" , self._host)
webservice.putheader("Content-Type" , self._ContentType)
webservice.putheader("Content-length" , "%d" % len ( self._message))
webservice.putheader("SOAPAction" , "\"http://tempuri.org/WSMethod\"")
webservice.putheader("Cookie" , self._cookie)
# ???????????header???
webservice.endheaders( )
librarylogger.info(u'???????')
librarylogger.info(u'??????data')
webservice.send(self._message)
webservice.getreply()
msg = webservice.getfile().read()
if(msg!=None and len(msg)>0):
librarylogger.info(u'?????????????')
return msg
def test_ipv6host_header(self):
# Default host header on IPv6 transaction should be wrapped by [] if
# it is an IPv6 address
expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
'Accept-Encoding: identity\r\n\r\n'
conn = httplib.httpconnection('[2001::]:81')
sock = FakeSocket('')
conn.sock = sock
conn.request('GET', '/foo')
self.assertTrue(sock.data.startswith(expected))
expected = 'GET /foo HTTP/1.1\r\nHost: [2001:102A::]\r\n' \
'Accept-Encoding: identity\r\n\r\n'
conn = httplib.httpconnection('[2001:102A::]')
sock = FakeSocket('')
conn.sock = sock
conn.request('GET', '/foo')
self.assertTrue(sock.data.startswith(expected))
def test_malformed_truncation(self):
# Other malformed header lines,especially without colons,used to
# cause the rest of the header section to be truncated
resp = (
b'HTTP/1.1 200 OK\r\n'
b'Public-Key-Pins: \n'
b'pin-sha256="xxx=";\n'
b'report-uri="https://..."\r\n'
b'transfer-encoding: chunked\r\n'
b'\r\n'
b'4\r\nbody\r\n0\r\n\r\n'
)
resp = httplib.HTTPResponse(FakeSocket(resp))
resp.begin()
self.assertIsNotNone(resp.getheader('Public-Key-Pins'))
self.assertEqual(resp.getheader('transfer-encoding'), 'chunked')
self.assertEqual(resp.read(), b'body')
def test_status_lines(self):
# Test HTTP status lines
body = "HTTP/1.1 200 Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(0), '') # Issue #20007
self.assertFalse(resp.isclosed())
self.assertEqual(resp.read(), 'Text')
self.assertTrue(resp.isclosed())
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
self.assertRaises(httplib.BadStatusLine, resp.begin)
def test_ipv6host_header(self):
# Default host header on IPv6 transaction should be wrapped by [] if
# it is an IPv6 address
expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
'Accept-Encoding: identity\r\n\r\n'
conn = httplib.httpconnection('[2001::]:81')
sock = FakeSocket('')
conn.sock = sock
conn.request('GET', '/foo')
self.assertTrue(sock.data.startswith(expected))
def redirect_internal(self,
headers)
return self.open(newurl)
def flush(self):
if self.isOpen():
self.close()
self.open();
# Pull data out of buffer
data = self.__wbuf.getvalue()
self.__wbuf = StringIO()
# HTTP request
self.__http.putrequest('POST', self.path)
# Write headers
self.__http.putheader('Host', self.host)
self.__http.putheader('Content-Type', 'application/x-thrift')
self.__http.putheader('Content-Length', str(len(data)))
self.__http.endheaders()
# Write payload
self.__http.send(data)
# Get reply to flush the request
self.code, self.message, self.headers = self.__http.getreply()
# Decorate if we kNow how to timeout
def send(self, event, msg, key):
id, url = key
request = self.format_request(
"%s: %s" %
(event, msg) if msg else event)
webservice = httplib.HTTP(url)
webservice.putrequest("POST", id)
webservice.putheader("Host", url)
webservice.putheader("Content-type", "text/xml")
webservice.putheader("X-NotificationClass", "2")
webservice.putheader("X-WindowsPhone-Target", "toast")
webservice.putheader("Content-length", "%d" % len(request))
webservice.endheaders()
webservice.send(request)
webservice.close()
def redirect_internal(self,
headers)
return self.open(newurl)
def make_connection(self,
self.settings)
http2 = httplib.HTTP()
http2._setup(http)
return http2
def run(dmn,file):
h = httplib.HTTP('www.google.com')
h.putrequest('GET',"/search?num=500&q=site:"+dmn+"+filetype:"+file)
h.putheader('Host', 'www.google.com')
h.putheader('User-agent', 'Internet Explorer 6.0 ')
h.putheader('Referrer', 'www.g13net.com')
h.endheaders()
returncode, returnmsg, headers = h.getreply()
data=h.getfile().read()
data=re.sub('<b>','',data)
for e in ('>','=','<','\\','(',')','"','http',':','//'):
data = string.replace(data,e,' ')
r1 = re.compile('[-_.a-zA-Z0-9.-_]*'+'\.'+file)
res = r1.findall(data)
return res
def test_ipv6host_header(self):
# Default host header on IPv6 transaction should wrapped by [] if
# its actual IPv6 address
expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
'Accept-Encoding: identity\r\n\r\n'
conn = httplib.httpconnection('[2001::]:81')
sock = FakeSocket('')
conn.sock = sock
conn.request('GET', '/foo')
self.assertTrue(sock.data.startswith(expected))
def test_response_headers(self):
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE";'
' Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
','
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s)
r.begin()
cookies = r.getheader("Set-Cookie")
if cookies != hdr:
self.fail("multiple headers not combined properly")
def test_local_bad_hostname(self):
# The (valid) cert doesn't validate the HTTP hostname
import ssl
server = self.make_server(CERT_fakehostname)
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.verify_mode = ssl.CERT_required
context.check_hostname = True
context.load_verify_locations(CERT_fakehostname)
h = httplib.HTTPSConnection('localhost', server.port, context=context)
with self.assertRaises(ssl.CertificateError):
h.request('GET', '/')
h.close()
# With context.check_hostname=False,the mismatching is ignored
context.check_hostname = False
h = httplib.HTTPSConnection('localhost', context=context)
h.request('GET', '/nonexistent')
resp = h.getresponse()
self.assertEqual(resp.status, 404)
def redirect_internal(self,
headers)
return self.open(newurl)
def test_ipv6host_header(self):
# Default host header on IPv6 transaction should wrapped by [] if
# its actual IPv6 address
expected = 'GET /foo HTTP/1.1\r\nHost: [2001::]:81\r\n' \
'Accept-Encoding: identity\r\n\r\n'
conn = httplib.httpconnection('[2001::]:81')
sock = FakeSocket('')
conn.sock = sock
conn.request('GET', '/foo')
self.assertTrue(sock.data.startswith(expected))
def test_response_headers(self):
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE";'
' Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
','
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s)
r.begin()
cookies = r.getheader("Set-Cookie")
if cookies != hdr:
self.fail("multiple headers not combined properly")
def test_chunked_head(self):
chunked_start = (
'HTTP/1.1 200 OK\r\n'
'transfer-encoding: chunked\r\n\r\n'
'a\r\n'
'hello world\r\n'
'1\r\n'
'd\r\n'
)
sock = FakeSocket(chunked_start + '0\r\n')
resp = httplib.HTTPResponse(sock, method="HEAD")
resp.begin()
self.assertEqual(resp.read(), '')
self.assertEqual(resp.status, 200)
self.assertEqual(resp.reason, 'OK')
self.assertTrue(resp.isclosed())
def make_connection(self, host):
self.realhost = host
proxies = urllib.getproxies()
proxyurl = None
if 'http' in proxies:
proxyurl = proxies['http']
elif 'all' in proxies:
proxyurl = proxies['all']
if proxyurl:
urltype, proxyhost = urllib.splittype(proxyurl)
host, selector = urllib.splithost(proxyhost)
h = httplib.HTTP(host)
self.proxy_is_used = True
return h
else:
self.proxy_is_used = False
return Transport.make_connection(self, host)
def redirect_internal(self,
headers)
return self.open(newurl)
def redirect_internal(self,
headers)
return self.open(newurl)
def post_multipart(host, headers = h.getreply()
return h.file.read()
def make_connection(self,
self.settings)
http2 = httplib.HTTP()
http2._setup(http)
return http2
def redirect_internal(self,
headers)
return self.open(newurl)
def make_connection(self, host):
# return an existing connection if possible. This allows
# HTTP/1.1 keep-alive.
if self._connection and host == self._connection[0]:
http = self._connection[1]
else:
# create a HTTPS connection object from a host descriptor
chost, x509 = self.get_host_info(host)
http = HTTPTLSConnection(chost,
username=self.username, password=self.password,
certChain=self.certChain, privateKey=self.privateKey,
checker=self.checker,
settings=self.settings,
ignoreAbruptClose=self.ignoreAbruptClose)
# store the host argument along with the connection object
self._connection = host, http
if not self.conn_class_is_http:
return http
http2 = httplib.HTTP()
http2._setup(http)
return http2
def redirect_internal(self,
headers)
return self.open(newurl)
def send(self, "%d" % len(request))
webservice.endheaders()
webservice.send(request)
webservice.close()
def do_search_files(self, files):
h = httplib.HTTP(self.server)
h.putrequest(
'GET',
"search/web/results/?q=" +
self.word +
"filetype:" +
self.files +
"&elements_per_page=50&start_index=" +
self.counter)
h.putheader('Host', self.hostname)
h.putheader('User-agent', self.userAgent)
h.endheaders()
returncode, headers = h.getreply()
self.results = h.getfile().read()
self.totalresults += self.results
def run(dmn,' ')
r1 = re.compile('[-_.a-zA-Z0-9.-_]*'+'\.'+file)
res = r1.findall(data)
return res
def run_search(self):
try:
con = httplib.HTTP(self.server)
con.putrequest('GET', '/search?q=%40'+self.keyword)
con.putheader('Host', self.host)
con.putheader('Cookie', 'SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50')
con.putheader('Accept-Language', 'en-us,en')
con.putheader('User-agent', self.u_agent)
con.endheaders()
# return code,messagge and header
returncode, header = con.getreply()
self.results = con.getfile().read()
self.tresult += self.results
except Exception as err:
print "\t\t|"
print "\t\t|__"+self.r+" Server not found!!\n"+self.t
def getreply(self):
file = self.sock.makefile('rb')
data = ''.join(file.readlines())
file.close()
self.file = StringIO(data)
line = self.file.readline()
try:
[ver, code, msg] = line.split(None, 2)
except ValueError:
try:
[ver, code] = line.split(None, 1)
msg = ""
except ValueError:
return -1, line, None
if ver[:5] != 'HTTP/':
return -1, None
code = int(code)
msg = msg.strip()
headers = mimetools.Message(self.file, 0)
return ver, headers
def __snd_request(self, method, uri, headers={}, body='', eh=1):
try:
h = HTTP()
h.connect(self.host, self.port)
h.putrequest(method, uri)
for n, v in headers.items():
h.putheader(n, v)
if eh:
h.endheaders()
if body:
h.send(body)
ver, hdrs = h.getreply()
data = h.getfile().read()
h.close()
except Exception:
raise NotAvailable(sys.exc_value)
return http_response(ver, hdrs, data)
# HTTP methods