Python aiohttp.web 模块，View() 实例源码
我们从 Python 开源项目中提取了以下 3 个代码示例，用于说明如何使用 aiohttp.web.View()。
async def test_atomic_from_view(test_client):
    """Check that ``@atomic`` works on a method of a class-based view.

    Registers a ``web.View`` subclass whose ``get`` handler is wrapped with
    ``atomic``, issues a GET request, and asserts that the response status is
    200 and that the app's scheduler reports no active or pending jobs
    afterwards.
    """
    # The body awaits the client fixture and the request, so the test itself
    # has to be a coroutine function; a plain ``def`` is a SyntaxError here.
    app = web.Application()

    class MyView(web.View):
        @atomic
        async def get(self):
            return web.Response()

    app.router.add_route("*", "/", MyView)
    aiojobs_setup(app)

    client = await test_client(app)
    resp = await client.get('/')
    assert resp.status == 200

    # Nothing may be left running or queued once the response is back.
    scheduler = get_scheduler_from_app(app)
    assert scheduler.active_count == 0
    assert scheduler.pending_count == 0
def __call__(self, url, method: [Iterable, str] = 'GET'):
    """Return a decorator that registers ``obj`` as the handler for ``url``.

    The decorated object is sorted into one of the router's lists:

    * coroutine function -> ``self.funcs`` as ``(url, methods, obj)``
    * ``WSHandler`` subclass -> ``self.websockets`` as ``(url, obj())``
    * ``web.View`` subclass -> ``self.aiohttp_views`` as ``(url, obj)``
    * ``BaseView`` subclass -> ``self.views`` as ``(url, obj)``

    Args:
        url: Route path stored alongside the handler.
        method: A single HTTP method name, or an iterable of method names.
            Only used when the decorated object is a coroutine function.

    Returns:
        A decorator that records the object and returns it unchanged.

    Raises:
        AssertionError: If a coroutine function is registered with an empty
            ``method``.
        TypeError: If the decorated object is none of the supported kinds.
    """
    def _(obj):
        # Kept as a function-level import, as in the original
        # (presumably to avoid a circular import -- TODO confirm).
        from .view import BaseView

        if iscoroutinefunction(obj):
            # Raised explicitly instead of via ``assert`` so the check is not
            # stripped when Python runs with -O; the exception type is the same.
            if not method:
                raise AssertionError(
                    "Must give at least one method to http handler `%s`" % obj.__name__
                )
            # isinstance() also accepts str subclasses, which would otherwise
            # fall through to list() and be split into single characters.
            if isinstance(method, str):
                methods = (method,)
            else:
                methods = list(method)
            self.funcs.append((url, methods, obj))
        elif isinstance(obj, type):
            if issubclass(obj, WSHandler):
                self.websockets.append((url, obj()))
            elif issubclass(obj, web.View):
                self.aiohttp_views.append((url, obj))
            elif issubclass(obj, BaseView):
                self.views.append((url, obj))
            else:
                # Report the rejected class itself; type(obj) here would only
                # name the metaclass (usually 'type').
                raise TypeError('Invalid type for router: %r' % obj.__name__)
        else:
            raise TypeError('Invalid type for router: %r' % type(obj).__name__)
        return obj
    return _
def atomic(coro):
    """Wrap a handler so that it runs as a spawned job and is awaited.

    The wrapper accepts either a request (function-based handler) or a
    ``View`` instance (method of a class-based view). It calls ``coro`` with
    that same argument, passes the resulting coroutine to ``spawn`` together
    with the request, and returns the result of the job's ``wait()``.

    Args:
        coro: The coroutine function to wrap.

    Returns:
        A coroutine function carrying ``coro``'s metadata (via ``wraps``).
    """
    @wraps(coro)
    async def wrapper(request):
        # Remember what the handler was actually called with: for a
        # class-based view this is the View instance, i.e. the method's self.
        handler_arg = request
        if isinstance(request, View):
            # Class Based View decorated: spawn() needs the real request.
            request = request.request
        # Pass the original argument to the handler. Passing the unwrapped
        # request would hand a Request to a view method as its ``self``.
        job = await spawn(request, coro(handler_arg))
        return await job.wait()
    return wrapper