Python flask.request 模块,environ() 实例源码


项目:craton    作者:openstack    | 项目源码 | 文件源码
def request_validate(view):

    def wrapper(*args, **kwargs):
        endpoint = request.endpoint.partition('.')[-1]
        # data
        method = request.method
        if method == 'HEAD':
            method = 'GET'
        locations = validators.get((endpoint, method), {})
        data_type = {"json": "request_data", "args": "request_args"}
        for location, schema in locations.items():
            value = getattr(request, location, MultiDict())
            validator = FlaskValidatorAdaptor(schema)
            result = validator.validate(value)
  "Validated request %s: %s" % (location, result))
            if schema.get("maxProperties") == 0:
                kwargs[data_type[location]] = result

        context = request.environ['context']
        return view(*args, context=context, **kwargs)

    return wrapper
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def auth():
        Check user/password and returns token if valid
    if settings.API.get('forwarded_host'):
            if not request.environ['HTTP_X_FORWARDED_HOST'] == settings.API['forwarded_host']:
                raise BadRequest('Invalid HTTP_X_FORWARDED_HOST')
        except KeyError:
            raise BadRequest('Missing HTTP_X_FORWARDED_HOST')

    body = request.get_json()
    authenticated, ret = GeneralController.auth(body)
    if authenticated:
        return ret
        raise Unauthorized(ret)
项目:python-aws-ecr-deployer    作者:filc    | 项目源码 | 文件源码
def _setup_requests(app):

    def _init_request():
        session = request.environ['beaker.session']


    def before_request():
        init_request = _init_request()
        return init_request
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def createTeam():
  """ """
  if request.environ.get('HTTP_ORIGIN') == request.host_url[:-1]:
    newTeam = Team.query.filter_by(team_name=request.args['team']).first()
    dbEnv = Aressql.sqliteDB(request.args['report_name'])
    if not newTeam:
      team = Team(request.args['team'], request.args['team_email'])
      newTeam = Team.query.filter_by(team_name=request.args['team']).first()
    team_id = newTeam.team_id
    role = request.args.get('role', 'user')
    dbEnv.modify("""INSERT INTO team_def (team_id,team_name,role) VALUES (%s,'%s','%s');
                 INSERT INTO env_auth (env_id,team_id)
                 SELECT env_def.env_id,%s
                 FROM env_def
                 WHERE env_def.env_name = '%s' ;""" % (team_id, request.args['team'], role, team_id, request.args['report_name']))
    return json.dumps('Success'), 200

  return json.dumps('Forbidden', 403)
项目:arowf    作者:priyankamandikal    | 项目源码 | 文件源码
def page_not_found(error):
    logdate = datetime.strftime(, '%Y-%m-%d')
    logfn = './logs/activity-'+logdate
    user = 'Anonymous'                      # Username is Anonymous by default
    if 'token' in session:
        token = session['token']
        tokenfilename = 'registered/'+token
        with open(tokenfilename, 'r') as f: # for getting username associated with the set token
            user = f.readline()[:-1]
    with open(logfn, 'a') as f:     # logging username,IP addr,error code
        log = user+' '+request.environ['REMOTE_ADDR']+' 500\n'
    return render_template('500.html'), 500

# No cache
项目:bibtex-browser    作者:frapac    | 项目源码 | 文件源码
def post_comment():
    """Add post to article."""
    form = PostForm()
    article = request.environ["HTTP_REFERER"].split("=")[-1]
    tim = time.time()
    user =
    post = Post(author=user, article=article,
      , time=tim)

    user =
    event = Event(author=user,
                  event="COMMENT", time=time.time())
    return redirect("/biblio/article=" + article)
项目:invenio1-orcid    作者:bronger    | 项目源码 | 文件源码
def send_error_mail(exception):
        """Sends an error mail to the admin containing the traceback and configuration.
        After that,a custom HTTP 500 page is shown.

        :param exception: the exception raised

        :type exception: ``Exception``

          the HTML to send back in the response,and the HTTP code 500.

        :rtype: str,int
        # Inspired from <>.
        message = Message("Join2 ORCID exception: %s" % exception, sender=CFG_SITE_ADMIN_EMAIL,
        message_contents = ["Traceback:", "=" * 80, traceback.format_exc(), "\n", "Request information:", "=" * 80]
        environ = request.environ
        for key in sorted(environ.keys()):
            message_contents.append("%s: %s" % (key, environ.get(key)))
        message.body = "\n".join(message_contents) + "\n"
        return render_template("500.html"), 500
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def run_flask_request(environ):
    from .wsgi_aux import app

    if '_wsgi.input' in environ:
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # Create a request context similar to that of the original request
    # so that the task can have access to flask.g,flask.request,etc.
    with app.request_context(environ):
        # Record the fact that we are running in the Celery worker Now
        g.in_celery = True

        # Run the route function and record the response
            rv = app.full_dispatch_request()
            # If we are in debug mode we want to see the exception
            # Else,return a 500 error
            if app.debug:
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def wrapped(*args, **kwargs):
        # If we are already running the request on the celery side,then we
        # just call the wrapped function to allow the request to execute.
        if getattr(g, 'in_celery', False):
            return f(*args, **kwargs)

        # If we are on the Flask side,we need to launch the Celery task,
        # passing the request environment,which will be used to reconstruct
        # the request object. The request body has to be handled as a special
        # case,since Wsgi requires it to be provided as a file-like object.
        environ = {k: v for k, v in request.environ.items()
                   if isinstance(v, text_types)}
        if 'wsgi.input' in request.environ:
            environ['_wsgi.input'] = request.get_data()
        t = run_flask_request.apply_async(args=(environ,))

        # Return a 202 response,with a link that the client can use to
        # obtain task status that is based on the Celery task id.
        if t.state == states.PENDING or t.state == states.RECEIVED or \
                t.state == states.STARTED:
            return '', 202, {'Location': url_for('tasks.get_status',}

        # If the task already finished,return its return value as response.
        # This would be the case when CELERY_ALWAYS_EAGER is set to True.
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def error():
    """This endpoint is used by httpd,which redirects its errors to it."""
        status = int(request.environ['REDIRECT_STATUS'])
    except Exception:
        # if there's an exception,it means that a client accessed this directly;
        #  in this case,we want to make it look like the endpoint is not here
        return api_404_handler()
    msg = 'UnkNown error'
    # for Now,we just provide specific error for stuff that already happened;
    #  before adding more,I'd like to see them actually happening with reproducers
    if status == 401:
        msg = 'Authentication Failed'
    elif status == 405:
        msg = 'Method not allowed for this endpoint'
    raise HTTPError(status, msg)
项目:infoset-ng    作者:PalisadoesFoundation    | 项目源码 | 文件源码
def flask_cache_key(*args, **kwargs):
    """Create a key for use by Flask-Caching.


        result: Key to be used

    # Use the request URI as part of the key
    path = request.path

    # This helps to differentiate between varIoUs instances of infoset
    # each running on different ports
    server_port = request.environ['SERVER_PORT']

    # Use a hash of the request arguments as part of the key
    args = str(hash(frozenset(request.args.items())))

    # Return
    result = (
            path, server_port, args)[:255].encode('utf-8'))
    return result
项目:hacks    作者:neo1218    | 项目源码 | 文件源码
def run_ctx_request(environ):
    run flask request context in celery worker
    from blueprints import app  #

    if '_wsgi.input' in environ:
        # an input stream (file-like object) from which the HTTP request body can be read.
        # detail:
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    with app.request_context():
        g.in_celery = True

            rv = app.full_dispatch_request()
        except InternalServerError:
            if app.debug:
            return app.make_response(InternalServerError())
        return (rv.get_data(), rv.headers)
项目:hacks    作者:neo1218    | 项目源码 | 文件源码
def decorator(*args, **kwargs):

        if getattr(g, **kwargs)

        environ = {k: v for k, text_types)}
        if 'wsgi.input' in request.environ:
            environ['_wsgi.input'] = request.get_data()  # request.body
        task = run_ctx_request.apply_async(args=(environ,))

        if task.state == states.PENDING or task.state == states.RECEIVED or \
           task.state == states.STARTED:
            return '', {'Location': url_for('api.get_status',}

项目:rest_api    作者:opentargets    | 项目源码 | 文件源码
def __init__(self, rate_limiter):
        super(LogApiCallCount, self).__init__()
        if self.stats:
            tags = ['version:'+str(Config.API_VERSION_MInor),
            ip = get_remote_addr()
                                             ignore_time = True,

            # self.stats.increment('',
            #                 tags=tags,)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def ensure_project_exists(view):

    def wrapper(*args, **kwargs):
        context = request.environ['context']
        if context.using_keystone:
            find_or_create_project(request, context)
        return view(*args, **kwargs)

    return wrapper
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'false', 'no')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def twitter():
    auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg",  "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter")
    auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)

    api = tweepy.API(auth)
             return redirect(url_for('index'))
    except tweepy.TweepError:

    redirect_url = auth.get_authorization_url()
    session["request_token"] = auth.request_token
    return redirect(redirect_url)
项目:flask_restapi    作者:dracarysX    | 项目源码 | 文件源码
def make_response(self, rv):
        status_or_headers = headers = None
        if isinstance(rv, tuple):
            rv, status_or_headers, headers = rv + (None,) * (3 - len(rv))

        if rv is None:
            raise ValueError('View function did not return a response')

        if isinstance(status_or_headers, (dict, list)):
            headers, status_or_headers = status_or_headers, None

        if not isinstance(rv, self.response_class):
            # When we create a response object directly,we let the constructor
            # set the headers and status.  We do this because there can be
            # some extra logic involved when creating these objects with
            # specific values (like default content type selection).
            if isinstance(rv, (JSONRender, text_type, bytes, bytearray, list, dict)):
                rv = self.response_class(rv, headers=headers, status=status_or_headers)
                headers = status_or_headers = None
                rv = self.response_class.force_type(rv, request.environ)

        if status_or_headers is not None:
            if isinstance(status_or_headers, string_types):
                rv.status = status_or_headers
                rv.status_code = status_or_headers
        if headers:

        return rv
项目:cerberus-core    作者:ovh    | 项目源码 | 文件源码
def force_type(cls, rv, environ=None):
        if isinstance(rv, dict) or isinstance(rv, list):
            rv = Response(
        return super(CerberusResponse, cls).force_type(rv, environ)
项目:flask-breathalyzer    作者:mindflayer    | 项目源码 | 文件源码
def get_http_info_with_retriever(self, retriever=None):
        Exact method for getting http_info but with form data work around.
        if retriever is None:
            retriever = self.get_form_data

        url_parts = urlparse.urlsplit(request.url)

            data = retriever()
        except Clientdisconnected:
            data = {}

        headers = dict(get_headers(request.environ))

        if self.data_blacklist:
            data = apply_blacklist(data, self.data_blacklist)
        if self.headers_blacklist:
            headers = apply_blacklist(headers, self.headers_blacklist)

        return {
            'url': '%s://%s%s' % (url_parts.scheme, url_parts.netloc, url_parts.path),
            'query_string': url_parts.query,
            'method': request.method,
            'data': data,
            'headers': headers,
            'env': dict(get_environ(request.environ)),
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:esignature-recipes-python    作者:docusign    | 项目源码 | 文件源码
def get_script_url():
    # Dynamically determine the script's url
    # For production use,this is not a great idea. Instead,set it
    # explicitly. Remember that for production,webhook urls must start with
    # https!
    my_url = rm_queryparameters(full_url(request.environ))
        # See
    return my_url
项目:esignature-recipes-python    作者:docusign    | 项目源码 | 文件源码
def url_origin(s, use_forwarded_host = False):

    # testing if Heroku includes forwarding host
    use_forwarded_host = True
    include_protocol = True

    ssl      = (('HTTPS' in s) and s['HTTPS'] == 'on')
    sp       = s['SERVER_PROTOCOL'].lower()
    protocol = sp[:sp.find('/')] + ('s' if ssl else '' )
    port     = s['SERVER_PORT']
    port     = '' if ((not ssl and port=='80') or (ssl and port=='443')) else (':' + port)
    host     = s['HTTP_X_FORWARDED_HOST'] if (use_forwarded_host and ('HTTP_X_FORWARDED_HOST' in s)) \
                 else (s['HTTP_HOST'] if ('HTTP_HOST' in s) else None)
    host     = host if (host != None) else (s['SERVER_NAME'] + port)

    # The protocol can easily be wrong if we're frontended by a HTTPS proxy
    # (Like the standard Heroku setup!)
    on_heroku = heroku_env in os.environ
    upgrade_insecure_request = request.headers.get('Upgrade-Insecure-Requests')
    upgrade_insecure_request = upgrade_insecure_request and upgrade_insecure_request == 1
    https_proto = request.headers.get('X-Forwarded-Proto')
    https_proto = https_proto and https_proto == 'https'
    use_https = on_heroku or upgrade_insecure_request or https_proto
    if use_https: # Special handling
        protocol = "https"
    return protocol + '://' + host
项目:python-ares    作者:pynog    | 项目源码 | 文件源码
def editScript():
  """ """
  scriptDtls = request.environ['HTTP_REFERER'].split('?')[0].split('/')
  if scriptDtls[-2] == 'run':
    script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-1], "" % scriptDtls[-1]), 'w')
    script = open(os.path.join(config.ARES_USERS_LOCATION, scriptDtls[-2], 'w')
    for line in'utf-8').split('\n'):
      script.write('%s\n' % line)
  except Exception as e:
  return 'OK'
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:legends-of-erukar    作者:etkirsch    | 项目源码 | 文件源码
def on_connect():
    addr = request.environ['REMOTE_ADDR']
    if addr in blacklist:
        print('{} was found in the blacklist and was rejected'.format(addr))
项目:Indushell    作者:SecarmaLabs    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:Liljimbo-chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:flask_system    作者:prashasy    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:beesly    作者:bincyber    | 项目源码 | 文件源码
def after_request(resp):
    Adds HTTP Headers to the response

    resp : flask.Response object
      the Flask response object
    resp.headers['Cache-Control'] = 'no-cache'

    if app.config['DEV']:
        resp.headers['Access-Control-Allow-Origin'] = '*'
        resp.headers['Access-Control-Allow-Methods'] = 'GET,POST'
        resp.headers['Access-Control-Allow-Headers'] = 'Origin,X-Requested-With,Content-Type,Accept'

        source_ip = get_real_source_ip()

            msg="HTTP Request",
            user_agent=request.environ.get('HTTP_USER_AGENT', '-')

    return resp
项目:beesly    作者:bincyber    | 项目源码 | 文件源码
def get_real_source_ip():
    Returns the real source IP address of the HTTP request.
    if 'X-Forwarded-For' in request.headers:
        return request.headers.getlist("X-Forwarded-For")[0].rpartition(' ')[-1]
        return request.environ['REMOTE_ADDR']
项目:arowf    作者:priyankamandikal    | 项目源码 | 文件源码
def register():
    with open('users.pkl', 'r') as f:
        userdict = load(f)
    if request.method == 'GET':
        logdate = datetime.strftime(, '%Y-%m-%d')
        logfn = './logs/activity-'+logdate
        user = 'Anonymous'                      # Username is Anonymous by default
        if 'token' in session:
            token = session['token']
            tokenfilename = 'registered/'+token
            with open(tokenfilename, 'r') as f: # for getting username associated with the set token
                user = f.readline()[:-1]
        with open(logfn,end-point,request type
            log = user+' '+request.environ['REMOTE_ADDR']+' register'+' GET\n'
        return render_template('register.html', userdict=userdict) # inputs for name,email,timezone,phone,aboutme
    elif request.method == 'POST':
        uname = str(request.form['uname'])      # mandatory
        email = str(request.form['email'])      # mandatory
        timezone = request.form['timezone']     # optional
        phone = request.form['phone']           # optional
        aboutme = request.form['aboutme']       # optional
        token = pbkdf2_sha512.encrypt(email)    # setting the token as a salted and hashed email
        token = token.replace('/', '+')         # filenames can't contain '/'
        session['token'] = token                # set token in session key on successful registration
        fn = 'registered' + path.sep + token
        with open(fn, 'w') as f:                # write reviewer info into a file named by the token
        userdict[uname] = email
        with open('users.pkl', 'w') as f:       # dictionary with keys as usernames and values as emails
            dump(userdict, f)
        send_email(email, 'Welcome to AROWF!', 'registration_mail', name=uname, token=token)    # send welcome email with token
        logdate = datetime.strftime(, '%Y-%m-%d')
        logfn = './logs/activity-'+logdate
        with open(logfn,request type
            log = uname+' '+request.environ['REMOTE_ADDR']+' register'+' POST\n'
        return redirect(url_for('index'))
项目:arowf    作者:priyankamandikal    | 项目源码 | 文件源码
def token():
    if request.method == 'GET':
        logdate = datetime.strftime(,request type
            log = user+' '+request.environ['REMOTE_ADDR']+' token'+' GET\n'
        tokenNames = listdir('registered/')         # get list of all tokens
        return render_template('token.html', user=user, tokenNames=tokenNames) # displays links to help docs for each end-point
    elif request.method == 'POST':                  # if token not set in session key
        if request.form['tokeninput'] != 'null':
            session['token'] = request.form['tokeninput']    # obtain from form and set it
            session.pop('token', None)
        logdate = datetime.strftime(,request type
            log = user+' '+request.environ['REMOTE_ADDR']+' token'+' POST\n'
        return redirect(url_for('index'))
项目:arowf    作者:priyankamandikal    | 项目源码 | 文件源码
def page_not_found(error):
    logdate = datetime.strftime(,error code
        log = user+' '+request.environ['REMOTE_ADDR']+' 404\n'
    return render_template('404.html'), 404
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:bibtex-browser    作者:frapac    | 项目源码 | 文件源码
def update_entry():
    """Add a new entry to the bibliography."""
    form = BiblioForm()
    article_name = request.environ["HTTP_REFERER"].split("=")[-1]
    if form.validate_on_submit():
        article = BiblioEntry.query.filter_by(
        article.ID =
        article.ENTRYTYPE =
        article.authors =
        article.title =
        article.year =
        article.journal = =
        article.url =
        article.keywords =
        article.tag =

        user =
        event = Event(author=user,,
                      event="UPDATE", time=time.time())

        return redirect("/biblio/article=" + article_name)
    return redirect("/biblio")
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:infinite-lorem-ipsum    作者:patjm1992    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:seedBox    作者:nailgun    | 项目源码 | 文件源码
def ensure_secure_request():
    is_request_secure = request.environ['wsgi.url_scheme'] == 'https'
    if not is_request_secure and not config.allow_insecure_transport:
        if request.method in ('POST', 'PUT', 'PATCH'):
            # request body already sent in insecure manner
            # return error in this case to notify cluster admin
            return abort(400)
            return redirect(request.url.replace('http://', 'https://', 1))
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def get_debug_flag(default=None):
    val = os.environ.get('FLASK_DEBUG')
    if not val:
        return default
    return val not in ('0', 'no')
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def swagger(file_name):
    with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), 'r') as swagger_file:
        swagger_spec =
        swagger_spec = swagger_spec.replace('localhost:8088', 'ari:{port}'.format(port=request.environ['SERVER_PORT']))
        return make_response(swagger_spec, 200, {'Content-Type': 'application/json'})
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def swagger(file_name):
    with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def swagger(file_name):
    with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def swagger(file_name):
    with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})
项目:xivo-ctid-ng    作者:wazo-pbx    | 项目源码 | 文件源码
def swagger(file_name):
    with open('/usr/local/share/ari/api-docs/{file_name}'.format(file_name=file_name), {'Content-Type': 'application/json'})


