Python flask 模块：flask.request 实例源码
我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 flask.request。
def test_manual_context_binding(self):
    """Check that a view can only read the request while a context is pushed."""
    application = flask.Flask(__name__)

    @application.route('/')
    def index():
        return 'Hello %s!' % flask.request.args['name']

    request_context = application.test_request_context('/?name=World')
    request_context.push()
    self.assert_equal(index(), 'Hello World!')
    request_context.pop()

    # Once the context is popped, calling the view must raise RuntimeError.
    raised = False
    try:
        index()
    except RuntimeError:
        raised = True
    if not raised:
        self.assert_true(0, 'expected runtime error')
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.

    The launch parameters of the incoming request are packed into a JWT
    whose ``exp`` claim is one minute after creation. The client is then
    redirected to the ``/lti_launch/`` page under ``EXTERNAL_URL`` with that
    token in the query string.

    :returns: The response produced by :func:`flask.redirect`.
    """
    launch_params = CanvasLTI.create_from_request(flask.request).launch_params
    lti = {
        'params': launch_params,
        # The method is ``utcnow``; ``utcNow`` raises AttributeError.
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
    }
    token = jwt.encode(
        lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
    ).decode('utf8')
    return flask.redirect(
        '{}/lti_launch/?inLTI=true&jwt={}'.format(
            app.config['EXTERNAL_URL'],
            urllib.parse.quote(token),
        )
    )
def index():
    """Render the password page, resetting the password for a valid code.

    Without a ``code`` query argument the plain index page is rendered.
    With a code that matches a ``RecoveryEmail`` row, the code is cleared,
    a random 10-character password is set through ``prosodyctl passwd`` and
    mailed to the stored address. An unknown code renders the error page.
    """
    unlock_code = request.args.get('code')
    if not unlock_code:
        return render_template('index.html')

    # unlock, new password
    recovery = (
        s.query(RecoveryEmail)
        .filter(RecoveryEmail.password_code == unlock_code)
        .one_or_none()
    )
    if not recovery:
        return render_template('error.html', message='link invalid')

    jid = recovery.jid
    # Clear the stored code and persist that before changing the password.
    recovery.password_code = None
    s.merge(recovery)
    s.commit()

    # set new password and send email
    email_address = recovery.email
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password = ''.join(rng.choice(alphabet) for _ in range(10))

    # prosodyctl reads the new password twice from stdin.
    proc = subprocess.Popen(
        ['/usr/bin/prosodyctl', 'passwd', jid],
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    proc.communicate(bytes("%s\n%s\n" % (password, password), encoding='utf8'))

    sendMail(email_address, 'new password', password)
    return render_template('success.html', message='password was sent')
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def api_send_loves():
    """Send love from the posted form and answer with a plain-text status.

    Reads ``sender``, ``recipient`` and ``message`` from the request form.
    On success the response names the recipients and a share link; when
    sending raises ``Taintedlove`` the exception's user message is returned,
    with the failure status only if ``exc.is_error`` is set.
    """
    # NOTE(review): the mixed-case names (``lovE_*``, ``Taintedlove``) are
    # kept exactly as found; confirm they match the definitions elsewhere.
    form = request.form
    sender = form.get('sender')
    recipients = sanitize_recipients(form.get('recipient'))
    message = form.get('message')

    try:
        delivered = send_loves(recipients, message, sender_username=sender)
        recipients_display_str = ','.join(delivered)
        link_url = create_love_link(recipients_display_str, message).url
        body = u'love sent to {}! Share: {}'.format(recipients_display_str, link_url)
        return make_response(body, lovE_CREATED_STATUS_CODE, {})
    except Taintedlove as exc:
        body = exc.user_message
        if exc.is_error:
            status = lovE_Failed_STATUS_CODE
        else:
            status = lovE_CREATED_STATUS_CODE
        return make_response(body, status, {})
def sent():
    """Render the confirmation page shown after love has been sent.

    Requires the ``link_id``, ``recipients`` and ``message`` query
    arguments; if any of them is missing or empty the client is redirected
    to the ``home`` endpoint instead.
    """
    link_id = request.args.get('link_id', None)
    recipients_str = request.args.get('recipients', None)
    message = request.args.get('message', None)
    if not link_id or not recipients_str or not message:
        return redirect(url_for('home'))

    recipients = sanitize_recipients(recipients_str)
    loved = [
        Employee.get_key_for_username(recipient).get()
        for recipient in recipients
    ]

    return render_template(
        'sent.html',
        # The method is ``utcnow``; ``utcNow`` raises AttributeError.
        current_time=datetime.utcnow(),
        current_user=Employee.get_current_employee(),
        message=message,
        loved=loved,
        url='{0}l/{1}'.format(config.APP_BASE_URL, link_id),
    )
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    # The response body is the view's return value, not the greenlet's.
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def login():
    """Authenticate the caller and return a JSON body with a fresh token.

    The ``username`` and ``password`` are read from the request headers.
    Raises ``InvalidRequest`` when either header is missing, and
    ``AuthFailed`` when the user is unknown or the password hash does not
    match.
    """
    credentials = (
        request.headers.get('username'),
        request.headers.get('password'),
    )
    if any(value is None for value in credentials):
        raise InvalidRequest()
    username, password = credentials

    user = UsersCollection().find_one({'username': username})
    if user is None or not check_password_hash(user['password_hash'], password):
        raise AuthFailed()

    token = UserJWT.new(username, user['scope'])
    return jsonify({'token': token})
def fixUrl(destinationPort, transport, url, peerType):
    """
    fixes the URL (original request string)

    For peers whose type contains "honeytrap" or "dionaea" the URL is
    replaced by ``"Attack on port <port>"``, followed by ``"/<transport>"``
    when the transport is udp or tcp (compared case-insensitively, emitted
    as given). For every other peer type ``url`` is returned unchanged.
    """
    transportProtocol = ""
    # Compare against the exact protocol names. A substring test such as
    # ``transport.lower() in "udp"`` also accepts "", "u" or "dp".
    if transport.lower() in ("udp", "tcp"):
        transportProtocol = "/" + transport

    # prepared dionaea to output additional information in ticker
    if "honeytrap" in peerType or "dionaea" in peerType:
        return "Attack on port " + str(destinationPort) + transportProtocol

    return url
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def password_reset_request():
    """Handle the form a user submits after forgetting their password.

    Logged-in users are redirected to the index page. On a valid form
    submission a reset token is mailed to the matching user, if one exists,
    and the client is sent to the login page; otherwise the form is rendered.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('.index'))

    form = PasswordResetRequestForm()
    if not form.validate_on_submit():
        return render_template('auth/reset_password.html', form=form)

    user = User.query.filter_by(email=form.email.data).first()
    if user:
        token = user.generate_reset_token()
        send_email(
            user.email,
            'Reset Your Password',
            'auth/email/reset_password',
            user=user,
            token=token,
            next=request.args.get('next'),
        )
    # NOTE(review): the flattened source does not show whether this flash
    # sits inside ``if user``; here it is shown for every valid submission.
    flash('An email with instructions to reset your password has been '
          'sent to you.')
    return redirect(url_for('auth.login'))
def inject():
    """Return a mapping of helper names to values, functions and config.

    Presumably registered as a template context processor; confirm against
    the registration site, which is not visible here.
    """
    return dict(
        root=_cfg("protocol") + "://" + _cfg("domain"),
        domain=_cfg("domain"),
        protocol=_cfg("protocol"),
        len=len,
        any=any,
        request=request,
        locale=locale,
        url_for=url_for,
        file_link=file_link,
        disown_link=disown_link,
        user=current_user,
        # A fresh random pick from ``moe`` on every call.
        moe=random.choice(moe),
        random=random,
        owner=_cfg("owner"),
        owner_email=_cfg("owner_email"),
        _cfg=_cfg,
    )
def make_embed(request, message="New Transaction"):
    """
    create an embed for logging transactions

    ``request`` is a transaction mapping. The keys read are ``_id``,
    ``type``, ``amount`` (only for the two known types), ``reason``,
    ``user`` (``name``, ``discrim``, ``_id``, ``balance``) and ``bot``
    (``name``, ``discrim``, ``_id``). The embed is POSTed to the logging
    webhook; nothing is returned.
    """
    fields = [{"name": "ID", "value": request['_id']}]

    # Deposits get a "+" prefix and withdrawals a "-"; any other type has no
    # Amount field. "withdrawl" is the stored spelling and must stay as is.
    if request['type'] == "deposit":
        fields.append({"name": "Amount", "value": f"+{request['amount']}"})
    elif request['type'] == "withdrawl":
        fields.append({"name": "Amount", "value": f"-{request['amount']}"})

    fields.append({"name": "Reason", "value": request['reason']})
    fields.append({"name": "Balance", "value": request['user']['balance']})
    fields.append(
        {"name": "User", "value": f"{request['user']['name']}#{request['user']['discrim']} ({request['user']['_id']})"})
    fields.append(
        {"name": "Bot", "value": f"{request['bot']['name']}#{request['bot']['discrim']} ({request['bot']['_id']})"})

    # The method is ``now``; ``datetime.Now`` raises AttributeError.
    embed = {"title": message, "fields": fields, "timestamp": datetime.now().isoformat()}

    # NOTE(review): this webhook URL embeds its secret token in source
    # control; consider loading it from configuration and rotating it.
    requests.post('https://canary.discordapp.com/api/webhooks/338371642277494786/vG8DJjpXC-NEXB4ZISo1r7QQ0Ras_RaqZbuhzjYOklKu70l73PmumdUCgBruypPv3fQp', json={"embeds": [embed]})
def get_transactions(request):
    """Collect transaction result sets for the filters present in ``request``.

    For each of the keys ``type``, ``amount``, ``bot``, ``user`` and
    ``reason`` present in ``request``, the rows whose field equals the
    requested value are fetched as one list. When none of those keys is
    present, the whole table is fetched instead. Duplicate result sets are
    dropped and at most ``request['limit']`` of them are returned.

    Raises ``KeyError`` when ``request`` has no ``limit`` entry.
    """
    filter_keys = ("type", "amount", "bot", "user", "reason")
    requested = [key for key in filter_keys if key in request]

    transactions = []
    for key in requested:
        transactions.append(list(r.table('transactions').filter(
            r.row[key] == request[key]).run(conn)))

    # Fall back to the whole table only when no filter was supplied. The old
    # ``not "reason" or "user" or ...`` test was always true, because a
    # non-empty string literal is truthy.
    if not requested:
        transactions.append(list(r.table('transactions').run(conn)))

    # NOTE(review): each element is a whole result set (a list of rows), so
    # the de-duplication and ``limit`` below act on result sets, not on
    # individual rows. Confirm whether ``extend`` was intended.
    unique = []
    for result_set in transactions:
        if result_set not in unique:
            unique.append(result_set)

    return unique[:request['limit']]
def create_token():
    """
    register a new bot, and return the token

    The JSON body must contain ``owner`` and a ``bot`` object:

    "bot": {
        "name": "Alpha Bot",
        "discrim": "4112",
        "id": "331841835733614603"
    }

    The stored and returned record holds ``name``, ``discrim``, ``_id``,
    ``owner`` and the generated ``token``.
    """
    # NOTE(review): the bot id is read from ``bot.id`` but stored as ``_id``;
    # confirm which key clients actually send.
    payload = flask.request.get_json()
    token = tokengenerator.URandomTokenGenerator().generate()
    bot = {"name": payload['bot']['name'], "discrim": payload['bot']['discrim'],
           "_id": payload['bot']['id'], "owner": payload['owner'], "token": token}
    # A ReQL query is only sent to the server by ``run``; without it the
    # insert is built and discarded.
    r.table('bots').insert({"bot": bot}).run(conn)
    return flask.jsonify(bot)
def show_transactions():
    """
    Returns all transactions that match the given parameters
    (keys in parentheses are optional)
    {
        "limit": 20,
        ("type": "deposit",)
        ("amount": 200,)
        ("user": {
            "name": "Mr Boo Grande",
            "discrim": "6644",
            "_id": "209137943661641728"
        },)
        ("bot": {
            "name": "Alpha Bot",
            "discrim": "4112",
            "_id": "331841835733614603"
        },)
        ("reason": "casino")
    }
    """
    # The parsed JSON body is passed on as the filter mapping.
    filters = flask.request.get_json()
    return flask.jsonify(get_transactions(filters))
def fake_transaction():
    """
    create a fake transaction log (for testing purposes)

    The JSON body is handed to ``make_embed`` and echoed back, e.g.:
    {
        "_id": 90832,
        "type": "deposit",
        "amount": 200,
        "user": {
            "name": "Mr Boo Grande",
            "discrim": "6644",
            "_id": "209137943661641728",
            "balance": 200
        },
        "bot": {
            "name": "Alpha Bot",
            "discrim": "4112",
            "_id": "331841835733614603"
        },
        "reason": "casino"
    }
    """
    transaction = flask.request.get_json()
    make_embed(transaction)
    return flask.jsonify(transaction)
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def flask(body, headers):
    """Build a Flask app with one route that answers with ``body``.

    The view at ``/hello/<account_id>/test`` reads the ``User-Agent`` header
    and the ``limit`` query argument, then returns ``body`` as
    ``text/plain`` with the given ``headers``.
    """
    # The import rebinds ``flask`` to the module inside this function, which
    # shares its name with the package.
    import flask

    flask_app = flask.Flask('hello')

    def hello(account_id):
        req = flask.request
        user_agent = req.headers['User-Agent']  # NOQA
        limit = req.args.get('limit', '10')  # NOQA

        return flask.Response(body, headers=headers, mimetype='text/plain')

    # Same registration the ``route`` decorator performs.
    flask_app.add_url_rule('/hello/<account_id>/test', view_func=hello)

    return flask_app
def werkzeug(body, headers):
    """Build a Werkzeug WSGI application that answers with ``body``.

    The application reads the ``User-Agent`` header and the ``limit`` query
    argument, matches the request against ``/hello/<account_id>/test`` and
    returns ``body`` as ``text/plain`` with the given ``headers``.
    """
    import werkzeug.wrappers as werkzeug
    from werkzeug.routing import Map, Rule

    path = '/hello/<account_id>/test'
    url_map = Map([Rule(path, endpoint='hello')])

    @werkzeug.Request.application
    def hello(request):
        user_agent = request.headers['User-Agent']  # NOQA
        limit = request.args.get('limit', '10')  # NOQA
        adapter = url_map.bind_to_environ(request.environ)  # NOQA
        endpoint, values = adapter.match()  # NOQA
        aid = values['account_id']  # NOQA

        # Pass the caller's headers on, as the ``flask`` factory does.
        return werkzeug.Response(body, headers=headers,
                                 mimetype='text/plain')

    return hello
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def _serve_webui(self, file_name='index.html'):  # pylint: disable=redefined-builtin
    """Serve a web UI file, or a generated JSON config for ``config.*.json``.

    When ``WEB3_ENDPOINT`` is configured and ``file_name`` contains
    ``config.`` and ends in ``.json``, a JSON body with the API prefix and
    the web3 endpoint is returned. Every other name is served from
    ``WEBUI_PATH``. An empty name or a missing file falls back to
    ``index.html``.
    """
    config = self.flask_app.config
    try:
        assert file_name
        web3 = config.get('WEB3_ENDPOINT')
        wants_config = web3 and 'config.' in file_name and file_name.endswith('.json')
        if not wants_config:
            return send_from_directory(config['WEBUI_PATH'], file_name)

        host = request.headers.get('Host')
        points_to_loopback = any(h in web3 for h in ('localhost', '127.0.0.1'))
        if points_to_loopback and host:
            # Keep the configured port but use the host name from the
            # request's Host header.
            _, _port = split_endpoint(web3)
            _host, _ = split_endpoint(host)
            web3 = 'http://{}:{}'.format(_host, _port)
        return jsonify({'raiden': self._api_prefix, 'web3': web3})
    except (NotFound, AssertionError):
        return send_from_directory(config['WEBUI_PATH'], 'index.html')
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def find_fontface_by_font_id():
    """Return the font faces of the font named by the ``font_id`` argument.

    The response is a JSON list with ``fontface_id``, ``fontface``,
    ``font_id`` and ``resource_path`` for each face. Any error while looking
    up or serialising the faces yields ``{"error": "Invalid request"}``.
    """
    try:
        font_id = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(font_id)
        response_data = [
            {
                "fontface_id": fontface.fontface_id,
                "fontface": fontface.fontface,
                "font_id": fontface.font_id,
                "resource_path": fontface.resource_path,
            }
            for fontface in fontfaces
        ]
        return jsonify(response_data)
    # ``Exception`` rather than a bare ``except`` so that SystemExit and
    # KeyboardInterrupt are not turned into an error response.
    except Exception:
        return jsonify({"error": "Invalid request"})
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account

    Returns a ``(message, 500, None)`` tuple when the URL or the token is
    not configured, otherwise the result of ``callapi``.
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set,please edit the shim!", 500, None)

    # Use a separate local name. Assigning to PUSHBULLETTOKEN here would make
    # it local to the function and raise UnboundLocalError whenever TOKEN is
    # not passed.
    token = TOKEN if TOKEN is not None else PUSHBULLETTOKEN
    if not token:
        return ("PUSHBULLETTOKEN parameter must be set,please edit the shim!", 500, None)

    a = parse(request)

    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }

    headers = {'Content-type': 'application/json', 'Access-Token': token}

    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
def find_fontface_by_font_id():
    """Return the font faces of the font named by the ``font_id`` argument.

    The response is a JSON list with ``fontface_id`` and ``resource_path``
    for each face. Any error while looking up or serialising the faces
    yields ``{"error": "Invalid request"}``.
    """
    try:
        font_id = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(font_id)
        response_data = [
            {
                "fontface_id": fontface.fontface_id,
                "resource_path": fontface.resource_path,
            }
            for fontface in fontfaces
        ]
        return jsonify(response_data)
    # ``Exception`` rather than a bare ``except`` so that SystemExit and
    # KeyboardInterrupt are not turned into an error response.
    except Exception:
        return jsonify({"error": "Invalid request"})
def extract_token(self):
    '''
    Extracts a token from the current HTTP request if it is available.

    Invokes the `save_user` callback if authentication is successful.
    Aborts with 403 when a ``Negotiate`` header is present but no user
    results from it. Does nothing without such a header.
    '''
    # NOTE(review): bytes keys and prefixes look like Python 2 usage;
    # confirm the header lookup behaves as intended on Python 3.
    prefix = b'Negotiate '
    header = request.headers.get(b'authorization')
    if not header or not header.startswith(prefix):
        return

    # Drop the 10-character "Negotiate " prefix to get the raw token.
    client_token = header[10:]
    user, token = _gssapi_authenticate(client_token, self._service_name)

    if token is not None:
        stack.top.kerberos_token = token

    if user is None:
        # Invalid Kerberos ticket, we could not complete authentication
        abort(403)
    else:
        self._save_user(user)
def test_greenlet_context_copying(self):
    """Check that a copied request context can be re-entered in a greenlet.

    The view copies the active request context and hands ``g`` to a
    greenlet. ``g`` runs after the request has finished, so it must see no
    request outside the ``with`` block and the original request inside it.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        reqctx = flask._request_ctx_stack.top.copy()

        def g():
            self.assert_false(flask.request)
            self.assert_false(flask.current_app)
            with reqctx:
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
            self.assert_false(flask.request)
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
def test_greenlet_context_copying_api(self):
    """Check ``flask.copy_current_request_context`` together with a greenlet.

    The view wraps ``g`` with the decorator and hands it to a greenlet.
    The greenlet is only run after the test client call has returned, so
    ``g`` must still see the request that was active when it was created.
    """
    app = flask.Flask(__name__)
    greenlets = []

    @app.route('/')
    def index():
        @flask.copy_current_request_context
        def g():
            self.assert_true(flask.request)
            self.assert_equal(flask.current_app, app)
            self.assert_equal(flask.request.path, '/')
            self.assert_equal(flask.request.args['foo'], 'bar')
            return 42
        greenlets.append(greenlet(g))
        return 'Hello World!'

    rv = app.test_client().get('/?foo=bar')
    self.assert_equal(rv.data, b'Hello World!')

    # Run the greenlet only now, after the request has finished.
    result = greenlets[0].run()
    self.assert_equal(result, 42)
# disable test if we don't have greenlets available
def recongize():
    '''
    Handles an uploaded image: reads the ``file`` part of the request, passes
    its bytes to ``client.recognize_celebrities`` and returns the analysis
    as JSON. A client error is reported in the JSON body instead of raised.
    '''
    # Reads image data from incoming request
    image_bytes = request.files['file'].read()

    # Sends image data for analysis; a ClientError becomes the result body.
    try:
        results = client.recognize_celebrities(Image={'Bytes': image_bytes})
    except botocore.exceptions.ClientError as e:
        results = {"Rekognition Client Error": str(e)}

    logging.info(results)
    return jsonify(results)
def chunked_reader():
    """Yield the chunks of the current request body.

    Under uWSGI the body is read with ``uwsgi.chunked_read`` until an empty
    chunk is returned. Otherwise the ``wsgi.input`` stream of the current
    Flask request is iterated; any error while reading ends the stream
    quietly (best effort, as before).
    """
    try:
        # If we're running under uWsgi, use the uwsgi.chunked_read method
        # to read chunked input.
        import uwsgi  # noqa
    except ImportError:
        uwsgi = None

    if uwsgi is not None:
        while True:
            chunk = uwsgi.chunked_read()
            if len(chunk) > 0:
                yield chunk
            else:
                return

    # Otherwise try to read the wsgi input. This works in embedded Apache.
    stream = flask.request.environ["wsgi.input"]
    try:
        # Plain iteration works on Python 2 and 3. ``stream.next()`` exists
        # only on Python 2, and on Python 3 the resulting AttributeError was
        # swallowed, so the body was silently dropped.
        for chunk in stream:
            yield chunk
    except Exception:
        return
def get_history(coin):
    """Return the stored price history of ``coin`` as JSON, newest first.

    Requests whose ``key`` query argument is in ``API_KEYS`` are served from
    ``pro_db``; all others from ``db``. Each entry carries the value, the
    currency ``USD`` and the row date shifted back by four hours, as an
    ISO 8601 timestamp and as a human-readable phrase.
    """
    c = Coin(coin.lower())
    q = "SELECT * from api_coin WHERE name=:coin ORDER BY date desc"

    is_pro = request.args.get('key') in API_KEYS
    if is_pro:
        print(Crayons.red('Pro request!'))
    database = pro_db if is_pro else db
    rows = database.query(q, coin=c.name)

    def as_entry(row):
        # NOTE(review): the fixed 4-hour shift looks like a hard-coded
        # timezone offset; confirm.
        moment = maya.MayaDT.from_datetime(row.date).subtract(hours=4)
        return {
            'value': row.value,
            'value.currency': 'USD',
            'timestamp': moment.iso8601(),
            'when': moment.slang_time(),
        }

    return jsonify(history=[as_entry(row) for row in rows])
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
                          ]:
    """Finish an LTI launch using the JWT from the ``Jwt`` request header.

    The token is decoded with ``LTI_SECRET_KEY`` and its ``params`` claim is
    used to build a :class:`CanvasLTI`. The LTI user, course and assignment
    are looked up, roles are set and the session is committed.

    :returns: A JSON response with ``assignment`` and ``new_role_created``,
        plus ``access_token`` when ``ensure_lti_user`` returned a new token.
    """
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        # PyJWT takes the accepted algorithms as the ``algorithms`` list; a
        # singular ``algorithm`` keyword does not restrict verification.
        algorithms=['HS512'],
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    # A ``Dict`` rather than a ``Mapping``, because the token is added below.
    result: t.Dict[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
def validate_sig(body, sig_str, pkh):
    """
    Validate the signature on the body of the request - throws exception if not valid.

    :param body: The signed request body.
    :param sig_str: The value of the ``X-Bitcoin-Sig`` header.
    :param pkh: The public key hash (address) the signature must belong to.
    :returns: ``True`` when the signature is valid.
    :raises PermissionError: When ``pkh`` is missing or has an invalid
        length, or when the signature is missing, invalid or cannot be
        checked.
    """
    # Check permission to update
    if pkh is None:
        raise PermissionError("pkh not found.")

    # Validate the pkh format
    base58.b58decode_check(pkh)
    if (len(pkh) < 20) or (len(pkh) > 40):
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        # Hand the signature to the verifier as well; it was being left out.
        # NOTE(review): argument order (message, signature, address) is
        # assumed; confirm against the wallet API.
        if not wallet.verify_bitcoin_message(body, sig_str, pkh):
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as err:
        logger.error("Failure: {0}".format(err))
        raise PermissionError("X-Bitcoin-Sig header validation Failed.") from err

    return True
def clear_cache():
    """Clear every cached LDAP and member lookup.

    Callers that are neither eval director nor RTP are redirected to
    ``/dashboard``. Otherwise the purge is logged, all caches are cleared
    and ``("cache cleared", 200)`` is returned.
    """
    user_name = request.headers.get('x-webauth-user')
    account = ldap_get_member(user_name)

    if not (ldap_is_eval_director(account) or ldap_is_rtp(account)):
        return redirect("/dashboard")

    log = logger.new(request=request)
    log.info('Purge All Caches')

    cached_lookups = (
        _ldap_is_member_of_directorship,
        ldap_get_member,
        ldap_get_active_members,
        ldap_get_intro_members,
        ldap_get_onfloor_members,
        ldap_get_current_students,
        get_voting_members,
        get_members_info,
        get_onfloor_members,
    )
    for cached in cached_lookups:
        cached.cache_clear()

    return "cache cleared", 200
def database_processor(logger, log_method, event_dict):  # pylint: disable=unused-argument,redefined-outer-name
    """Store non-GET request events as ``UserLog`` rows.

    Events without a ``request`` key pass through untouched. For events
    with one, a ``UserLog`` is added and committed unless the method is
    ``GET``, and the ``request`` key is then removed from the event.

    :returns: The same ``event_dict`` object, possibly without ``request``.
    """
    if 'request' not in event_dict:
        return event_dict

    if event_dict['method'] != 'GET':
        entry = UserLog(
            ipaddr=event_dict['ip'],
            user=event_dict['user'],
            method=event_dict['method'],
            blueprint=event_dict['blueprint'],
            path=event_dict['path'],
            description=event_dict['event'],
        )
        session = db.session
        session.add(entry)
        session.flush()
        session.commit()

    del event_dict['request']
    return event_dict
def RunFileServer(fileServerDir, fileServerPort):
    """
    Run a Flask file server on the given port.

    Explicitly specify instance_path, because Flask's
    auto_find_instance_path can fail when run in a frozen app.
    The call blocks in ``app.run`` on ``LOCALHOST`` until the server stops.
    """
    app = Flask(__name__, instance_path=fileServerDir)

    @app.route('/fileserver-is-ready', methods=['GET'])
    def FileserverIsReady():  # pylint: disable=unused-variable
        """
        Used to test if file server has started.
        """
        return 'Fileserver is ready!'

    @app.route('/<path:filename>', methods=['GET'])
    def ServeFile(filename):  # pylint: disable=unused-variable
        """
        Serves up a file from the file server directory.
        """
        relative_name = filename.strip('/')
        return send_from_directory(fileServerDir, relative_name)

    @app.route('/shutdown', methods=['POST'])
    def ShutDown():  # pylint: disable=unused-variable
        """
        Respond to a POSTed request to shut down the file server.
        """
        # The Werkzeug development server puts its shutdown hook in the
        # WSGI environ; other servers do not provide it.
        stop_server = request.environ.get('werkzeug.server.shutdown')
        if stop_server is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        stop_server()
        return 'Server shutting down...'

    app.run(host=LOCALHOST, port=fileServerPort)
def request_password():
    """Mail a password-reset link to the confirmed address of a JID.

    The ``jid`` comes from the posted form. For a confirmed recovery
    address a random 64-character code is stored and a link containing it
    is mailed; otherwise the error page is rendered.
    """
    jid = request.form.get('jid')
    recovery = (
        s.query(RecoveryEmail)
        .filter(RecoveryEmail.jid == jid, RecoveryEmail.confirmed == True)  # noqa: E712
        .one_or_none()
    )
    if not recovery:
        return render_template('error.html', message='user not found')

    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password_code = ''.join(rng.choice(alphabet) for _ in range(64))

    # Persist the code before mailing the link that carries it.
    recovery.password_code = password_code
    s.merge(recovery)
    s.commit()

    password_code_link = 'https://www.pimux.de/password/?code=%s' % password_code
    sendMail(recovery.email, 'password reset request', 'click here: %s' % password_code_link)
    return render_template('success.html', message='link was sent')