Python flask.current_app 模块,_get_current_object() 实例源码
我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.current_app._get_current_object()。
def send_email(subject, recipients, template, **kwargs):
    """Render a templated e-mail and send it on a background thread.

    :param subject: subject line passed to ``Message``.
    :param recipients: a single address string or an iterable of addresses.
        A bare string is wrapped in a list instead of being split into
        characters.
    :param template: template name without extension; ``<template>.txt``
        and ``<template>.html`` are rendered for the body and HTML parts.
    :param kwargs: template context. An optional ``attachments`` entry is
        an iterable of file names relative to ``APP_UPLOADS``.
    :return: the started ``Thread`` running ``send_async_email``.
    """
    if isinstance(recipients, str):
        # list("a@b.c") would yield one "recipient" per character.
        recipients = [recipients]
    elif not isinstance(recipients, list):
        recipients = list(recipients)

    msg = Message(subject, reply_to=current_app.config['MAIL_DEFAULT_SENDER'],
                  recipients=recipients)
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    attachments = kwargs.get('attachments', [])
    mimes = MimeTypes()
    for filename in attachments:
        path_ = os.path.join(current_app.config['APP_UPLOADS'], filename)
        with current_app.open_resource(path_) as fp:
            mime = mimes.guess_type(fp.name)
            msg.attach(path_, mime[0], fp.read())

    # The worker thread has no application context of its own, so hand it
    # the real application object rather than the context-bound proxy.
    app = current_app._get_current_object()
    t = Thread(target=send_async_email, args=[app, msg])
    t.start()
    return t
def send_mail(to, subject, template=None, **kwargs):
    """Render a templated e-mail and send it on a background thread.

    :param to: recipient address.
    :param subject: subject text; the ``MAIL_SUBJECT_PREFIX`` setting is
        prepended to it.
    :param template: template name without extension; ``<template>.txt``
        and ``<template>.html`` are rendered for the body and HTML parts.
    :param kwargs: context passed to both templates.
    :return: the started ``Thread`` running ``send_async_email``.
    :raises ValueError: if no template name is given.
    """
    if template is None:
        # The body below cannot be rendered without a template name.
        raise ValueError("send_mail() requires a template name")

    # Resolve the proxy now: the worker thread runs outside this context.
    app = current_app._get_current_object()
    message = Message(app.config['MAIL_SUBJECT_PREFIX'] + subject,
                      sender=app.config['MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)
    thr = Thread(target=send_async_email, args=[app, message])
    thr.start()
    return thr
def save(self):
    """Persist this object through ``db_session`` and announce the save.

    The instance is added to the session and committed. On any failure
    the session is rolled back and the exception is re-raised; the session
    is closed either way. Afterwards the ``model_saved`` signal is sent
    with the current application object as sender.

    :return: this instance.
    """
    try:
        db_session.add(self)
        db_session.commit()
    except BaseException:
        # Undo the partial transaction, then let the caller see the error.
        db_session.rollback()
        raise
    finally:
        db_session.close()

    sender = app._get_current_object()
    model_saved.send(sender)
    return self
def info():
    """Return IP geolocation data and user-agent details as JSON.

    Without an ``ip`` query argument the caller's own address is used (the
    first ``X-Forwarded-For`` header value if present, otherwise
    ``request.remote_addr``) and the parsed user agent is reported. With
    an explicit ``ip`` the user-agent entry is an error marker.

    :return: ``jsonify`` response holding ``ip``, ``ip_information`` and
        ``user_agent``.
    """
    app = current_app._get_current_object()
    key = app.config['IP_INFO_DB_KEY']
    ip = request.args.get('ip', None)
    if not ip:
        # Look the header up once instead of twice.
        forwarded = request.headers.getlist("X-Forwarded-For")
        ip = forwarded[0] if forwarded else request.remote_addr
        user_agent = {'browser': request.user_agent.browser,
                      'language': request.user_agent.language,
                      'platform': request.user_agent.platform,
                      'string': request.user_agent.string,
                      'version': request.user_agent.version,
                      'status': 'success',
                      'message': 'localhost user agent information'}
    else:
        user_agent = {'status': 'error',
                      'message': 'not query localhost information'}

    # ``ip`` comes from the request, so let requests URL-encode it rather
    # than formatting it into the query string by hand.
    ip_info = requests.get(
        'http://api.ipinfodb.com/v3/ip-city/',
        params={'key': key, 'ip': ip, 'format': 'json'},
    ).json()
    return jsonify(status='success',
                   data={'ip': ip, 'ip_information': ip_info,
                         'user_agent': user_agent})
def nslookup():
    """Resolve the ``domain`` query argument and look up its IP information.

    :return: a JSON response with the resolved address and its geolocation
        data, or with an error string and ``None`` when resolution fails.
        A missing ``domain`` argument yields an error response with HTTP
        status 400.
    """
    dom = request.args.get('domain', None)
    if not dom:
        return jsonify(status='error', data='needs domain parameter'), 400

    try:
        result = socket.gethostbyname(dom)
    except Exception:
        # Best-effort lookup: report the failure in the payload, but no
        # longer swallow KeyboardInterrupt/SystemExit as a bare except did.
        result = 'domain error,check your input'
        ip_info = None
    else:
        app = current_app._get_current_object()
        key = app.config['IP_INFO_DB_KEY']
        # Let requests build and encode the query string.
        ip_info = requests.get(
            'http://api.ipinfodb.com/v3/ip-city/',
            params={'key': key, 'ip': result, 'format': 'json'},
        ).json()
    return jsonify(status='success',
                   data={'DNS record': result, 'IP infomation': ip_info})
def register_user(**kwargs):
    """Create a user from ``kwargs`` and run the registration side effects.

    The ``password`` entry is replaced by its encrypted form before the
    user is created and committed. When confirmation is enabled a
    confirmation link and token are generated and a message is flashed.
    The ``user_registered`` signal is always sent, and a welcome mail is
    sent when ``SEND_REGISTER_EMAIL`` is set.

    :param kwargs: user attributes; must contain ``password``.
    :return: the newly created user.
    """
    kwargs['password'] = encrypt_password(kwargs['password'])
    user = _datastore.create_user(**kwargs)
    _datastore.commit()

    if _security.confirmable:
        confirmation_link, token = generate_confirmation_link(user)
        do_flash(*get_message('CONFIRM_REGISTRATION', email=user.email))
    else:
        confirmation_link = token = None

    sender = app._get_current_object()
    user_registered.send(sender, user=user, confirm_token=token)

    if config_value('SEND_REGISTER_EMAIL'):
        subject = config_value('EMAIL_SUBJECT_REGISTER')
        send_mail(subject, user.email, 'welcome',
                  user=user, confirmation_link=confirmation_link)

    return user
def _check_token():
    """Authenticate the current request by token.

    The token is taken from the configured header, overridden by the
    configured query argument, overridden in turn by the same key in a
    JSON body (when that body is not a list). On success the user is
    stored on the request context and ``identity_changed`` is sent.

    :return: ``True`` if the token maps to an authenticated user,
        otherwise ``False``.
    """
    header_key = _security.token_authentication_header
    args_key = _security.token_authentication_key
    header_token = request.headers.get(header_key, None)
    token = request.args.get(args_key, header_token)
    if request.get_json(silent=True) and not isinstance(request.json, list):
        token = request.json.get(args_key, token)

    user = _security.login_manager.token_callback(token)
    if not (user and user.is_authenticated):
        return False

    app = current_app._get_current_object()
    _request_ctx_stack.top.user = user
    identity_changed.send(app, identity=Identity(user.id))
    return True
def nslookup():
    """Resolve the ``domain`` query argument and look up its IP information.

    NOTE(review): the extracted text of this snippet was truncated and did
    not parse. The missing parts were rebuilt from the complete ``nslookup``
    variant in this file; only the ``time.sleep(3)`` before the outbound
    request is specific to this copy. Confirm against the upstream project.

    :return: a JSON response with the resolved address and its geolocation
        data, or with an error string and ``None`` when resolution fails.
        A missing ``domain`` argument yields an error response with HTTP
        status 400.
    """
    dom = request.args.get('domain', None)
    if not dom:
        return jsonify(status='error', data='needs domain parameter'), 400

    try:
        result = socket.gethostbyname(dom)
    except Exception:
        result = 'domain error,check your input'
        ip_info = None
    else:
        app = current_app._get_current_object()
        key = app.config['IP_INFO_DB_KEY']
        # Delay kept from the original snippet; its purpose is not visible
        # here.
        time.sleep(3)
        ip_info = requests.get(
            'http://api.ipinfodb.com/v3/ip-city/',
            params={'key': key, 'ip': result, 'format': 'json'},
        ).json()
    return jsonify(status='success',
                   data={'DNS record': result, 'IP infomation': ip_info})
def send(self, message, envelope_from=None):
    """Verify and send a message.

    The message must have recipients and a sender and must not contain bad
    headers. A missing date is filled in with the current time. The
    message is handed to the SMTP host when one is set, and the
    ``email_dispatched`` signal is sent. After ``self.mail.max_emails``
    messages the counter is reset and the host connection is renewed.

    :param message: Message instance.
    :param envelope_from: Email address to be used in MAIL FROM command.
    :raises AssertionError: if recipients or sender are missing.
    :raises BadHeaderError: if the message has bad headers.
    """
    assert message.send_to, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    if self.host:
        from_addr = sanitize_address(envelope_from or message.sender)
        to_addrs = list(sanitize_addresses(message.send_to))
        payload = message.as_bytes() if PY3 else message.as_string()
        self.host.sendmail(from_addr, to_addrs, payload,
                           message.mail_options, message.rcpt_options)

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails != self.mail.max_emails:
        return

    # Limit reached: restart the count and renew the connection.
    self.num_emails = 0
    if self.host:
        self.host.quit()
        self.host = self.configure_host()
def set_status(msg, status):
    """Send the ``status_changed`` signal and return the resulting status.

    The incoming ``status`` argument is not read; the returned status is
    whatever ``get_status_callback()`` yields after the signal is sent.

    :param msg: text stored under the ``msg`` key of the returned dict.
    :param status: ignored; replaced by the callback result.
    :return: ``(message_dict, status)`` tuple.
    """
    message = dict(
        msg=msg,
        to_do='I do not kNow what to do,the signal will tell me',
    )
    sender = current_app._get_current_object()
    status_changed.send(sender, status="go_to_work")
    return message, get_status_callback()
def send_mails(recipients, cc, mail_title, mail_body):
    """Build a plain-text message and send it through ``mail``.

    :param recipients: value assigned to the message's ``recipients``.
    :param cc: value assigned to the message's ``cc``.
    :param mail_title: subject passed to ``Message``.
    :param mail_body: text assigned to the message body.
    """
    msg = Message(mail_title)
    msg.body = mail_body
    # The sender is the configured ``MAIL_USERNAME``.
    config = current_app._get_current_object().config
    msg.sender = config['MAIL_USERNAME']
    msg.recipients = recipients
    msg.cc = cc
    mail.send(msg)
def send(self, message):
    """Verify and send a message.

    The message must have recipients and a sender and must not contain bad
    headers. A missing date is filled in with the current time. The
    message is handed to the SMTP host when one is set, and the
    ``email_dispatched`` signal is sent. After ``self.mail.max_emails``
    messages the counter is reset and the host connection is renewed.

    :param message: Message instance.
    :raises AssertionError: if recipients or sender are missing.
    :raises BadHeaderError: if the message has bad headers.
    """
    # The recipients check previously carried the "no sender" text and the
    # sender itself was never checked; each condition now has its own
    # assertion, matching the other ``send`` variant in this file.
    assert message.recipients, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    if self.host:
        self.host.sendmail(message.sender,
                           message.send_to,
                           message.as_string())

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails == self.mail.max_emails:
        # Limit reached: restart the count and renew the connection.
        self.num_emails = 0
        if self.host:
            self.host.quit()
            self.host = self.configure_host()
def super_user_role(self):
    """Return the ``SUPER_USER_ROLE`` setting of the current application."""
    config = app._get_current_object().config
    return config['SUPER_USER_ROLE']
# NOTE(review): this snippet looks truncated by the text extraction. The
# signature line has no trailing colon, and everything between the parameter
# list and the counter update is missing, so it does not parse as written.
# Compare it with the complete ``send`` definitions elsewhere in this file
# before relying on it.
def send(self, app=current_app._get_current_object())
# Count one more e-mail on this object.
self.num_emails += 1
# When the count equals self.mail.max_emails, start counting from zero again.
if self.num_emails == self.mail.max_emails:
self.num_emails = 0
# Nesting cannot be recovered from the flattened text; presumably the quit and
# the reconfiguration both happen only when a host is set -- TODO confirm.
if self.host:
self.host.quit()
self.host = self.configure_host()
def _dispatching_log(message, level=None):
    """Forward a log call to ``_original_log`` in debug mode only.

    :param message: log message passed through unchanged.
    :param level: optional level passed through unchanged.
    :return: the result of ``_original_log`` when the current application
        object is ``None`` or has ``debug`` set; otherwise ``None``.
    """
    app = current_app._get_current_object()
    if app is not None and not app.debug:
        return None
    return _original_log(message, level)
def send_email(to, **kwargs):
    """Send an e-mail through Celery or through a thread.

    The ``CELERY_INSTEAD_THREADING`` setting of the current application
    selects the backend.

    :param to: recipient passed to the chosen sender.
    :param kwargs: forwarded unchanged to the chosen sender.
    """
    app = current_app._get_current_object()
    use_celery = app.config['CELERY_INSTEAD_THREADING']
    if not use_celery:
        send_email_thread(to, **kwargs)
        return
    send_email_celery(to, countdown=None, **kwargs)
# NOTE(review): this snippet looks truncated by the text extraction. The
# signature line has no trailing colon, and everything between the parameter
# list and the counter update is missing, so it does not parse as written.
# Compare it with the complete ``send`` definitions elsewhere in this file
# before relying on it.
def send(self, app=current_app._get_current_object())
# Count one more e-mail on this object.
self.num_emails += 1
# When the count equals self.mail.max_emails, start counting from zero again.
if self.num_emails == self.mail.max_emails:
self.num_emails = 0
# Nesting cannot be recovered from the flattened text; presumably the quit and
# the reconfiguration both happen only when a host is set -- TODO confirm.
if self.host:
self.host.quit()
self.host = self.configure_host()
# NOTE(review): this snippet looks truncated by the text extraction. The
# signature line has no trailing colon, and everything between the parameter
# list and the counter update is missing, so it does not parse as written.
# Compare it with the complete ``send`` definitions elsewhere in this file
# before relying on it.
def send(self, app=current_app._get_current_object())
# Count one more e-mail on this object.
self.num_emails += 1
# When the count equals self.mail.max_emails, start counting from zero again.
if self.num_emails == self.mail.max_emails:
self.num_emails = 0
# Nesting cannot be recovered from the flattened text; presumably the quit and
# the reconfiguration both happen only when a host is set -- TODO confirm.
if self.host:
self.host.quit()
self.host = self.configure_host()
def get_engine():
    """Return the database engine cached on the current application.

    The engine is created from the ``DATABASE_URL`` config value on first
    use and stored on the application object as ``_sa_engine``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_sa_engine'):
        return app._sa_engine

    database_url = get_config('DATABASE_URL')
    app._sa_engine = create_engine(database_url)
    return app._sa_engine
def get_es():
    """Return the Elasticsearch client cached on the current application.

    The client is created from the ``ELASTICSEARCH_URL`` config value on
    first use (240 second timeout, sniffing on start) and stored on the
    application object as ``_es_instance``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_es_instance'):
        return app._es_instance

    es_url = get_config('ELASTICSEARCH_URL')
    app._es_instance = Elasticsearch(es_url, timeout=240,
                                     sniff_on_start=True)
    return app._es_instance
# NOTE(review): this snippet looks truncated by the text extraction. The
# signature line has no trailing colon, and everything between the parameter
# list and the counter update is missing, so it does not parse as written.
# Compare it with the complete ``send`` definitions elsewhere in this file
# before relying on it.
def send(self, app=current_app._get_current_object())
# Count one more e-mail on this object.
self.num_emails += 1
# When the count equals self.mail.max_emails, start counting from zero again.
if self.num_emails == self.mail.max_emails:
self.num_emails = 0
# Nesting cannot be recovered from the flattened text; presumably the quit and
# the reconfiguration both happen only when a host is set -- TODO confirm.
if self.host:
self.host.quit()
self.host = self.configure_host()
def before_first_request():
    """Start a background thread that looks for users that leave.

    Nothing is started when the ``TESTING`` config flag is set.
    """
    def find_offline_users(app):
        # Runs forever inside its own application context: push every
        # offline user, release the session, then wait five seconds.
        with app.app_context():
            while True:
                for user in User.find_offline_users():
                    push_model(user)
                db.session.remove()
                time.sleep(5)

    if current_app.config['TESTING']:
        return

    # The thread needs the real application object, not the proxy.
    app = current_app._get_current_object()
    thread = threading.Thread(target=find_offline_users, args=(app,))
    thread.start()
# NOTE(review): this snippet looks truncated by the text extraction. The
# signature line has no trailing colon, and everything between the parameter
# list and the counter update is missing, so it does not parse as written.
# Compare it with the complete ``send`` definitions elsewhere in this file
# before relying on it.
def send(self, app=current_app._get_current_object())
# Count one more e-mail on this object.
self.num_emails += 1
# When the count equals self.mail.max_emails, start counting from zero again.
if self.num_emails == self.mail.max_emails:
self.num_emails = 0
# Nesting cannot be recovered from the flattened text; presumably the quit and
# the reconfiguration both happen only when a host is set -- TODO confirm.
if self.host:
self.host.quit()
self.host = self.configure_host()
def setUp(self):
    """Prepare the blogging test app: storage, engine, auth routes, posts.

    Registers a user loader on both the login manager and the blogging
    engine, adds ``/login/...`` and ``/logout/`` helper routes, and stores
    twenty sample posts (the first ten by ``testuser`` tagged ``hello``,
    the rest by ``newuser`` tagged ``world``).
    """
    FlaskBloggingTestCase.setUp(self)
    self._create_storage()
    self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
    self.engine = self._create_blogging_engine()
    self.login_manager = LoginManager(self.app)

    @self.login_manager.user_loader
    @self.engine.user_loader
    def load_user(user_id):
        return TestUser(user_id)

    @self.app.route("/login/<username>/", methods=["POST"],
                    defaults={"blogger": 0})
    @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
    def login(username, blogger):
        login_user(TestUser(username))
        if blogger:
            # Announce the new identity only for blogger logins.
            sender = current_app._get_current_object()
            identity_changed.send(sender, identity=Identity(username))
        return redirect("/")

    @self.app.route("/logout/")
    def logout():
        logout_user()
        sender = current_app._get_current_object()
        identity_changed.send(sender, identity=AnonymousIdentity())
        return redirect("/")

    for index in range(20):
        in_first_half = index < 10
        self.storage.save_post(
            title="Sample Title%d" % index,
            text="Sample Text%d" % index,
            user_id="testuser" if in_first_half else "newuser",
            tags=["hello"] if in_first_half else ["world"],
        )
# NOTE(review): this snippet looks truncated by the text extraction. It ends
# on a route decorator with no function under it, and that decorator passes
# ``tags=tags`` although no ``tags`` name is defined in the visible text.
# The last line is presumably two separate statements fused together --
# TODO confirm against the upstream test case.
def setUp(self):
# Run the base test-case setup, then create the storage backend.
FlaskBloggingTestCase.setUp(self)
self._create_storage()
self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
self.engine = self._create_blogging_engine()
self.login_manager = LoginManager(self.app)
# load_user is registered as user loader on both the login manager and the
# blogging engine.
@self.login_manager.user_loader
@self.engine.user_loader
def load_user(user_id):
return TestUser(user_id)
@self.app.route("/login/<username>/", tags=tags)
# NOTE(review): this snippet looks truncated by the text extraction. It ends
# on a route decorator that passes ``tags=tags`` although no ``tags`` name is
# defined in the visible text. The last line is presumably two separate
# statements fused together -- TODO confirm against the upstream test case.
def setUp(self):
# Run the base test-case setup, then create the storage backend.
FlaskBloggingTestCase.setUp(self)
self._create_storage()
self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
self.engine = self._create_blogging_engine()
self.login_manager = LoginManager(self.app)
# load_user is registered as user loader on both the login manager and the
# blogging engine.
@self.login_manager.user_loader
@self.engine.user_loader
def load_user(user_id):
return TestUser(user_id)
@self.app.route("/login/<username>/", tags=tags)
def login():
    """Log in the fixed ``testuser`` account and redirect to ``/blog``."""
    username = "testuser"
    login_user(User(username))
    # Tell listeners that the role of the current identity has changed.
    sender = current_app._get_current_object()
    identity_changed.send(sender, identity=Identity("testuser"))
    return redirect("/blog")
def _perform_login(self, user):
    """Log in ``user`` and announce the new identity.

    :param user: user object; it is passed through ``prepare_user`` before
        being logged in.
    """
    prepared = prepare_user(user)
    login_user(prepared)
    # Let flask principal know which identity is now active.
    sender = current_app._get_current_object()
    identity_changed.send(sender, identity=Identity(prepared.id))
# NOTE(review): this snippet looks truncated by the text extraction. The
# parameter list is cut off after ``to``, the statements that create ``msg``
# and ``thr`` are missing, and the ``render_template`` call ends in a stray
# ``msg])``, so it does not parse as written. Presumably it builds a message
# from a template and sends it on a thread, like the complete ``send_mail``
# example elsewhere in this file -- TODO confirm against the upstream project.
def send_email(to,
sender = app.config['FLASKY_MAIL_SENDER'], recipients=[to])
msg.body = render_template(template + '.txt', msg])
# The visible tail starts ``thr`` and returns it to the caller.
thr.start()
return thr
def login():
    """Handle a login request with credentials or a passive login.

    Request data is taken from the JSON body when present, otherwise from
    ``request.values``. With the user manager enabled and both ``user``
    and ``pass`` supplied, any existing session is logged out, the
    credentials are checked, and on success the user is logged in and
    returned as JSON. A failed check yields a 401 response. A request
    carrying ``passive`` is delegated to ``passive_login``; anything else
    returns ``NO_CONTENT``.
    """
    user_manager = octoprint.server.userManager

    data = request.values
    if hasattr(request, "json") and request.json:
        data = request.json

    if user_manager.enabled and "user" in data and "pass" in data:
        username, password = data["user"], data["pass"]
        remember = ("remember" in data
                    and data["remember"] in valid_boolean_trues)

        # Drop any session that is still active before logging in again.
        if "usersession.id" in session:
            _logout(current_user)

        user = user_manager.findUser(username)
        if user is None or not user_manager.checkPassword(username, password):
            return make_response(("User unkNown or password incorrect", 401, []))

        if user_manager.enabled:
            user = user_manager.login_user(user)
            session["usersession.id"] = user.get_session()
            g.user = user
        login_user(user, remember=remember)
        identity_changed.send(current_app._get_current_object(),
                              identity=Identity(user.get_id()))
        return jsonify(user.asDict())

    if "passive" in data:
        return passive_login()
    return NO_CONTENT