Python flask.current_app 模块-_get_current_object() 实例源码

Python flask.current_app 模块,_get_current_object() 实例源码

我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 flask.current_app._get_current_object()。

项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def send_email(subject, recipients, template, **kwargs):
    """Render an e-mail from a template pair and send it on a worker thread.

    :param subject: Subject line of the message.
    :param recipients: One address as a string, or an iterable of addresses.
    :param template: Template base name; ``.txt`` and ``.html`` are appended
        to render the plain-text and HTML bodies.
    :param kwargs: Context passed to both templates. An optional
        ``attachments`` entry lists file names that are joined onto the
        ``APP_UPLOADS`` config value and attached to the message.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    if isinstance(recipients, str):
        # list('a@b.c') would split the address into single characters.
        recipients = [recipients]
    elif not isinstance(recipients, list):
        recipients = list(recipients)
    msg = Message(subject, reply_to=current_app.config['MAIL_DEFAULT_SENDER'],
                  recipients=recipients)
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    mimes = MimeTypes()
    for filename in kwargs.get('attachments', []):
        path_ = os.path.join(current_app.config['APP_UPLOADS'], filename)
        with current_app.open_resource(path_) as fp:
            # guess_type returns (type, encoding); only the type is attached.
            mime = mimes.guess_type(fp.name)
            msg.attach(path_, mime[0], fp.read())

    # Pass the real application object (not the current_app proxy) to the
    # thread, which runs outside this request's application context.
    app = current_app._get_current_object()
    t = Thread(target=send_async_email, args=[app, msg])
    t.start()
    return t
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def send_mail(to, subject, template, **kwargs):
    """
    Send a templated e-mail to one recipient on a background thread.

    :param to: Recipient address.
    :param subject: Subject text, appended to the ``MAIL_SUBJECT_PREFIX``
        config value.
    :param template: Template base name; ``.txt`` and ``.html`` are appended
        to render the plain-text and HTML bodies.
    :param kwargs: Context passed to both templates.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    # Resolve the proxy once so the concrete app can be handed to the thread.
    app = current_app._get_current_object()
    message = Message(app.config['MAIL_SUBJECT_PREFIX'] + '' + subject, sender=app.config['MAIL_SENDER'],
                      recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)
    # NOTE(review): assumes send_async_email takes (app, message) — confirm
    # against its definition.
    thr = Thread(target=send_async_email, args=[app, message])
    thr.start()
    return thr
项目:flask_api    作者:nullcc    | 项目源码 | 文件源码
def save(self):
    """Persist this object through ``db_session`` and emit ``model_saved``.

    The session is rolled back and the error re-raised when the add/commit
    fails; the session is closed in every case.

    :return: ``self``
    """
    try:
        db_session.add(self)
        db_session.commit()
    except BaseException:
        # Undo the partial transaction, then let the caller see the error.
        db_session.rollback()
        raise
    finally:
        db_session.close()

    sender = app._get_current_object()
    model_saved.send(sender)
    return self
项目:ztool-backhend    作者:Z-Tool    | 项目源码 | 文件源码
def info():
    """Return geolocation data for an IP, plus the caller's user agent.

    When the ``ip`` query argument is absent, the address is taken from the
    first ``X-Forwarded-For`` header value (falling back to
    ``request.remote_addr``) and the caller's user-agent details are
    included. When ``ip`` is given, the user-agent entry is an error stub.
    """
    key = current_app._get_current_object().config['IP_INFO_DB_KEY']
    ip = request.args.get('ip', None)
    if ip:
        user_agent = {'status': 'error',
                      'message': 'not query localhost information'}
    else:
        forwarded = request.headers.getlist("X-Forwarded-For")
        ip = forwarded[0] if forwarded else request.remote_addr
        agent = request.user_agent
        user_agent = {'browser': agent.browser,
                      'language': agent.language,
                      'platform': agent.platform,
                      'string': agent.string,
                      'version': agent.version,
                      'status': 'success',
                      'message': 'localhost user agent information'}
    url = 'http://api.ipinfodb.com/v3/ip-city/?key={0}&ip={1}&format=json'.format(key, ip)
    ip_info = requests.get(url).json()
    payload = {'ip': ip, 'ip_information': ip_info, 'user_agent': user_agent}
    return jsonify(status='success', data=payload)
项目:ztool-backhend    作者:Z-Tool    | 项目源码 | 文件源码
def nslookup():
    """Resolve the ``domain`` query argument and look up the resulting IP.

    Responds with status 400 and an error payload when ``domain`` is
    missing. A failed resolution is reported in-band: the ``DNS record``
    field carries an error text and the IP information is ``None``.
    """
    dom = request.args.get('domain', None)
    if not dom:
        return jsonify(status='error', data='needs domain parameter'), 400

    try:
        result = socket.gethostbyname(dom)
    except Exception:
        # Any resolution failure becomes an in-band error; unlike a bare
        # ``except:`` this does not swallow SystemExit/KeyboardInterrupt.
        result = 'domain error,check your input'
        ip_info = None
    else:
        key = current_app._get_current_object().config['IP_INFO_DB_KEY']
        ip_info = requests.get('http://api.ipinfodb.com/v3/ip-city/?key={0}&ip={1}&format=json'.format(key, result)).json()
    return jsonify(status='success', data={'DNS record': result, 'IP infomation': ip_info})
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def register_user(**kwargs):
    """Create a user, emit ``user_registered`` and optionally send a welcome mail.

    :param kwargs: User fields; ``password`` is replaced by the result of
        ``encrypt_password`` before the user is created.
    :return: The user returned by ``_datastore.create_user``.
    """
    kwargs['password'] = encrypt_password(kwargs['password'])
    user = _datastore.create_user(**kwargs)
    _datastore.commit()

    # Both stay None unless confirmation is enabled.
    confirmation_link = token = None
    if _security.confirmable:
        confirmation_link, token = generate_confirmation_link(user)
        do_flash(*get_message('CONFIRM_REGISTRATION', email=user.email))

    sender = app._get_current_object()
    user_registered.send(sender, user=user, confirm_token=token)

    if config_value('SEND_REGISTER_EMAIL'):
        subject = config_value('EMAIL_SUBJECT_REGISTER')
        send_mail(subject, user.email, 'welcome',
                  user=user, confirmation_link=confirmation_link)

    return user
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def _check_token():
    """Authenticate the current request from a token.

    The token is read from the configured header, overridden by the query
    argument, and overridden again by a non-list JSON body. On success the
    user is stored on the request context and ``identity_changed`` is sent.

    :return: ``True`` when the token maps to an authenticated user,
        otherwise ``False``.
    """
    header_key = _security.token_authentication_header
    args_key = _security.token_authentication_key
    token = request.args.get(args_key, request.headers.get(header_key, None))
    if request.get_json(silent=True) and not isinstance(request.json, list):
        token = request.json.get(args_key, token)

    user = _security.login_manager.token_callback(token)
    if not (user and user.is_authenticated):
        return False

    app = current_app._get_current_object()
    _request_ctx_stack.top.user = user
    identity_changed.send(app, identity=Identity(user.id))
    return True
项目:ztool-backhend-mongo    作者:Z-Tool    | 项目源码 | 文件源码
def nslookup():
    dom = request.args.get('domain',check your input'
        if result.startswith('domain'):
            ip_info = None
        else:
            app = current_app._get_current_object()
            key = app.config['IP_INFO_DB_KEY']
            time.sleep(3)
            ip_info = requests.get(
                'http://api.ipinfodb.com/v3/ip-city/?key={0}&ip={1}&format=json'.format(key, 400
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def send(self, message, envelope_from=None):
    """Validate *message*, hand it to the SMTP host and signal the dispatch.

    :param message: Message instance.
    :param envelope_from: Address for the MAIL FROM command; the message's
        sender is used when it is not given.
    """
    assert message.send_to, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    if self.host:
        mail_from = sanitize_address(envelope_from or message.sender)
        rcpt_to = list(sanitize_addresses(message.send_to))
        payload = message.as_bytes() if PY3 else message.as_string()
        self.host.sendmail(mail_from, rcpt_to, payload,
                           message.mail_options, message.rcpt_options)

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails != self.mail.max_emails:
        return

    # Limit reached: reset the counter and reconnect if a host is open.
    self.num_emails = 0
    if self.host:
        self.host.quit()
        self.host = self.configure_host()
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['CIRculaTE_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['CIRculaTE_MAIL_SENDER'], recipients=[to])
    msg.body = render_template(template + '.txt', msg])
    thr.start()
    return thr
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    """Send email using multible threads.
    """
    app = current_app._get_current_object()
    msg = Message(app.config['MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['MAIL_SENDER'], **kwargs)
    thread = Thread(target=send_async_email, msg])
    thread.start()
    return thread
项目:aardvark    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def __init__(self, thread_ID):
    """Initialise the thread and remember the application it was created in.

    :param thread_ID: Identifier stored on the instance as ``thread_ID``.
    """
    threading.Thread.__init__(self)
    self.thread_ID = thread_ID
    # Store the concrete app object rather than the current_app proxy.
    self.app = current_app._get_current_object()
项目:flask_example    作者:flyhigher139    | 项目源码 | 文件源码
def set_status(msg, status):
    """Send ``status_changed`` and return the message with the resulting status.

    :param msg: Text placed under the ``msg`` key of the returned dict.
    :param status: Not read; it is overwritten by ``get_status_callback()``
        after the signal has been sent.
    :return: Tuple ``(message_dict, status)``.
    """
    sender = current_app._get_current_object()
    status_changed.send(sender, status="go_to_work")
    status = get_status_callback()

    return {
        'msg': msg,
        'to_do': 'I do not kNow what to do,the signal will tell me'
    }, status
项目:flask_example    作者:flyhigher139    | 项目源码 | 文件源码
def send_mails(recipients, cc, mail_title, mail_body):
    """Build a message and send it through ``mail``.

    :param recipients: Assigned to the message's ``recipients``.
    :param cc: Assigned to the message's ``cc``.
    :param mail_title: Passed to the ``Message`` constructor.
    :param mail_body: Assigned to the message's ``body``.
    """
    app = current_app._get_current_object()

    msg = Message(mail_title)
    msg.sender = app.config['MAIL_USERNAME']
    msg.body = mail_body
    msg.recipients = recipients
    msg.cc = cc

    mail.send(msg)
项目:flasky    作者:RoSEOu    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'], msg])
    thr.start()
    return thr
项目:flasky    作者:RoSEOu    | 项目源码 | 文件源码
def send(self, message):
    """Verifies and sends message.

    :param message: Message instance.
    :raises AssertionError: when the message has no recipients or no sender.
    :raises BadHeaderError: when ``message.has_bad_headers()`` is true.
    """
    # Each precondition gets its own message; previously the recipients
    # check reported a missing sender and the sender was never checked.
    assert message.recipients, "No recipients have been added"

    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    if self.host:
        self.host.sendmail(message.sender,
                           message.send_to,
                           message.as_string())

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1

    # Once the per-connection limit is reached, reset and reconnect.
    if self.num_emails == self.mail.max_emails:
        self.num_emails = 0
        if self.host:
            self.host.quit()
            self.host = self.configure_host()
项目:flask-boilerplate    作者:marcFord    | 项目源码 | 文件源码
def config(self):
    """Return the ``config`` of the application object behind ``app``."""
    application = app._get_current_object()
    return application.config
项目:flask-boilerplate    作者:marcFord    | 项目源码 | 文件源码
def super_user_role(self):
    """Return the ``SUPER_USER_ROLE`` entry of the application's config."""
    settings = app._get_current_object().config
    return settings['SUPER_USER_ROLE']
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def send(self, app=current_app._get_current_object())

        self.num_emails += 1

        if self.num_emails == self.mail.max_emails:
            self.num_emails = 0
            if self.host:
                self.host.quit()
                self.host = self.configure_host()
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _dispatching_log(message, level=None):
    """Forward to ``_original_log`` only when there is no app or it is in debug mode.

    :param message: Passed through to ``_original_log``.
    :param level: Passed through to ``_original_log``.
    :return: The result of ``_original_log``, or ``None`` when the message
        is not forwarded.
    """
    app = current_app._get_current_object()
    if app is not None and not app.debug:
        return None
    return _original_log(message, level)
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    """Dispatch an e-mail through Celery or through a thread.

    The ``CELERY_INSTEAD_THREADING`` config value selects the backend:
    truthy calls ``send_email_celery`` (with ``countdown=None``), falsy
    calls ``send_email_thread``.
    """
    settings = current_app._get_current_object().config
    if not settings['CELERY_INSTEAD_THREADING']:
        send_email_thread(to, **kwargs)
        return
    send_email_celery(to, countdown=None, **kwargs)
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def send_email_thread(to, **kwargs):
    """Send async email using threading.
    """
    app = current_app._get_current_object()
    msg = Message(app.config['APP_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['APP_MAIL_SENDER'], msg])
    thr.start()
    return thr
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def send_email_celery(to, **kwargs):
    """Send async email using Celery.

    Builds a ``Message`` whose subject starts with the
    ``APP_MAIL_SUBJECT_PREFIX`` config value and queues it with
    ``send_celery_async_email.apply_async``.
    """
    app = current_app._get_current_object()
    # NOTE(review): ``subject`` is not bound anywhere in this function, so
    # this line raises NameError unless a module-level ``subject`` exists.
    # The signature looks truncated — confirm against the upstream source.
    msg = Message(app.config['APP_MAIL_SUBJECT_PREFIX'] + ' ' + subject, **kwargs)
    # NOTE(review): ``countdown`` is likewise unbound here; a caller passing
    # ``countdown=...`` lands it in ``kwargs`` (and thus in ``Message``).
    send_celery_async_email.apply_async(args=[msg], countdown=countdown)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def send_mail(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(subject, sender=os.environ.get('MAIL_USERNAME'), **kwargs)

    thr = Thread(target=send_mail_async, msg])
    thr.start()

    return thr
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def send(self, app=current_app._get_current_object())

        self.num_emails += 1

        if self.num_emails == self.mail.max_emails:
            self.num_emails = 0
            if self.host:
                self.host.quit()
                self.host = self.configure_host()
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def send_email(to, msg])
    thr.start()
    return thr
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def send(self, app=current_app._get_current_object())

        self.num_emails += 1

        if self.num_emails == self.mail.max_emails:
            self.num_emails = 0
            if self.host:
                self.host.quit()
                self.host = self.configure_host()
项目:smart-iiot    作者:quanpower    | 项目源码 | 文件源码
def send_email(to, msg])
    thr.start()
    return thr
项目:graph-data-experiment    作者:occrp-attic    | 项目源码 | 文件源码
def get_engine():
    """Return the engine cached on the app, creating it on first use.

    The engine is built from the ``DATABASE_URL`` config value and stored
    on the application object as ``_sa_engine``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_sa_engine'):
        return app._sa_engine
    database_url = get_config('DATABASE_URL')
    app._sa_engine = create_engine(database_url)
    return app._sa_engine
项目:graph-data-experiment    作者:occrp-attic    | 项目源码 | 文件源码
def get_Metadata():
    """Return the ``MetaData`` cached on the app, creating it on first use.

    A new instance is stored as ``_sa_Meta`` and then bound to the engine
    returned by ``get_engine()``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_sa_Meta'):
        return app._sa_Meta
    # Attach first, bind second — same order as before, so a failing
    # get_engine() still leaves the unbound MetaData on the app.
    meta = MetaData()
    app._sa_Meta = meta
    meta.bind = get_engine()
    return app._sa_Meta
项目:graph-data-experiment    作者:occrp-attic    | 项目源码 | 文件源码
def get_model():
    """Return the ``Model`` cached on the app, loading it on first use.

    The model is built from the file named by the ``MODEL_YAML`` config
    value (default ``model.yaml``) and stored as ``_lg_model``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_lg_model'):
        return app._lg_model
    from memorIoUs.model import Model
    model_config = get_config('MODEL_YAML', 'model.yaml')
    loaded = load_config_file(model_config)
    app._lg_model = Model(loaded)
    return app._lg_model
项目:graph-data-experiment    作者:occrp-attic    | 项目源码 | 文件源码
def get_es():
    """Return the Elasticsearch client cached on the app, creating it on first use.

    The client is built from the ``ELASTICSEARCH_URL`` config value with
    ``timeout=240`` and ``sniff_on_start=True`` and stored as
    ``_es_instance``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_es_instance'):
        return app._es_instance
    es_url = get_config('ELASTICSEARCH_URL')
    app._es_instance = Elasticsearch(es_url, timeout=240, sniff_on_start=True)
    return app._es_instance
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def send(self, app=current_app._get_current_object())

        self.num_emails += 1

        if self.num_emails == self.mail.max_emails:
            self.num_emails = 0
            if self.host:
                self.host.quit()
                self.host = self.configure_host()
项目:api    作者:Yiave    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['YIAVE_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['YIAVE_MAIL_SENDER'], msg])
    thr.start()
    return thr
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def before_first_request():
    """Start a background thread that looks for users that leave.

    No thread is started when the ``TESTING`` config value is truthy.
    """
    def find_offline_users(app):
        # Runs forever inside its own application context, polling every
        # five seconds and releasing the DB session after each pass.
        with app.app_context():
            while True:
                for offline_user in User.find_offline_users():
                    push_model(offline_user)
                db.session.remove()
                time.sleep(5)

    if current_app.config['TESTING']:
        return

    # The thread gets the concrete app object, not the current_app proxy.
    app = current_app._get_current_object()
    worker = threading.Thread(target=find_offline_users, args=(app,))
    worker.start()
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['SYstem_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
                  sender=app.config['SYstem_MAIL_SENDER'], msg])
    thr.start()
    return thr
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def send(self, app=current_app._get_current_object())

        self.num_emails += 1

        if self.num_emails == self.mail.max_emails:
            self.num_emails = 0
            if self.host:
                self.host.quit()
                self.host = self.configure_host()
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def setUp(self):
        """Prepare storage, the blogging engine, login routes and 20 sample posts."""
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        # The same loader is registered with both the login manager and the
        # blogging engine.
        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        # Two URL forms map to one view; the short form defaults blogger to 0.
        @self.app.route("/login/<username>/", methods=["POST"],
                        defaults={"blogger": 0})
        @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
        def login(username, blogger):
            this_user = TestUser(username)
            login_user(this_user)
            # The identity signal is only sent for a non-zero blogger flag.
            if blogger:
                identity_changed.send(current_app._get_current_object(),
                                      identity=Identity(username))
            return redirect("/")

        @self.app.route("/logout/")
        def logout():
            logout_user()
            identity_changed.send(current_app._get_current_object(),
                                  identity=AnonymousIdentity())
            return redirect("/")

        # Posts 0-9: user "testuser", tag "hello"; posts 10-19: user
        # "newuser", tag "world".
        for i in range(20):
            tags = ["hello"] if i < 10 else ["world"]
            user = "testuser" if i < 10 else "newuser"
            self.storage.save_post(title="Sample Title%d" % i,
                                   text="Sample Text%d" % i,
                                   user_id=user, tags=tags)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def setUp(self):
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        @self.app.route("/login/<username>/", tags=tags)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def setUp(self):
        FlaskBloggingTestCase.setUp(self)
        self._create_storage()
        self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
        self.engine = self._create_blogging_engine()
        self.login_manager = LoginManager(self.app)

        @self.login_manager.user_loader
        @self.engine.user_loader
        def load_user(user_id):
            return TestUser(user_id)

        @self.app.route("/login/<username>/", tags=tags)
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def login():
    """Log in the fixed user ``testuser``, announce the identity, redirect to ``/blog``."""
    login_user(User("testuser"))

    # notify the change of role
    app = current_app._get_current_object()
    new_identity = Identity("testuser")
    identity_changed.send(app, identity=new_identity)
    return redirect("/blog")
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def logout():
    """Log the user out, announce an anonymous identity, redirect to ``/``."""
    logout_user()

    # notify the change of role
    app = current_app._get_current_object()
    anonymous = AnonymousIdentity()
    identity_changed.send(app, identity=anonymous)
    return redirect("/")
项目:zual    作者:ninadmhatre    | 项目源码 | 文件源码
def logout():
    """Log the user out, announce an anonymous identity, redirect to ``/``."""
    logout_user()

    # notify the change of role
    sender = current_app._get_current_object()
    identity_changed.send(sender, identity=AnonymousIdentity())
    home = "/"
    return redirect(home)
项目:knowledge-repo    作者:airbnb    | 项目源码 | 文件源码
def logout():
    """Log the user out and redirect to the ``index.render_Feed`` endpoint."""
    logout_user()

    # Notify flask principal that the user has logged out
    app = current_app._get_current_object()
    identity_changed.send(app, identity=AnonymousIdentity())

    target = url_for('index.render_Feed')
    return redirect(target)
项目:knowledge-repo    作者:airbnb    | 项目源码 | 文件源码
def _perform_login(self, user):
    """Log *user* in and announce the new identity.

    :param user: Passed through ``prepare_user`` before being logged in.
    """
    prepared = prepare_user(user)
    login_user(prepared)

    # Notify flask principal that the identity has changed
    app = current_app._get_current_object()
    identity_changed.send(app, identity=Identity(prepared.id))
项目:database_project    作者:HughWen    | 项目源码 | 文件源码
def send_email(to, msg])
    thr.start()
    return thr
项目:copyflask_web    作者:superchilli    | 项目源码 | 文件源码
def send_email(to,
                  sender = app.config['FLASKY_MAIL_SENDER'], recipients=[to])

    msg.body = render_template(template + '.txt', msg])
    thr.start()
    return thr
项目:cci-demo-flask    作者:circleci    | 项目源码 | 文件源码
def send_email(to, **kwargs):
    app = current_app._get_current_object()
    msg = Message(app.config['MAIL_SUBJECT_PREFIX'] + ' ' + subject, msg])
    thr.start()
    return thr
项目:SuperOcto    作者:mcecchi    | 项目源码 | 文件源码
def login():
    """Handle a login request carrying credentials or a passive login.

    Request data comes from the JSON body when present, else from
    ``request.values``. With the user manager enabled and both ``user`` and
    ``pass`` supplied, the credentials are checked: success returns the
    user as JSON, failure returns a 401 response. Otherwise a ``passive``
    key triggers ``passive_login()``, and anything else yields
    ``NO_CONTENT``.
    """
    data = request.values
    if hasattr(request, "json") and request.json:
        data = request.json

    user_manager = octoprint.server.userManager
    if not (user_manager.enabled and "user" in data and "pass" in data):
        if "passive" in data:
            return passive_login()
        return NO_CONTENT

    username = data["user"]
    password = data["pass"]
    remember = "remember" in data and data["remember"] in valid_boolean_trues

    # Drop any session that is already attached before logging in again.
    if "usersession.id" in session:
        _logout(current_user)

    user = user_manager.findUser(username)
    if user is not None and user_manager.checkPassword(username, password):
        if user_manager.enabled:
            user = user_manager.login_user(user)
            session["usersession.id"] = user.get_session()
            g.user = user
        login_user(user, remember=remember)
        app = current_app._get_current_object()
        identity_changed.send(app, identity=Identity(user.get_id()))
        return jsonify(user.asDict())
    return make_response(("User unkNown or password incorrect", 401, []))

相关文章

Python setuptools.dep_util 模块,newer_pairwise_group() ...
Python chainer.utils.type_check 模块,eval() 实例源码 我...
Python chainer.utils.type_check 模块,prod() 实例源码 我...
Python chainer.utils.type_check 模块,expect() 实例源码 ...
Python multiprocessing.managers 模块,BaseProxy() 实例源...
Python multiprocessing.managers 模块,RemoteError() 实例...