Python httplib 模块,OK 实例源码
我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 httplib.OK。
def test_validate_image_status_before_upload_ok_v1(self):
    """v1 validation succeeds on an OK HEAD response without retrying.

    The response mock reports HTTP 200 and answers header lookups with
    'queued'; the patched retry helper must not be touched on this path.
    """
    image_url = 'http://fake_host/fake_path/fake_image_id'
    conn = mock.Mock()
    retry_helper = self.mock_patch_object(
        self.glance, 'check_resp_status_and_retry')

    head_resp = mock.Mock()
    head_resp.status = httplib.OK
    head_resp.read.return_value = 'fakeData'
    head_resp.getheader.return_value = 'queued'
    conn.getresponse.return_value = head_resp

    self.glance.validate_image_status_before_upload_v1(
        conn, image_url, extra_headers=mock.Mock())

    self.assertTrue(conn.getresponse.called)
    # The response body is expected to be read exactly twice.
    self.assertEqual(head_resp.read.call_count, 2)
    self.assertFalse(retry_helper.called)
def test_validate_image_status_before_upload_req_head_exception_v1(self):
    """A failing request is reported as RetryableError by v1 validation.

    ``conn.request`` is rigged to raise, so the response object must never
    be fetched from the connection nor read.
    """
    mock_conn = mock.Mock()
    mock_conn.request.side_effect = Fake_HTTP_Request_Error()
    fake_url = 'http://fake_host/fake_path/fake_image_id'
    mock_head_resp = mock.Mock()
    mock_head_resp.status = httplib.OK
    mock_head_resp.read.return_value = 'fakeData'
    mock_head_resp.getheader.return_value = 'queued'
    mock_conn.getresponse.return_value = mock_head_resp

    # Pass the URL positionally, as the sibling "ok" v1 test does. It was
    # built above but never handed to the function under test, which
    # presumably made the call fail on a missing argument instead of on the
    # simulated request error.
    self.assertRaises(self.glance.RetryableError,
                      self.glance.validate_image_status_before_upload_v1,
                      mock_conn, fake_url, extra_headers=mock.Mock())

    mock_conn.request.assert_called_once()
    mock_head_resp.read.assert_not_called()
    mock_conn.getresponse.assert_not_called()
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self):
    """v2 validation issues a GET on the patch path and accepts 'queued'.

    The response mock reports HTTP 200 with a JSON body whose status is
    'queued'; the patched retry helper must not be invoked.
    """
    conn = mock.Mock()
    # NOTE(review): fake_url is never used below - confirm whether the
    # function under test expects it as an argument.
    fake_url = 'http://fake_host/fake_path/fake_image_id'
    retry_helper = self.mock_patch_object(
        self.glance, 'check_resp_status_and_retry')

    resp = mock.Mock()
    resp.status = httplib.OK
    resp.read.return_value = '{"status": "queued"}'
    conn.getresponse.return_value = resp

    extra_headers = mock.Mock()
    patch_path = 'fake_patch_path'
    self.glance.validate_image_status_before_upload_v2(
        conn, extra_headers, patch_path)

    self.assertTrue(conn.getresponse.called)
    self.assertEqual(resp.read.call_count, 2)
    self.assertFalse(retry_helper.called)
    conn.request.assert_called_with('GET',
                                    'fake_patch_path',
                                    headers=extra_headers)
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data, forwarded to open_url
    @param handlers: custom handlers, forwarded to open_url
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK
    """
    # NOTE(review): config is accepted but not forwarded to open_url here;
    # confirm against open_url's signature.
    return_code, return_message, response = open_url(url, data=data,
                                                     handlers=handlers)
    # A falsy return code (error case) can never equal OK, so one
    # comparison covers both halves of the original check.
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)
    try:
        return response.read()
    finally:
        # Close the response even when read() raises.
        response.close()
# NOTE(review): this excerpt is truncated. The statement that should bind
# return_code and response is cut down to "return_code," followed by a stray
# "handlers=handlers)", and the docstring documents a config parameter and a
# three-item tuple that the visible signature/return do not have. Restore the
# function from its upstream source before relying on it.
def fetch_from_url_to_file(url, output_file, handlers=None):
"""Writes data retrieved from a URL to a file.
@param url: URL to attempt to open
@type url: basestring
@param config: SSL context configuration
@type config: Configuration
@param output_file: output file
@type output_file: basestring
@return: tuple (
returned HTTP status code or 0 if an error occurred
returned message
boolean indicating whether access was successful)
"""
return_code,
handlers=handlers)
if return_code == http_client_.OK:
return_data = response.read()
response.close()
outfile = open(output_file, "w")
outfile.write(return_data)
outfile.close()
return return_code, return_code == http_client_.OK
# NOTE(review): this excerpt is truncated. The statement that should bind
# return_code, return_message and response is cut down to "return_code,"
# followed by a stray "handlers=handlers)", and the docstring documents
# config and data parameters that the visible signature does not have.
# Restore the function from its upstream source before relying on it.
def fetch_stream_from_url(url, handlers=None):
"""Returns data retrieved from a URL.
@param url: URL to attempt to open
@type url: basestring
@param config: SSL context configuration
@type config: Configuration
@param data: HTTP POST data
@type data: str
@param handlers: list of custom urllib2 handlers to add to the request
@type handlers: iterable
@return: data retrieved from URL or None
@rtype: file derived type
"""
return_code,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return response
else:
raise URLFetchError(return_message)
def Delete(
    name,
    creds,
    transport
):
    """Issue a registry DELETE for the manifest of a tag or digest.

    Args:
      name: a tag or digest to be deleted.
      creds: the creds to use for deletion.
      transport: the transport to use to contact the registry.
    """
    delete_transport = docker_http.Transport(
        name, creds, transport, docker_http.DELETE)

    manifest_url = (
        '{scheme}://{registry}/v2/{repository}/manifests/{entity}'.format(
            scheme=docker_http.Scheme(name.registry),
            registry=name.registry,
            repository=name.repository,
            entity=_tag_or_digest(name)))

    # Both 200 and 202 are accepted; the response itself is not used.
    _response, _body = delete_transport.Request(
        manifest_url,
        method='DELETE',
        accepted_codes=[httplib.OK, httplib.ACCEPTED])
def _content(
    self,
    suffix,
    accepted_mimes=None,
    cache=True
):
    """Return the body of a registry /v2/ resource, memoized per suffix.

    When ``self._name`` is a repository, the suffix is first prefixed with
    the repository path. A previously stored response for the resulting
    suffix is returned without a new request; otherwise the resource is
    fetched (only HTTP 200 accepted) and stored when ``cache`` is true.
    """
    target = self._name
    if isinstance(target, docker_name.Repository):
        suffix = '{repository}/{suffix}'.format(
            repository=target.repository,
            suffix=suffix)

    if suffix in self._response:
        return self._response[suffix]

    resource_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _, body = self._transport.Request(
        resource_url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
        self._response[suffix] = body
    return body
# NOTE(review): this excerpt is truncated. The isinstance(...) condition is
# cut off mid-call and the URL format call is missing its registry/suffix
# arguments and closing parenthesis. Restore the method from its upstream
# source before relying on it.
def blob_size(self, digest):
"""The byte size of the raw blob."""
suffix = 'blobs/' + digest
if isinstance(self._name,
suffix=suffix)
resp, unused_content = self._transport.Request(
'{scheme}://{registry}/v2/{suffix}'.format(
scheme=docker_http.Scheme(self._name.registry),
method='HEAD',
accepted_codes=[httplib.OK])
return int(resp['content-length'])
# Large, do not memoize.
def catalog(self, page_size=100):
    """Yield repository entries from the registry's paginated /v2/_catalog.

    Raises ValueError when ``self._name`` is a repository, and
    docker_http.BadStateException when a page has no 'repositories' key.
    """
    # Todo(user): Handle docker_name.Repository for /v2/<name>/_catalog
    if isinstance(self._name, docker_name.Repository):
        raise ValueError('Expected docker_name.Registry for "name"')

    catalog_url = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        page_size=page_size)
    pages = self._transport.PaginatedRequest(
        catalog_url, accepted_codes=[httplib.OK])

    for _, payload in pages:
        listing = json.loads(payload)
        if 'repositories' not in listing:
            raise docker_http.BadStateException(
                'Malformed JSON response: %s' % payload)
        # Todo(user): This should return docker_name.Repository
        for repository in listing['repositories']:
            yield repository
# __enter__ and __exit__ allow use as a context manager.
# NOTE(review): this excerpt is truncated. The signature lacks the suffix
# parameter that the body uses, and everything between the start of the
# isinstance(...) call and the tail of the request call is missing, so
# content is never bound. Restore the method from its upstream source.
def _content(
self,
accepted_mimes = None,
cache = True
):
"""Fetches content of the resources from registry by http calls."""
if isinstance(self._name,
accepted_mimes=accepted_mimes)
if cache:
self._response[suffix] = content
return content
def _put_layer(
    self,
    image,
    layer_id
):
    """Send one layer's tarball to the v1 images/<layer>/layer endpoint.

    The payload is whatever ``image.layer(layer_id)`` returns, sent as
    application/octet-stream; only HTTP 200 is accepted.
    """
    layer_url = '{scheme}://{endpoint}/v1/images/{layer}/layer'.format(
        scheme=docker_http.Scheme(self._endpoint),
        endpoint=self._endpoint,
        layer=layer_id)
    # Todo(user): We should stream this instead of loading
    # it into memory.
    docker_http.Request(
        self._transport,
        layer_url,
        self._token_creds,
        accepted_codes=[httplib.OK],
        body=image.layer(layer_id),
        content_type='application/octet-stream')
# NOTE(review): this excerpt is truncated. The signature lacks the creds
# parameter that the docstring documents, and the body jumps from the start
# of the Transport(...) call straight to the tail of an accepted_codes list.
# Restore the function from its upstream source before relying on it.
def Delete(
name,
transport
):
"""Delete a tag or digest.
Args:
name: a tag or digest to be deleted.
creds: the credentials to use for deletion.
transport: the transport to use to contact the registry.
"""
docker_transport = docker_http.Transport(
name, httplib.ACCEPTED])
# NOTE(review): this excerpt is truncated. Everything between the start of
# the isinstance(...) call and the tail of the request call is missing, so
# content is never bound. Restore the method from its upstream source.
def _content(self, suffix, cache=True):
"""Fetches content of the resources from registry by http calls."""
if isinstance(self._name,
accepted_codes=[httplib.OK])
if cache:
self._response[suffix] = content
return content
# NOTE(review): this excerpt is truncated. The signature appears to be fused
# with the tail of a request call (accepted_codes=[httplib.OK]), and content
# is read without ever being bound in the visible code. Restore the method
# from its upstream source before relying on it.
def catalog(self, accepted_codes=[httplib.OK]):
wrapper_object = json.loads(content)
if 'repositories' not in wrapper_object:
raise docker_http.BadStateException(
'Malformed JSON response: %s' % content)
for repo in wrapper_object['repositories']:
# Todo(user): This should return docker_name.Repository instead.
yield repo
# __enter__ and __exit__ allow use as a context manager.
def test_validate_URL_existence_url_ok(self, mock_cs, mock_url):
    """validate_URL_existence reports True when the opened URL has code OK.

    Also checks that urlopen was called with the URL and the API object's
    max_wait_time as timeout.
    """
    ok_response = Foo()
    ok_response.code = httplib.OK
    mock_url.side_effect = [ok_response]
    mock_cs.side_effect = [None]

    api = API.apiCalls.ApiCalls("", "", "")
    target_url = "http://google.com"

    outcome = api.validate_URL_existence(target_url)

    self.assertEqual(outcome, True)
    API.apiCalls.urlopen.assert_called_with(
        target_url, timeout=api.max_wait_time)
def session(self):
    """Return a usable session, refreshing it when the current one fails.

    An externally supplied session is returned untouched. Otherwise the
    current session is probed with an OPTIONS request against
    ``self.base_URL`` while holding ``self._session_lock``; if the probe
    raises or does not answer HTTP 200, a new session is built from a fresh
    access token. Errors raised while building the new session propagate.
    """
    if self._session_set_externally:
        return self._session

    # "with" only releases the lock if it was actually acquired, unlike an
    # acquire() placed inside a try/finally.
    with self._session_lock:
        try:
            response = self._session.options(self.base_URL)
            session_ok = response.status_code == httplib.OK
        except Exception:
            # Any probe failure is treated like an expired token, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            session_ok = False

        if session_ok:
            logging.debug("Existing session still works,going to reuse it.")
        else:
            logging.debug("Token is probably expired,going to get a new session.")
            oauth_service = self.get_oauth_service()
            access_token = self.get_access_token(oauth_service)
            self._session = oauth_service.get_session(access_token)

    return self._session
def test_view_product(self):
    """Test that the product view loads with the expected content.

    Checks the status code, the template resolved from the product URL, the
    product name and escaped description in the body, and that the context
    carries a ProductAddToCartForm plus a 'product_reviews' entry.
    """
    product = Product.active.all()[0]
    product_url = product.get_absolute_url()
    url_entry = urlresolvers.resolve(product_url)
    template_name = url_entry[2]['template_name']
    response = self.client.get(product_url)
    # assert* methods replace the deprecated failUnless/failIfEqual aliases.
    self.assertTrue(response)
    self.assertEqual(response.status_code, httplib.OK)
    self.assertTemplateUsed(response, template_name)
    self.assertContains(response, product.name)
    self.assertContains(response, html.escape(product.description))
    # check for cart form in product page response
    cart_form = response.context[0]['form']
    self.assertTrue(cart_form)
    # check that the cart form is instance of correct form class
    self.assertIsInstance(cart_form, ProductAddToCartForm)
    product_reviews = response.context[0].get('product_reviews', None)
    self.assertIsNotNone(product_reviews)
def test_get_folder(self):
    """folders.get maps the mocked GET response onto a folder object.

    Verifies the request made through the client and the id, name, parent,
    files and subfolders of the returned folder.
    """
    canned = mock_api_response('/folders/folder-id',
                               httplib.OK,
                               None,
                               'folders', 'GET_{id}_response.json')
    self.client.get.return_value = canned

    folder = self.folders.get('folder-id')

    self.client.get.assert_called_with(
        self.client.get.return_value.url, params=None)
    self.assertIsNotNone(folder)

    # Top-level attributes.
    self.assertEqual('folder-id', folder.id)
    self.assertEqual('folder-name', folder.name)
    self.assertEqual('root-id', folder.parentId)

    # Exactly one file.
    self.assertEqual(1, len(folder.files))
    self.assertEqual('file-id', folder.files[0].id)

    # Exactly one subfolder, pointing back at this folder.
    self.assertEqual(1, len(folder.subfolders))
    child = folder.subfolders[0]
    self.assertEqual('subfolder-id', child.id)
    self.assertEqual('folder-id', child.parentId)
def test_ngas_status():
    """
    Execute the STATUS command against the NGAS server on the host fabric is
    currently pointing at

    Reports through failure()/success(). Re-raises the IOError when the
    server cannot be reached and raises ValueError carrying the response
    text when it does not contain Status="SUCCESS".
    """
    try:
        serv = urllib2.urlopen('http://{0}:7777/STATUS'.format(env.host), timeout=5)
    except IOError:
        failure('Problem connecting to server {0}'.format(env.host))
        raise

    try:
        response = serv.read()
    finally:
        # Release the connection even when reading the body fails.
        serv.close()

    if 'Status="SUCCESS"' not in response:
        failure('Problem with response from {0},not SUCESS as expected'.format(env.host))
        raise ValueError(response)
    success('Response from {0} OK'.format(env.host))
def upload_to(host, filename, port=7777):
    """
    Simple method to upload a file into NGAS

    POSTs the file to /QARCHIVE on host:port in 4096-byte chunks, reports
    through success() on HTTP 200 and raises Exception with the status,
    headers and body of the response otherwise.
    """
    # httplib exposes HTTPConnection; the lower-case spelling does not exist.
    with contextlib.closing(httplib.HTTPConnection(host, port)) as conn:
        conn.putrequest('POST', '/QARCHIVE?filename=%s' % (urllib2.quote(os.path.basename(filename)),))
        conn.putheader('Content-Length', os.stat(filename).st_size)
        conn.endheaders()
        # Binary mode so the bytes sent match the Content-Length taken from
        # os.stat, whatever the platform's newline handling is.
        with open(filename, 'rb') as f:
            for data in iter(functools.partial(f.read, 4096), b''):
                conn.send(data)
        r = conn.getresponse()
        if r.status != httplib.OK:
            raise Exception("Error while QARCHIVE-ing %s to %s:%d:\nStatus: %d\n%s\n\n%s" % (filename, conn.host, conn.port, r.status, r.msg, r.read()))
        else:
            success("{0} successfully archived to {1}!".format(filename, host))
def _request(self, url, method='GET', ok_statuses=None, data=None):
    """
    Issue an HTTP request using the authorized Http object, handle bad responses, set the Meta object from the
    response content, and return the data as JSON.
    :param url: endpoint to send the request
    :param method: HTTP method (e.g. GET, POST, etc.), defaults to GET
    :param ok_statuses: status codes passed to _raise_for_status as acceptable, defaults to [httplib.OK]
    :param data: optional form fields, url-encoded into the request body; None sends no body
    :return: JSON data
    """
    #
    # Todo: clean up the ability to send a POST and add unit tests.
    #
    # data used to be read without being defined anywhere in this method;
    # it is now a trailing optional parameter so existing positional calls
    # keep working.
    if data is None:
        req_body = None
    else:
        req_body = urllib.urlencode(data)
    self.resp, self.content = self.http.request(url, method, body=req_body)
    if ok_statuses is None:
        ok_statuses = [httplib.OK]
    self._raise_for_status(ok_statuses)
    resp_json = json.loads(self.content)
    self.Meta = upapi.Meta.Meta(**resp_json['Meta'])
    return resp_json['data']
def disconnect(self):
    """
    Revoke this user's access by issuing a delete on the disconnect
    endpoint, then drop the stored credentials.
    """
    # NOTE(review): the mixed-case constant name looks unusual next to
    # upapi.endpoints.PUBSUB - confirm it matches upapi.endpoints.
    endpoint = upapi.endpoints.disCONNECT
    self.delete(endpoint)
    self.credentials = None
#
# Todo: finish pubsub implementation.
#
# def set_pubsub(self,url):
# """
# Register a user-specific pubsub webhook.
#
# :param url: user-specific webhook callback URL
# """
# resp = self.oauth.post(upapi.endpoints.PUBSUB,data={'webhook': url})
# self._raise_for_status([httplib.OK],resp)
#
# def delete_pubsub(self):
# """
# Delete a user-specific pubsub webhook.
# """
# self.delete(upapi.endpoints.PUBSUB)
def test__raise_for_status(self, mock_resp):
    """
    Check that _raise_for_status stays silent for a listed status and raises
    UnexpectedAPIResponse for an unlisted one.
    :param mock_resp: mocked httplib Response
    """
    self.up.resp = mock_resp
    self.up.content = ''

    # CREATED is in the accepted list, so nothing may be raised.
    mock_resp.status = httplib.CREATED
    try:
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
    except Exception as exc:
        self.fail('_raise_for_status unexpectedly threw {}'.format(exc))

    # ACCEPTED is not in the accepted list, so the call must raise.
    mock_resp.status = httplib.ACCEPTED
    with self.assertRaises(upapi.exceptions.UnexpectedAPIResponse):
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
def _parse_result(http_response, response):
    """Interpret an HTTP response, filling ``response`` or raising.

    Args:
        http_response: object exposing status, reason, read() and close().
        response: object whose __dict__ is updated from a JSON body on a
            2xx status; its Metadata.bce_request_id is used when an error
            has to be built without a server-supplied body.

    Returns:
        True for a 2xx response (with or without a body).

    Raises:
        BceClientError: for any 1xx status.
        BceServerError: for every other outcome, carrying status_code.
    """
    # Floor division keeps the status-class comparison exact even when "/"
    # is true division (204 / 100 would be 2.04 and never match 200 / 100).
    status_class = http_response.status // 100
    if status_class == httplib.CONTINUE // 100:
        raise BceClientError('Can not handle 1xx http status code')

    is_success = status_class == httplib.OK // 100
    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)
        if 'message' in d and 'code' in d and 'requestId' in d:
            # A body with these three keys is treated as a server error,
            # whatever the status code is.
            bse = BceServerError(d['message'], code=d['code'], request_id=d['requestId'])
        elif is_success:
            response.__dict__.update(
                json.loads(body, object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif is_success:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason, request_id=response.Metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
# NOTE(review): this excerpt is truncated. The def line is fused with the
# tail of a BceServerError(...) call and has no colon, and bse is never
# bound in the visible code. Restore the function from its upstream source.
def _parse_result(http_response, request_id=response.Metadata.bce_request_id)
bse.status_code = http_response.status
raise bse
def check_endpoint_for_error(resource, operation=None):
    """Build a decorator that turns handler errors into problem responses.

    The decorated handler is called with a single ``name`` argument. On
    success the wrapper returns ``(result, httplib.OK)``. NotFoundError maps
    to a NOT_FOUND problem and TobedoneError to a NOT_IMPLEMENTED problem,
    both built with connexion.problem.

    Args:
        resource: resource label used in the problem texts.
        operation: operation label used in the "not implemented" texts;
            may be left as None.
    """
    def _decorator(func):
        def _execute(name=None):
            try:
                return func(name), httplib.OK
            except error.NotFoundError:
                return connexion.problem(
                    httplib.NOT_FOUND,
                    '{} not found'.format(resource),
                    'Requested {} `{}` not found.'
                    .format(resource.lower(), name))
            except error.TobedoneError:
                # operation defaults to None; guard the lower() call so
                # building the message cannot raise AttributeError.
                if operation is None:
                    operation_label = 'none'
                else:
                    operation_label = operation.lower()
                return connexion.problem(
                    httplib.NOT_IMPLEMENTED,
                    '{} handler not implemented'.format(operation),
                    'Requested operation `{}` on {} not implemented.'
                    .format(operation_label, resource.lower()))
        return _execute
    return _decorator
# NOTE(review): this excerpt is truncated. The def line runs straight into
# the tail of a call ("handlers=handlers)"), so the rest of the signature
# and the statement binding return_code, return_message and response are
# missing. Restore the function from its upstream source.
def fetch_from_url(url,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return_data = response.read()
response.close()
return return_data
else:
raise URLFetchError(return_message)
# NOTE(review): truncated excerpt - the def line is fused with the tail of a
# return statement and the whole body is missing. Restore from upstream.
def fetch_from_url_to_file(url, return_code == http_client_.OK
# NOTE(review): this excerpt is truncated. The def line runs straight into
# the tail of a call ("handlers=handlers)"), so the rest of the signature
# and the statement binding return_code, return_message and response are
# missing. Restore the function from its upstream source.
def fetch_stream_from_url(url,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return response
else:
raise URLFetchError(return_message)
# NOTE(review): this excerpt is truncated. The def line runs straight into
# the tail of a call ("handlers=handlers)"), so the rest of the signature
# and the statement binding return_code, return_message and response are
# missing. Restore the function from its upstream source.
def fetch_from_url(url,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return_data = response.read()
response.close()
return return_data
else:
raise URLFetchError(return_message)
# NOTE(review): truncated excerpt - the def line is fused with the tail of a
# return statement and the whole body is missing. Restore from upstream.
def fetch_from_url_to_file(url, return_code == http_client_.OK
def deconstruct_list(response_data, model=None, status_code=OK):
    """Split a positional response into (view_name, model, status_code).

    Element 0 is passed through required_view_name; elements 1 and 2, when
    present, override ``model`` and ``status_code``. A model of None becomes
    an empty dict and a status code of None becomes OK. Empty input raises
    KrumpException.
    """
    if not response_data:
        raise KrumpException(
            'Response data is empty; the view name at least is required.')

    size = len(response_data)
    view_name = required_view_name(response_data[0]) if size >= 1 else None
    if size >= 2:
        model = response_data[1]
    if size >= 3:
        status_code = response_data[2]

    resolved_model = {} if model is None else model
    resolved_status = OK if status_code is None else status_code
    return view_name, resolved_model, resolved_status
def test_get_value_for_attribute_with_a_present_attribute(self, mock_Meta_req):
    """Test get_value_for_attribute returns the response body.

    Setup:
        * Patch httplib.HTTPResponse with a mock_open-based stand-in whose
          read data is the expected string and whose status is OK.
        * Use that stand-in as the side effect of the mocked request.
    Expected results:
        * The returned value equals the expected string.
    """
    expected = 'expected_response'
    with mock.patch('httplib.HTTPResponse',
                    mock.mock_open(read_data=expected)) as fake_http_resp:
        fake_http_resp.return_value.status = httplib.OK
        mock_Meta_req.side_effect = fake_http_resp
        observed = Metadata_server.get_value_for_attribute('')
    self.assertEqual(observed, expected)
def _refresh_access_token(self):
    """
    Request new access token to send with requests to Brightcove. Access Token expires every 5 minutes.

    Posts a client_credentials grant authenticated with HTTP Basic auth
    built from self.api_key and self.api_secret.

    Returns:
        The 'access_token' value of the JSON response when the status is
        OK; None for any other status.
    """
    url = "https://oauth.brightcove.com/v3/access_token"
    params = {"grant_type": "client_credentials"}
    # b64encode emits no line breaks, so the deprecated encodestring call
    # and the newline stripping it required are no longer needed.
    auth_string = base64.b64encode(
        '{}:{}'.format(self.api_key, self.api_secret)
    )
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": "Basic " + auth_string
    }
    resp = requests.post(url, headers=headers, data=params)
    if resp.status_code == httplib.OK:
        result = resp.json()
        return result['access_token']
    # Callers get None when the token request is not answered with 200.
    return None
def get(self, url, headers=None, can_retry=True):
    """
    Issue REST GET request to a given URL. Can throw apiclientError or its subclass.
    Arguments:
        url (str): API url to fetch a resource from.
        headers (dict): Headers necessary as per API, e.g. authorization bearer to perform authorised requests.
        can_retry (bool): True if in a case of authentication error it can refresh access token and retry a call.
    Returns:
        Response in python native data format.
    Raises:
        BrightcoveapiclientError: for any status other than OK, or for
            UNAUTHORIZED once the single retry has been used.
    """
    # url is restored as the first parameter: the body and the retry call
    # below both use it, but the signature no longer declared it.
    headers_ = {'Authorization': 'Bearer ' + str(self.access_token)}
    if headers is not None:
        headers_.update(headers)
    resp = requests.get(url, headers=headers_)
    if resp.status_code == httplib.OK:
        return resp.json()
    elif resp.status_code == httplib.UNAUTHORIZED and can_retry:
        # Refresh the token once, then retry with retries disabled.
        self.access_token = self._refresh_access_token()
        return self.get(url, headers, can_retry=False)
    else:
        raise BrightcoveapiclientError
def get(self, url, headers=None, can_retry=False):
    """
    Issue REST GET request to a given URL. Can throw apiclientError or its subclass.
    Arguments:
        url (str): API url to fetch a resource from.
        headers (dict): Headers necessary as per API, e.g. authorization bearer to perform
            authorised requests.
        can_retry (bool): Accepted but not used by this implementation.
    Returns:
        Response in python native data format.
    Raises:
        VimeoapiclientError: for any status other than OK.
    """
    # url and headers are restored as parameters: the body uses both and
    # the docstring documents them, but the signature had lost them.
    headers_ = {
        'Authorization': 'Bearer {}'.format(self.access_token.encode(encoding='utf-8')),
        'Accept': 'application/json'
    }
    if headers is not None:
        headers_.update(headers)
    resp = requests.get(url, headers=headers_)
    if resp.status_code == httplib.OK:
        return resp.json()
    else:
        raise VimeoapiclientError(_("Can't fetch requested data from API."))
def connect_intercept(self):
    """Answer a CONNECT request and wrap the client socket in TLS.

    A certificate file for the requested host is created with openssl
    (under self.lock) when it does not exist yet. The client is then told
    the connection is established, the socket is wrapped server-side with
    that certificate, and close_connection is set from the Proxy-Connection
    header.
    """
    host = self.path.split(':')[0]
    cert_file = "%s/%s.crt" % (self.certdir.rstrip('/'), host)

    with self.lock:
        if not os.path.isfile(cert_file):
            # Millisecond timestamp used as the certificate serial.
            serial = "%d" % (time.time() * 1000)
            request_proc = Popen(
                ["openssl", "req", "-new", "-key", self.certkey, "-subj", "/CN=%s" % host],
                stdout=PIPE)
            signing_proc = Popen(
                ["openssl", "x509", "-req", "-days", "3650", "-CA", self.cacert, "-CAkey", self.cakey,
                 "-set_serial", serial, "-out", cert_file],
                stdin=request_proc.stdout, stderr=PIPE)
            signing_proc.communicate()

    status_line = "%s %d %s\r\n" % (self.protocol_version, httplib.OK, 'Connection Established')
    self.wfile.write(status_line)
    self.end_headers()

    # Swap the plain socket and its file objects for TLS-wrapped ones.
    self.connection = ssl.wrap_socket(self.connection, keyfile=self.certkey, certfile=cert_file, server_side=True)
    self.rfile = self.connection.makefile("rb", self.rbufsize)
    self.wfile = self.connection.makefile("wb", self.wbufsize)

    proxy_connection = self.headers.get('Proxy-Connection', '').lower()
    if proxy_connection == 'close':
        self.close_connection = 1
    elif proxy_connection == 'keep-alive' and self.protocol_version >= "HTTP/1.1":
        self.close_connection = 0
def connect_relay(self):
    """Relay bytes between the client and the CONNECT target.

    The target is taken from self.path as "host:port". If the upstream
    connection cannot be opened, a BAD_GATEWAY error is sent. Otherwise the
    client is told the connection is established and data is shuttled in
    both directions until either side closes, a socket reports an error, or
    select() times out.
    """
    address = self.path.split(':', 1)
    address[1] = int(address[1]) or 443
    try:
        s = socket.create_connection(address, timeout=self.timeout)
    except Exception:
        # Narrower than a bare except: KeyboardInterrupt/SystemExit pass.
        self.send_error(httplib.BAD_GATEWAY)
        return
    try:
        self.send_response(httplib.OK, 'Connection Established')
        self.end_headers()
        conns = [self.connection, s]
        self.close_connection = 0
        while not self.close_connection:
            rlist, wlist, xlist = select.select(conns, [], conns, self.timeout)
            if xlist or not rlist:
                break
            for r in rlist:
                other = conns[1] if r is conns[0] else conns[0]
                data = r.recv(8192)
                if not data:
                    # Peer closed: stop relaying after this pass.
                    self.close_connection = 1
                    break
                other.sendall(data)
    finally:
        # The upstream socket is owned by this method; always release it.
        s.close()
# NOTE(review): this excerpt is truncated. The def line runs straight into
# the tail of a call ("handlers=handlers)"), so the rest of the signature
# and the statement binding return_code, return_message and response are
# missing. Restore the function from its upstream source.
def fetch_from_url(url,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return_data = response.read()
response.close()
return return_data
else:
raise URLFetchError(return_message)
# NOTE(review): truncated excerpt - the def line is fused with the tail of a
# return statement and the whole body is missing. Restore from upstream.
def fetch_from_url_to_file(url, return_code == http_client_.OK
# NOTE(review): this excerpt is truncated. The def line runs straight into
# the tail of a call ("handlers=handlers)"), so the rest of the signature
# and the statement binding return_code, return_message and response are
# missing. Restore the function from its upstream source.
def fetch_stream_from_url(url,
handlers=handlers)
if return_code and return_code == http_client_.OK:
return response
else:
raise URLFetchError(return_message)
def wopiGetFile(fileid):
    '''Implements the GetFile WOPI call.

    Decodes the access_token request argument, rejects expired tokens, and
    streams the file named in the token back as application/octet-stream
    with status OK. Token decoding/expiry failures yield an UNAUTHORIZED
    reply; any other exception is handed to _logGeneralExceptionAndReturn.
    '''
    Wopi.refreshconfig()
    try:
        acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
        if acctok['exp'] < time.time():
            raise jwt.exceptions.ExpiredSignatureError
        # Only the last 20 characters of the token are logged here.
        Wopi.log.info('msg="GetFile" user="%s:%s" filename="%s" fileid="%s" token="%s"' %
                      (acctok['ruid'], acctok['rgid'], acctok['filename'], fileid, flask.request.args['access_token'][-20:]))
        # stream file from storage to client
        resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
        resp.status_code = httplib.OK
        return resp
    except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
        Wopi.log.warning('msg="Signature verification Failed" client="%s" requestedUrl="%s" token="%s"' %
                         (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
        return 'Invalid access token', httplib.UNAUTHORIZED
    except Exception as e:
        # "as" form matches the handler above and parses on Python 2.6+
        # as well as Python 3.
        return _logGeneralExceptionAndReturn(e, flask.request)
#
# The following operations are all called on POST /wopi/files/<fileid>
#
def wopiUnlock(fileid, reqheaders, acctok):
    '''Implements the Unlock WOPI call.

    Returns a conflict response when the X-WOPI-Lock header does not match
    the retrieved lock; otherwise removes the lock file, the last-save-time
    attribute and the open-files entry (each best effort) and replies OK.
    '''
    requested_lock = reqheaders['X-WOPI-Lock']
    stored_lock = _retrieveWopiLock(fileid, 'UNLOCK', requested_lock, acctok)
    if not _compareWopiLocks(stored_lock, requested_lock):
        return _makeConflictResponse('UNLOCK', stored_lock, '', acctok['filename'])

    # The lock matches: clean up everything tied to it.
    filename = acctok['filename']
    try:
        xrdcl.removefile(_getLockName(filename), Wopi.lockruid, Wopi.lockrgid, 1)
    except IOError:
        # Failure to remove the lock file is deliberately not reported.
        pass
    try:
        xrdcl.rmxattr(filename, LASTSAVETIMEKEY)
    except IOError:
        # Likewise ignored.
        pass
    # Drop the file from the internal list of opened files.
    try:
        del Wopi.openfiles[filename]
    except KeyError:
        # Nothing to remove if the entry is already gone.
        pass
    return 'OK', httplib.OK
def wopiGetLock(fileid, reqheaders_unused, acctok):
    '''Implements the GetLock WOPI call.

    Replies OK with the retrieved lock in the X-WOPI-Lock header. When a
    lock exists, the per-file set of users in Wopi.openfiles is updated and
    editing by more than one user is logged.
    '''
    resp = flask.Response()
    # throws exception if no lock
    # The lock argument is passed explicitly (empty, as GetLock carries no
    # lock of its own) so acctok lands in the fourth position, matching the
    # four-argument call made by wopiUnlock.
    resp.headers['X-WOPI-Lock'] = _retrieveWopiLock(fileid, 'GETLOCK', '', acctok)
    resp.status_code = httplib.OK
    # for statistical purposes, check whether a lock exists and update internal bookkeeping
    if resp.headers['X-WOPI-Lock']:
        try:
            # the file was already opened for write, check whether this is a new user
            if acctok['username'] not in Wopi.openfiles[acctok['filename']][1]:
                # yes it's a new user
                Wopi.openfiles[acctok['filename']][1].add(acctok['username'])
                if len(Wopi.openfiles[acctok['filename']][1]) > 1:
                    # for later monitoring, explicitly log that this file is being edited by at least two users
                    Wopi.log.info('msg="Collaborative editing detected" filename="%s" token="%s" users="%s"' %
                                  (acctok['filename'], flask.request.args['access_token'][-20:], list(Wopi.openfiles[acctok['filename']][1])))
        except KeyError:
            # existing lock but missing Wopi.openfiles[acctok['filename']] ?
            Wopi.openfiles[acctok['filename']] = (time.asctime(), sets.Set([acctok['username']]))
    return resp
def _copy(gcs_stub, headers):
    """Copy a file inside the GCS stub.

    Args:
      gcs_stub: an instance of gcs stub.
      headers: a dict of request headers. Must contain _XGoogcopySource header.
        The metadata directive entry is popped from it.

    Returns:
      An _FakeUrlFetchResult instance: the HEAD result when the source is
      not found, otherwise an empty OK result.
    """
    # NOTE(review): no destination filename is passed to put_copy here -
    # confirm against the gcs stub's put_copy signature.
    source = _XGoogcopySource(headers).value
    head_result = _handle_head(gcs_stub, source)
    if head_result.status_code == httplib.NOT_FOUND:
        return head_result

    directive = headers.pop('x-goog-Metadata-directive', 'copY')
    # Only a REPLACE directive forwards the request headers to the copy.
    copy_headers = headers if directive == 'REPLACE' else None
    gcs_stub.put_copy(source, copy_headers)
    return _FakeUrlFetchResult(httplib.OK, {}, '')
def _handle_head(gcs_stub, filename):
    """Handle HEAD request.

    Args:
      gcs_stub: an instance of gcs stub.
      filename: name passed to the stub's head_object.

    Returns:
      An _FakeUrlFetchResult instance: NOT_FOUND with empty headers when the
      stub has no stat for the file, otherwise OK with headers built from
      the stat (plus its Metadata, when present) and an empty body.
    """
    filestat = gcs_stub.head_object(filename)
    if not filestat:
        # Same (status, headers, content) shape as the OK result below;
        # the headers argument was missing on this path.
        return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')
    http_time = common.posix_time_to_http(filestat.st_ctime)
    response_headers = {
        'x-goog-stored-content-length': filestat.st_size,
        'content-length': 0,
        'content-type': filestat.content_type,
        'etag': filestat.etag,
        'last-modified': http_time
    }
    if filestat.Metadata:
        response_headers.update(filestat.Metadata)
    return _FakeUrlFetchResult(httplib.OK, response_headers, '')