Python flask.request 模块,remote_addr() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.remote_addr()。
def create_session(data):
    """Authenticate a user and persist a new login session.

    The user is looked up via ``data['username']`` (e-mail or username) and
    the stored password is compared with ``data['password']``.  When the
    lookup or comparison fails, the result of ``make_error_response`` with
    status 401 is returned.  Otherwise a ``Session`` carrying the client
    address and user-agent details is committed and returned.
    """
    account = User.find_by_email_or_username(data['username'])
    if not account or not (account.password == data['password']):
        return make_error_response('Invalid username/password combination', 401)

    new_session = Session(user=account)
    # TODO: can the recorded address be made more accurate?
    new_session.ip_address = request.remote_addr

    agent = request.user_agent
    if agent:
        new_session.user_agent = agent.string
        # Denormalized copies of the parsed user-agent fields.
        new_session.platform = agent.platform
        new_session.browser = agent.browser

    db.session.add(new_session)
    db.session.commit()
    return new_session
def login():
    """Render the login form and sign the user in on a valid POST.

    Already-authenticated users are redirected to ``index``.  The submitted
    name is matched against the e-mail column when it contains ``@`` and
    against the username column otherwise.  Each successful login stores a
    ``Connection`` row with the client address.
    """
    if current_user.is_authenticated:
        return redirect(url_for('index'))

    form = LoginForm()
    if not (request.method == 'POST' and form.validate_on_submit()):
        # Not a valid submission: surface form errors (if any) and re-render.
        if form.errors:
            flash(form.errors, 'danger')
        return render_template('login.html', form=form)

    username = form.username.data
    if '@' in username:
        lookup = {'email': username}
    else:
        lookup = {'username': username}
    existing_user = User.query.filter_by(**lookup).first()

    if not (existing_user and existing_user.check_password(form.password.data)):
        flash('Invalid username or password. Please try again.', 'warning')
        return render_template('login.html', form=form)

    login_user(existing_user)
    db.session.add(Connection(user=existing_user, address=request.remote_addr))
    db.session.commit()
    return redirect(url_for('index'))
def access_write_task(original_requester, key=None):
    """
    Decide whether the current requester may write to a task or its runs.

    Requests from an address in ``local_ips`` are always allowed.  A task
    without a key may only be written by ``original_requester``.  A task
    with a key requires the ``key`` request argument to match it.
    """
    caller = request.remote_addr

    # Local interfaces are always okay.
    if caller in local_ips:
        return True

    if key is not None:
        # The task has a key: the caller must present the same one.
        supplied_key = arg_string("key")
        return supplied_key is not None and supplied_key == key

    # Tasks without keys are limited to the original requester only.
    return caller == original_requester
def get_role_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials for the caller as JSON.

    Role parameters are resolved from the caller's address and the
    requested role.  ``roles.UnexpectedRoleError`` yields an empty 403
    response; ``roles.GetRoleError`` yields an empty response whose status
    is taken from ``e.args[0][0]``.  ``junk`` is accepted but unused.
    """
    caller_ip = request.remote_addr
    try:
        params = roles.get_role_params_from_ip(
            caller_ip, requested_role=requested_role)
    except roles.UnexpectedRoleError:
        return '', 403

    try:
        credentials = roles.get_assumed_role_credentials(
            role_params=params, api_version=api_version)
    except roles.GetRoleError as err:
        status = err.args[0][0]
        return '', status

    return jsonify(credentials)
def zmirror_status():
    """Return a ``<pre>`` page with cache statistics of internal helpers.

    Requests whose remote address is known and is not ``127.0.0.1`` receive
    a 403 page.  Otherwise the ``cache_info()`` of each listed helper, the
    active thread count and ``domain_alias_to_target_set`` are reported.
    """
    if request.remote_addr and request.remote_addr != '127.0.0.1':
        return generate_simple_resp_page(b'Only 127.0.0.1 are allowed', 403)

    # (label, cached function) pairs, reported in this order.
    # client_requests_text_rewrite and extract_url_path_and_query were
    # commented out in the original and are not reported.
    cached_functions = (
        ('extract_real_url_from_embedded_url', extract_real_url_from_embedded_url),
        ('\nis_content_type_streamed', is_mime_streamed),
        ('\nembed_real_url_to_embedded_url', embed_real_url_to_embedded_url),
        ('\ncheck_global_ua_pass', check_global_ua_pass),
        ('\nextract_mime_from_content_type', extract_mime_from_content_type),
        ('\nis_content_type_using_cdn', is_content_type_using_cdn),
        # Fixed: this row previously reported is_content_type_using_cdn's
        # statistics under the is_ua_in_whitelist label.
        # NOTE(review): assumes is_ua_in_whitelist exposes cache_info() like
        # the other helpers -- confirm.
        ('\nis_ua_in_whitelist', is_ua_in_whitelist),
        ('\nis_mime_represents_text', is_mime_represents_text),
        ('\nis_domain_match_glob_whitelist', is_domain_match_glob_whitelist),
        ('\nverify_ip_hash_cookie', verify_ip_hash_cookie),
        ('\nis_denied_because_of_spider', is_denied_because_of_spider),
        ('\nis_ip_not_in_allow_range', is_ip_not_in_allow_range),
    )

    output = ""
    for label, func in cached_functions:
        output += strx(label, func.cache_info())

    output += strx('\n\ncurrent_threads_number', threading.active_count())
    output += strx('\n----------------\n')
    output += strx('\ndomain_alias_to_target_set', domain_alias_to_target_set)
    return "<pre>" + output + "</pre>\n"
def retrieveAlertsJson():
    """Return the most recent alerts (without IPs) as JSON.

    Results are cached under the full request URL.  On a cache miss the
    latest 35 alerts are queried, formatted, cached (with the value 25
    passed to ``setCache``) and returned.
    """
    # The cache key is the request URL, so URL parameters (e.g. the
    # community index) get separate entries.
    cache_key = request.url

    cached = getCache(cache_key, "url")
    if cached is not False:
        app.logger.debug('Returning /retrieveAlertsJson from Cache %s' % str(request.remote_addr))
        return jsonify(cached)

    # Cache miss: query ElasticSearch and return limited alert content.
    alert_count = 35
    raw_alerts = queryAlertsWithoutIP(alert_count, checkCommunityIndex(request))
    payload = formatAlertsJson(raw_alerts)
    setCache(cache_key, payload, 25, "url")
    app.logger.debug('UNCACHED %s' % str(request.url))
    return jsonify(payload)
def accept_webhook():
    """Receive a webhook POST and queue its payload for processing.

    The request body is parsed as JSON.  A single JSON object (older
    webhook style) is queued as-is; any other payload is iterated and each
    frame is queued separately.  Any failure is logged and answered with
    HTTP 400.

    :returns: the string ``"OK"`` on success.
    """
    try:
        log.debug("POST request received from {}.".format(request.remote_addr))
        data = json.loads(request.data)
        # isinstance() is the idiomatic type check (was ``type(data) == dict``).
        frames = [data] if isinstance(data, dict) else data
        for frame in frames:
            data_queue.put(frame)
    except Exception as e:
        log.error("Encountered error while receiving webhook ({}: {})".format(type(e).__name__, e))
        abort(400)
    return "OK"  # request ok
# Thread used to distribute the data into various processes (for RocketMap format)
def __call__(self, form, field):
    """Validate the reCAPTCHA response sent with the current request.

    Returns ``True`` immediately while the application is in testing mode.
    The ``g-recaptcha-response`` value is read from the JSON body when one
    is present, otherwise from the form data.  A missing or rejected
    response raises ``ValidationError``.
    """
    if current_app.testing:
        return True

    payload = request.json if request.json else request.form
    response = payload.get('g-recaptcha-response', '')
    remote_ip = request.remote_addr

    if not response:
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(response, remote_ip):
        return
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def get_accounts():
    """Hand out accounts to the system identified by ``system_id``.

    Query arguments: ``system_id`` (required, otherwise HTTP 400),
    ``count``, ``min_level``, ``max_level``, ``reuse`` /
    ``include_already_assigned`` and ``banned_or_new``.  Latitude and
    longitude arguments are currently not read.

    Returns a single account as JSON when exactly one was requested and at
    least one is available, otherwise the list of accounts.
    """
    args = request.args

    system_id = args.get('system_id')
    if not system_id:
        log.error("Request from {} is missing system_id".format(request.remote_addr))
        abort(400)

    count = int(args.get('count', 1))
    min_level = int(args.get('min_level', 1))
    max_level = int(args.get('max_level', 40))
    reuse = parse_bool(args.get('reuse')) or parse_bool(args.get('include_already_assigned'))
    banned_or_new = parse_bool(args.get('banned_or_new'))

    summary = "System ID [{}] requested {} accounts level {}-{} from {}".format(
        system_id, count, min_level, max_level, request.remote_addr)
    log.info(summary)

    accounts = Account.get_accounts(system_id, reuse, banned_or_new)
    available = len(accounts)
    if available < count:
        log.warning("Could only deliver {} accounts.".format(available))

    if accounts and count == 1:
        return jsonify(accounts[0])
    return jsonify(accounts)
def log_exception(self, exc_info):
    """Log an exception together with details of the current request.

    The message lists path, method, client address, user-agent fields, GET
    arguments, view arguments and full URL; ``exc_info`` is forwarded to
    the logger so the traceback is attached.
    """
    template = """
Path: %s
HTTP Method: %s
Client IP Address: %s
User Agent: %s
User Platform: %s
User browser: %s
User browser Version: %s
GET args: %s
view args: %s
URL: %s
"""
    details = (
        request.path,
        request.method,
        request.remote_addr,
        request.user_agent.string,
        request.user_agent.platform,
        request.user_agent.browser,
        request.user_agent.version,
        dict(request.args),
        request.view_args,
        request.url,
    )
    self.logger.error(template % details, exc_info=exc_info)
def log_request(code='-'):
    """Log the current request in access-log style, coloured by status.

    :param code: HTTP status code (int or str); ``'-'`` when unknown.

    The request line is coloured by status class and written through
    ``logger.info`` with the client address and a timestamp.
    """
    proto = request.environ.get('SERVER_PROTOCOL')
    msg = request.method + ' ' + request.path + ' ' + proto
    code = str(code)

    # One if/elif chain.  Previously the 1xx test was a separate ``if``, so
    # informational responses also hit the final ``else`` and were
    # re-coloured.  ``startswith`` also tolerates an empty code string,
    # where ``code[0]`` raised IndexError.
    if code.startswith('1'):  # 1xx - informational
        msg = color(msg, attrs=['bold'])
    elif code.startswith('2'):  # 2xx - Success
        msg = color(msg, color='white')
    elif code == '304':  # 304 - Resource Not Modified
        msg = color(msg, color='cyan')
    elif code.startswith('3'):  # 3xx - Redirection
        msg = color(msg, color='green')
    elif code == '404':  # 404 - Resource Not Found
        msg = color(msg, color='yellow')
    elif code.startswith('4'):  # 4xx - Client Error
        msg = color(msg, color='red', attrs=['bold'])
    else:  # 5xx, or any other response
        msg = color(msg, color='magenta', attrs=['bold'])

    logger.info('%s - - [%s] "%s" %s', request.remote_addr, log_date_time_string(), msg, code)
def configure_before_handlers(app):
    """Register the before-request handlers on ``app``.

    ``update_lastseen`` is always registered.  ``mark_current_user_online``
    is registered only when ``app.config["REDIS_ENABLED"]`` is truthy.
    """

    @app.before_request
    def update_lastseen():
        """Update ``lastseen`` before every request if the user is
        authenticated."""
        if current_user.is_authenticated:
            # Fixed: the method is ``utcnow``; ``utcNow`` does not exist on
            # ``datetime.datetime`` and raised AttributeError.
            current_user.lastseen = datetime.datetime.utcnow()
            db.session.add(current_user)
            db.session.commit()

    # Fixed: the key was case-mangled as "REdis_ENABLED".  Flask only loads
    # upper-case keys from config objects and files, so that key could not
    # have been set that way.
    if app.config["REDIS_ENABLED"]:
        @app.before_request
        def mark_current_user_online():
            """Mark the user (or, for guests, the client address) online."""
            if current_user.is_authenticated:
                mark_online(current_user.username)
            else:
                mark_online(request.remote_addr, guest=True)
def configure_before_handlers(app):
    """Register the before-request handlers on ``app``.

    ``update_lastseen`` is always registered.  ``mark_current_user_online``
    is registered only when ``app.config["REDIS_ENABLED"]`` is truthy.
    """

    @app.before_request
    def update_lastseen():
        """Update ``lastseen`` before every request if the user is
        authenticated."""
        if current_user.is_authenticated:
            # Fixed: the method is ``utcnow``; ``utcNow`` does not exist on
            # ``datetime.datetime`` and raised AttributeError.
            current_user.lastseen = datetime.datetime.utcnow()
            db.session.add(current_user)
            db.session.commit()

    # Fixed: the key was case-mangled as "REdis_ENABLED".  Flask only loads
    # upper-case keys from config objects and files, so that key could not
    # have been set that way.
    if app.config["REDIS_ENABLED"]:
        @app.before_request
        def mark_current_user_online():
            """Mark the user (or, for guests, the client address) online."""
            if current_user.is_authenticated:
                mark_online(current_user.username)
            else:
                mark_online(request.remote_addr, guest=True)
def configure_before_handlers(app):
    """Register the before-request handlers on ``app``.

    ``update_lastseen`` is always registered.  ``mark_current_user_online``
    is registered only when ``app.config["REDIS_ENABLED"]`` is truthy.
    """

    @app.before_request
    def update_lastseen():
        """Update ``lastseen`` before every request if the user is
        authenticated."""
        if current_user.is_authenticated:
            # Fixed: the method is ``utcnow``; ``utcNow`` does not exist on
            # ``datetime.datetime`` and raised AttributeError.
            current_user.lastseen = datetime.datetime.utcnow()
            db.session.add(current_user)
            db.session.commit()

    # Fixed: the key was case-mangled as "REdis_ENABLED".  Flask only loads
    # upper-case keys from config objects and files, so that key could not
    # have been set that way.
    if app.config["REDIS_ENABLED"]:
        @app.before_request
        def mark_current_user_online():
            """Mark the user (or, for guests, the client address) online."""
            if current_user.is_authenticated:
                mark_online(current_user.username)
            else:
                mark_online(request.remote_addr, guest=True)
def _capture_change_info(self):
    """Capture the change info for the new version.

    Calls, in order:

    (1) :meth:`_fetch_current_user_id`, stored under ``user_id``;
    (2) :meth:`_fetch_remote_addr`, stored under ``remote_addr``;
    (3) :meth:`_get_custom_change_info`, whose keys (if the result is
        truthy) are merged into the result.

    Returns the resulting ``change_info`` dict.
    """
    change_info = dict(
        user_id=self._fetch_current_user_id(),
        remote_addr=self._fetch_remote_addr(),
    )
    custom = self._get_custom_change_info()
    if not custom:
        return change_info
    change_info.update(custom)
    return change_info
def __call__(self, field):
    """Validate the reCAPTCHA challenge/response of the current request.

    Returns ``True`` immediately while the application is in testing mode.
    Both fields are read from the JSON body when one is present, otherwise
    from the form data.  Missing or rejected values raise
    ``ValidationError``.
    """
    if current_app.testing:
        return True

    source = request.json if request.json else request.form
    challenge = source.get('recaptcha_challenge_field', '')
    response = source.get('recaptcha_response_field', '')
    remote_ip = request.remote_addr

    if not (challenge and response):
        raise ValidationError(field.gettext(self.message))

    if self._validate_recaptcha(challenge, response, remote_ip):
        return
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def report_event():
    """
    Store a container task state change event.

    Expects a JSON body with ``event_id``, ``event`` and ``timestamp``; a
    request without JSON data is answered with HTTP 400.  The record is
    written to ``ecs_id_mapper_events`` under ``<timestamp>_<event_id>``.

    :return: str. 'true' if successful
    """
    if not request.json:
        logger.error('received non-json data')
        abort(400)

    logger.info('Received event from {}'.format(request.remote_addr))
    logger.debug('Event payload {}'.format(request.json))

    body = request.json
    event_id = body['event_id']
    event = body['event']
    timestamp = body['timestamp']

    record_key = str(timestamp) + "_" + str(event_id)
    record = {'container_id': event_id, 'event_action': event, 'timestamp': timestamp}
    db.put(record_key, record, 'ecs_id_mapper_events')
    return 'true'
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def make_all_files_public(bucket_id):
    """Publish every not-yet-public file of bucket ``bucket_id``.

    Allowed for logged-in sessions, or for requests from 127.0.0.1 when
    ``can_login_local_without_auth()`` permits it; everyone else is
    redirected to ``/login``.  Returns ``("SUCCESS", 200)`` when done.
    """
    initSession()
    config_array = {"wait_time": 1, "max_allowed_from_one_ip": 1, "mode": 1}

    authorized = session['logged_in'] or (
        request.remote_addr == "127.0.0.1" and can_login_local_without_auth())
    if not authorized:
        return make_response(redirect("/login"))

    sharing = OwnStorjPublicFileSharingManager()
    files_manager = OwnStorjFilesManager(str(bucket_id))
    for entry in files_manager.get_files_list():
        if sharing.is_file_public(bucket_id=bucket_id, file_id=entry["id"]):
            continue
        # The public hash is derived from the bucket id and file metadata.
        seed = bucket_id + "_" + entry["id"] + entry["filename"] + str(entry["size"]) + entry["created"]
        public_hash = sharing.generate_public_file_hash(input_string=seed)
        sharing.save_public_file_to_db(bucket_id, entry["id"], public_hash,
                                       public_hash,
                                       config_array, entry["size"], entry["filename"],
                                       entry["created"])
    return "SUCCESS", 200
def add_playlist():
    """Create a playlist from the ``playlist_*`` query arguments.

    Allowed for logged-in sessions, or for requests from 127.0.0.1 when
    ``can_login_local_without_auth()`` permits it; everyone else is
    redirected to ``/login``.  The arguments are only read on GET, and a
    playlist is only created when ``playlist_name`` was supplied.
    ``("SUCCESS", 200)`` is returned for every authorized request.
    """
    initSession()
    authorized = session['logged_in'] or (
        request.remote_addr == "127.0.0.1" and can_login_local_without_auth())
    if not authorized:
        return make_response(redirect("/login"))

    playlist_name = playlist_category = playlist_description = None
    if request.method == 'GET':
        query = request.args
        playlist_name = query.get('playlist_name')
        playlist_category = query.get('playlist_category')
        playlist_description = query.get('playlist_description')

    if playlist_name is not None:
        manager = OwnStorjplaylistManager()
        manager.add_new_playlist(playlist_name=playlist_name,
                                 playlist_description=playlist_description,
                                 playlist_category=playlist_category)
    return "SUCCESS", 200
def insert_file_to_playlist_endpoint(file_local_public_hash, playlist_id):
    """Add the public file with the given local hash to a playlist.

    Unauthorized callers get a 401 response.  For authorized callers the
    track is inserted when ``file_local_public_hash`` is non-empty, using
    the stored file name as the track name.
    """
    # NOTE(review): indentation was lost in this snippet; the success
    # response is assumed to be returned for every authorized request,
    # including an empty hash -- confirm.
    initSession()
    playlists = OwnStorjplaylistManager()
    sharing = OwnStorjPublicFileSharingManager()

    authorized = session['logged_in'] or (
        request.remote_addr == "127.0.0.1" and can_login_local_without_auth())
    if not authorized:
        return '{result: "unauthorized"}', 401

    if file_local_public_hash != "":
        details = sharing.get_public_file_details_by_local_hash(file_local_public_hash)
        playlists.insert_track(track_name=details[0]["file_name"],
                               track_local_file_id=file_local_public_hash,
                               playlist_id=playlist_id)
    return '{result: "success"}', 200  # HTTP 200 - OK
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def after_request_log(response):
    """Log a summary of the finished request and return ``response``.

    The client address is resolved with ``dns_resolve``; ``'?'`` is logged
    as the hostname when that returns ``None``.

    :param response: the response object; passed through unchanged.
    :returns: ``response``.
    """
    name = dns_resolve(request.remote_addr)
    # ``Logger.warn`` is a deprecated alias (removed in Python 3.13);
    # ``warning`` is the supported spelling.
    current_app.logger.warning(u"""[client {ip} {host}] {http} "{method} {path}" {status}
Request: {method} {path}
Version: {http}
Status: {status}
Url: {url}
IP: {ip}
Hostname: {host}
Agent: {agent_platform} | {agent_browser} | {agent_browser_version}
Raw Agent: {agent}
""".format(method=request.method,
           path=request.path,
           url=request.url,
           ip=request.remote_addr,
           host=name if name is not None else '?',
           agent_platform=request.user_agent.platform,
           agent_browser=request.user_agent.browser,
           agent_browser_version=request.user_agent.version,
           agent=request.user_agent.string,
           http=request.environ.get('SERVER_PROTOCOL'),
           status=response.status))
    return response
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def configuration(node=None):
    '''
    Retrieve an osquery configuration for a given node.

    :param node: the node checking in; its ``get_config()`` result is
        returned.
    :returns: the node's configuration as JSON, with ``node_invalid`` set
        to ``False``
    '''
    current_app.logger.info(
        "%s - %s checking in to retrieve a new configuration",
        request.remote_addr, node
    )
    config = node.get_config()

    # Persist the node (the original comment mentions last_checkin/last_ip,
    # but no such fields are set in this snippet).
    db.session.add(node)
    db.session.commit()

    # Fixed: jsonify() rejects a mix of positional and keyword arguments in
    # current Flask releases, so the payload is merged into one dict first.
    # Older releases built dict(config, node_invalid=False) themselves.
    return jsonify(dict(config, node_invalid=False))
def distributed_read(node=None):
    '''
    Return the new distributed queries for a node.

    :param node: the node checking in; its ``get_new_queries()`` result is
        returned.
    :returns: JSON with ``queries`` and ``node_invalid=False``
    '''
    # The parsed body is not used, but the call is kept so malformed JSON is
    # still handled by Flask as before.
    request.get_json()

    # Fixed: the format string has two placeholders but only ``node`` was
    # passed, which makes the logging call fail to format.  The client
    # address is supplied for the first one, as in the sibling
    # ``configuration`` handler.
    current_app.logger.info(
        "%s - %s checking in to retrieve distributed queries",
        request.remote_addr, node
    )
    queries = node.get_new_queries()

    # Persist the node (the original comment mentions last_checkin, last_ip
    # and distributed query state; only add/commit happen in this snippet).
    db.session.add(node)
    db.session.commit()
    return jsonify(queries=queries, node_invalid=False)
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def get_user():
    """Return information about the current user.

    .. note::

       **Privacy note** A user's IP address, user agent string, and user id
       (if logged in) are sent to a message queue, where they are stored
       for about 5 minutes. The information is used to:

       - Detect robot visits from the user agent string.
       - Generate an anonymized visitor id (using a random salt per day).
       - Detect the user's host country based on the IP address.

       The information is then discarded.

    :returns: dict with ``ip_address``, ``user_agent`` and ``user_id``
        (``None`` for anonymous users).
    """
    if current_user.is_authenticated:
        user_id = current_user.get_id()
    else:
        user_id = None

    return {
        'ip_address': request.remote_addr,
        'user_agent': request.user_agent.string,
        'user_id': user_id,
    }
def downloadPic(source,id,fileName):
    """Record a download of article ``id`` and send the requested picture.

    The article's ``download_num`` is incremented and an
    ``ArticleDownload`` row (article id, client address and, for
    logged-in users, the user id) is committed.  The file is then served
    from ``'pics/' + source``.
    """
    stem, suffix = os.path.splitext(fileName)

    article = Article.query.get(id)
    article.download_num += 1

    record = ArticleDownload()
    record.article_id = id
    record.ip = request.remote_addr
    if current_user.is_authenticated:
        record.user_id = current_user.get_id()

    db.session.add(record)
    db.session.commit()
    return send_from_directory('pics/' + source, stem + suffix)
def itemDetail(id,form=None):
    """Render the detail page of article ``id`` and record the page view.

    The article's ``view_num`` is incremented and an ``ArticleView`` row is
    committed.  The template receives the article, its author, its tags, a
    comma-separated keyword string built from the tag titles, and whether
    the current user has liked the article.
    """
    article = Article.query.get(id)
    author = User.query.get(article.author_id)
    # NOTE(review): ``STATUS_norMAL`` looks case-mangled by extraction
    # (perhaps STATUS_NORMAL) -- confirm against the defining module.
    tag = Tag.query.filter_by(article_id=id,status=STATUS_norMAL).all()
    keywords = ','.join(t.title for t in tag)

    article.view_num += 1
    view = ArticleView()
    view.article_id = id
    view.ip = request.remote_addr

    isLike = False
    if current_user.is_authenticated:
        user_id = current_user.get_id()
        view.user_id = user_id
        liked = ArticleLike.query.filter_by(
            article_id=id, user_id=user_id, status=STATUS_norMAL).first()
        isLike = liked is not None

    db.session.add(view)
    db.session.commit()
    return render_template('item.html',article=article,author=author,isLike=isLike,tags=tag,form=form,keywords=keywords)
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def login():
    """Handle the login form and render the login page with disposables.

    On POST, empty credentials produce a flash message; otherwise the
    credentials are checked and a successful login redirects to ``admin``
    (for admins) or ``desktops``.  In every other case the login page is
    rendered with the disposables shown for the client address.
    """
    if request.method == 'POST':
        # Fixed: the original used ``is ''``, an identity test against a
        # string literal, which is not a reliable emptiness check.
        if not request.form['user'] or not request.form['password']:
            flash("Can't leave it blank",'danger')
        else:
            au=auth()
            user=au.check(request.form['user'],request.form['password'])
            if user:
                login_user(user)
                flash('Logged in successfully.','success')
                if user.is_admin:
                    return redirect(url_for('admin'))
                return redirect(url_for('desktops'))
            else:
                flash('Username not found or incorrect password.','warning')

    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # proxy overwrites it -- confirm the deployment guarantees that.
    remote_addr = request.headers.get('X-Forwarded-For', request.remote_addr)
    disposables=app.isardapi.show_disposable(remote_addr)
    log.info(disposables)
    log.info(remote_addr)
    return render_template('login_disposables.html', disposables=disposables if disposables else '')
def voucher_login():
    """Register (or reset) an account from a voucher and render the login page.

    On POST the voucher is checked; a valid one triggers
    ``register_user`` for the submitted e-mail, with a different flash
    message depending on whether the user already exists.  The login page
    is rendered afterwards in every case.
    """
    if request.method == 'POST':
        # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        # proxy overwrites it -- confirm the deployment guarantees that.
        remote_addr = request.headers.get('X-Forwarded-For', request.remote_addr)
        email = request.form['email']
        au=auth_voucher()
        if au.check_voucher(request.form['voucher']):
            if au.check_user_exists(email):
                au.register_user(request.form['voucher'],email,remote_addr)
                flash('Resetting account. Email with new isard user sent to '+email+'. Please check your email','warning')
            else:
                # Fixed: this call omitted the e-mail argument, unlike the
                # branch above, although the message says a mail is sent to
                # that address.
                # NOTE(review): confirm the register_user signature.
                au.register_user(request.form['voucher'],email,remote_addr)
                flash('Email with isard user sent to '+email+'. Please check your email','success')
        else:
            flash('Invalid registration voucher code','danger')

    # ``disposables`` was always False here, so the template always received ''.
    return render_template('login.html', disposables='')
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def game():
    """Controller page.

    Clients whose address is not in ``players`` are redirected to
    ``index``.  For known players the team is looked up and, on POST, the
    submitted value is pushed onto the queue ``q``.  The handler's
    elapsed time is printed before rendering.
    """
    started = time.perf_counter()
    player_ip = request.remote_addr

    # Players that are not in the IP list have no team and are redirected.
    known_player = False
    for player in players:
        if player_ip == player.ip:
            print("OK")
            known_player = True
    if not known_player:
        return redirect(url_for('index'))

    team_var = get_team(player_ip)
    if request.method == 'POST':
        # Queue the submitted request for the worker thread.
        q.put(request.form['submit'])

    print(time.perf_counter() - started)
    return render_template('game.html',team=team_var, direction=None)
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def dist_index(dist):
    """Render the index page for a distribution and log the page visit.

    An empty or missing ``dist`` defaults to ``'NetBSD-current'``.  A
    ``dist`` that is neither in ``config.DB_PATHS`` nor ``'favicon.ico'``
    redirects to ``search``.
    """
    netbsd_logo_url = url_for('static', filename='images/netbsd.png')

    if dist is None or dist == '':
        dist = 'NetBSD-current'
    if dist not in config.DB_PATHS and dist != 'favicon.ico':
        return redirect(url_for('search'))

    ip = request.remote_addr
    agent = request.user_agent
    dblogger.log_page_visit(
        1, ip, agent.platform, agent.browser, agent.version, agent.language,
        request.referrer, int(time.time()), agent.string, dist)

    return render_template(
        'index.html', netbsd_logo_url=netbsd_logo_url, distnames=distnames)
def catch_hash_page(some_hash):
    """Serve the page for ``some_hash``, banning addresses seen before.

    An address already recorded by ``ip_in_db`` gets the "Banned!" image
    when ``some_hash`` is a key of ``image_mappings`` and a "wrong way"
    message otherwise.  A new address is recorded with ``add_ip`` and
    given the link to the next URL.
    """
    if ip_in_db(request.remote_addr):
        if some_hash in image_mappings:
            image = 'Banned!<p></p><img src="images/{}">'\
                .format(image_mappings[some_hash])
            return render_template(
                "index.html", content={"link": "", "text": image})
        # Fixed: this call was syntactically broken (the ``content={``
        # opener was missing); it now mirrors the call above.
        # NOTE(review): the empty "link" entry is assumed -- confirm.
        return render_template(
            "index.html", content={"link": "", "text": "wrong way:("})

    add_ip(request.remote_addr)
    return render_template(
        "index.html",
        content={
            "link": next_url(some_hash),
            "text": "You're on the right way!"})
def monkey_patch_email_field(form_class):
    """Replace ``form_class.email`` with an HTML5 e-mail field.

    The new field uses a patched ``Email`` validator with the message
    ``'INVALID_EMAIL_ADDRESS'`` plus a validator that aborts when the
    client address is reported by ``rbl``.
    """
    from wtforms.fields.html5 import EmailField
    from flask_security.forms import (email_required,
                                      unique_user_email,
                                      get_form_field_label)
    import wtforms.validators
    from pygameweb.user.rbl import rbl

    def rbl_spamlist_validator(form, field):
        """Abort with an error if the client address is on a spam list."""
        if rbl(request.remote_addr or None):
            abort(500)

    validators = [
        email_required,
        wtforms.validators.Email(message='INVALID_EMAIL_ADDRESS'),
        rbl_spamlist_validator,
        unique_user_email,
    ]
    form_class.email = EmailField(get_form_field_label('email'),
                                  validators=validators)
def request_register():
    """Register a pending authentication request in the ``requests`` hash.

    Invalid input yields an "invalid request" response with status 400; a
    duplicate registration yields the same body with the default status.
    Otherwise the request is stored as unauthorized and "success" is
    returned.
    """
    def reply(body, **extra):
        # All responses here are JSON documents.
        return Response(response=json.dumps(body), mimetype="application/json", **extra)

    username = str(request.form['username'])
    requestid = str(request.form['requestid'])
    client_ip = request.remote_addr

    checks = pivportal.security
    # NOTE(review): the original comment said "client_ip is None when
    # testing, so its ok" -- whether ip_is_valid accepts None is not
    # visible here.
    if not (checks.username_is_valid(username)
            and checks.requestid_is_valid(requestid)
            and checks.ip_is_valid(client_ip)):
        return reply({"response": " invalid request"}, status=400)

    if checks.is_duplicate_register(username, requestid, redis_store.hgetall("requests")):
        return reply({"response": " invalid request"})

    this_request = {"username": username, "client_ip": client_ip, "authorized": False, "time": time.time()}
    redis_store.hmset("requests", {requestid: json.dumps(this_request)})
    return reply({"response": "success"}, status=200)
def request_status():
    """Report whether a pending authentication request has been authorized.

    Reads ``username`` and ``requestid`` from the form and the client
    address from the request.  Returns "success" once the stored request
    is marked authorized (and deletes it), "invalid request" for invalid
    input, and "Authentication Failure" with status 401 otherwise.
    """
    # NOTE(review): indentation was lost in this snippet.  The ``else``
    # below is assumed to pair with the username/client_ip check, so a
    # mismatching caller deletes the request while a matching but not yet
    # authorized request is kept -- confirm against the original source.
    username = str(request.form['username'])
    requestid = str(request.form['requestid'])
    client_ip = request.remote_addr
    if not pivportal.security.username_is_valid(username) or not pivportal.security.requestid_is_valid(requestid) or not pivportal.security.ip_is_valid(client_ip):
        return Response(response=json.dumps({"response": " invalid request"}), mimetype="application/json")
    auth_requests = redis_store.hgetall("requests")
    if requestid in auth_requests:
        this_request = json.loads(auth_requests[requestid])
        if this_request["username"] == username and this_request["client_ip"] == client_ip:
            if this_request["authorized"] == True:
                # Success
                redis_store.hdel("requests", requestid)
                return Response(response=json.dumps({"response": "success"}), mimetype="application/json")
        else:
            # Delete auth_request, it failed anyway
            redis_store.hdel("requests", requestid)
    return Response(response=json.dumps({"response": "Authentication Failure"}), status=401, mimetype="application/json")
def register():
    """ Register a new client """
    # NOTE(review): indentation was lost in this snippet; the ``else`` is
    # assumed to belong to the field check, so non-POST requests return
    # None -- confirm.
    if request.method != 'POST':
        return None

    client_id = request.form.get('uuid','')
    user_agent = request.form.get('user_agent','')
    ip = request.remote_addr
    if not (client_id and user_agent and ip):
        return 'UUID is not present'

    print("Register: ", client_id)
    client = Client(client_id, user_agent, ip)
    db.session.add(client)
    # add pre flight scripts
    client.add_preflight()
    db.session.commit()
    return 'Hello {}!'.format(client_id)
def before_request(self,*args):
    """Verify the API key, client and address of the current request.

    Reads the ``API_KEY`` and ``Client_ID`` headers.  A 400 JSON response
    is returned when the client is unknown, the key does not match, the
    client address differs from the one recorded at login, or the key's
    signature cannot be verified within ``max_age``.  Returns ``None``
    when every check passes.
    """
    signer = TimestampSigner(SECRET_KEY)
    api_key = request.headers['API_KEY']
    Client_id = request.headers['Client_ID']
    ip_address = request.remote_addr

    user = User.query.filter_by(Client_id=Client_id).first()
    if user is None:
        return make_response(jsonify({'Failure': 'Invaild User'}), 400)
    if api_key != user.api_key:
        return make_response(jsonify({'Failure': 'Incorrect API Key'}), 400)
    if ip_address != user.current_login_ip:
        return make_response(jsonify({'Failure': 'Incorrect IP for Client,Please Re-login in'}), 400)

    try:
        signer.unsign(api_key, max_age=86164)
    except Exception:
        # Was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt.
        return make_response(jsonify({'Failure': 'Invaild API Key,please request new API Key'}), 400)
def route():
    """Dispatch the current request to its endpoint and return JSON.

    The JSON body (or an empty dict) supplies the arguments.  The request
    is logged, an endpoint is created and executed, and its result is
    returned.  ``AutocertError`` is reported as a 500 response with the
    error name and message.

    :raises EmptyJsonError: when the endpoint returns an empty payload.
    """
    args = request.json
    args = args if args else {}
    cfg = args.get('cfg', None)

    # Fixed: this call was syntactically broken by extraction (unbalanced
    # parenthesis after 'hostname') and the default was case-mangled as
    # 'unkNown'.
    # NOTE(review): the restored hostname default and client address
    # argument are assumed -- confirm against log_request's signature.
    log_request(
        args.get('user', 'unknown'),
        args.get('hostname', 'unknown'),
        request.remote_addr,
        request.method,
        request.path,
        args)

    try:
        endpoint = create_endpoint(request.method, cfg, args)
        payload, status = endpoint.execute()
    except AutocertError as ae:
        status = 500
        payload = dict(errors={ae.name: ae.message})
        return make_response(jsonify(payload), status)

    if not payload:
        raise EmptyJsonError(payload)
    return make_response(jsonify(payload), status)
def main():
    """Handle a repository webhook: pull and reload handlers when needed.

    Only callers whose address lies in 192.30.252.0 - 192.30.255.255
    (inclusive) are accepted; everyone else gets HTTP 403.  For the
    ``Python-IRC-Bot`` repository ``git pull`` is run, and the handlers are
    reloaded when ``handlers.py`` was modified in the head commit.
    """
    iplow = ip2long('192.30.252.0')
    iphigh = ip2long('192.30.255.255')

    # Fixed: the original tested the address *string* against a range() of
    # integers, which is never true, and range() excluded the upper bound.
    # The address is now converted and compared numerically.
    remote = request.remote_addr
    if not remote or not (iplow <= ip2long(remote) <= iphigh):
        flask.abort(403)

    payload = request.get_json()
    if payload["repository"]["name"] != "Python-IRC-Bot":
        return flask.Response("Wrong repo.", mimetype="text/plain")

    try:
        subprocess.check_call(["git", "pull"])
    except subprocess.CalledProcessError:
        irc.privmsg("##wolfy1339", "git pull Failed!")
    else:
        if "handlers.py" in payload['head_commit']['modified']:
            reload_handlers(bot)
    return flask.Response("Thanks.", mimetype="text/plain")
def __call__(self, remote_ip):
    # NOTE(review): this snippet looks truncated by extraction. ``field`` is
    # neither a parameter nor a local here, so unless a module-level
    # ``field`` exists the body raises NameError.  ``remote_ip`` is accepted
    # but never used.
    field.recaptcha_error = 'incorrect-captcha-sol'
    raise ValidationError(field.gettext(self.message))
def app_getinfo(ver):
    """ Returns Flask API Info

    The JSON document contains the API version ``ver``, fixed message and
    status fields, request method, path, client address and user agent,
    plus one ``GET <name>`` entry per query argument and one
    ``POST <name>`` entry per form field.
    """
    response = {
        'api_version': ver,
        'message': "Flask API Data",
        'status': "200",
        'method': request.method,
        'path': request.path,
        'remote_addr': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
    }
    # GET attributes
    response.update(
        ('GET ' + key, request.args.get(key, '')) for key in request.args)
    # POST attributes
    response.update(
        ('POST ' + key, request.form[key]) for key in request.form.keys())
    return jsonify(response)
def app_getinfo(ver):
    """ Returns Flask API Info

    The JSON document contains the API version ``ver``, fixed message and
    status fields, request method, path, client address and user agent,
    plus one ``GET <name>`` entry per query argument and one
    ``POST <name>`` entry per form field.
    """
    response = {
        'api_version': ver,
        'message': "Flask API Data",
        'status': "200",
        'method': request.method,
        'path': request.path,
        'remote_addr': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
    }
    # GET attributes
    response.update(
        ('GET ' + key, request.args.get(key, '')) for key in request.args)
    # POST attributes
    response.update(
        ('POST ' + key, request.form[key]) for key in request.form.keys())
    return jsonify(response)