Python flask 模块,jsonify() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.jsonify()。
def index():
code = request.args.get("code", "")
app.logger.debug("code: %s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data.get('access_token')
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html',userData=userData)
#resp.set_cookie(key="logged_in",value='true',expires=None)
resp = jsonify(userData)
resp.set_cookie(key="logged_in", value='true', expires=None)
return resp
else:
return redirect(url_for("login"))
def omw_lang_selector():
selected_lang = request.cookies.get('selected_lang')
selected_lang2 = request.cookies.get('selected_lang2')
lang_id, lang_code = fetch_langs()
html = '<select name="lang" style="font-size: 85%; width: 9em" required>'
for lid in lang_id.keys():
if selected_lang == str(lid):
html += """<option value="{}" selected>{}</option>
""".format(lid, lang_id[lid][1])
else:
html += """<option value="{}">{}</option>
""".format(lid, lang_id[lid][1])
html += '</select>'
html += '<select name="lang2" style="font-size: 85%; width: 9em" required>'
for lid in lang_id.keys():
if selected_lang2 == str(lid):
html += """<option value="{}" selected>{}</option>
""".format(lid, lang_id[lid][1])
html += '</select>'
return jsonify(result=html)
def Nowplaying():
username = "@" + request.form.get('user_name', None)
user_id = request.form.get('user_id', None)
lastfm_user = request.form.get('text', None)
if lastfm_user == "" or lastfm_user == None:
user = User.query.filter_by(user_id=user_id).first()
if user == None:
return "Please enter a valid username", 200
lastfm_user = user.lastfm
if lastfm_user == None:
return "Please enter a valid username", 200
np = lastfm_network.get_user(lastfm_user).get_Now_playing()
if np == None:
return "No song playing.", 200
payload = {
"response_type": "in_channel",
"text": "<http://last.fm/user/" + lastfm_user + "|" + lastfm_user + "> is currently listening to <" + np.get_url() + "|" + np.artist.name + " - " + np.title + ">",
"unfurl_links": False
}
return jsonify(payload), 200
def get_chapter(book_id):
# chapters = Chapter.query.filter_by(book_id=book_id).all()
page = request.args.get('page', 1, type=int)
pagination = Chapter.query.filter_by(book_id=book_id).paginate(
page, per_page=current_app.config['CHAPTER_PER_PAGE'],
error_out=False
)
chapters = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_chapter', book_id=book_id, page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_chapter', page=page+1, _external=True)
return jsonify({
'chapters': [chapter.to_json() for chapter in chapters],
'prev': prev,
'next': next
})
def is_hot_dog():
if request.method == 'POST':
if not 'file' in request.files:
return jsonify({'error': 'no file'}), 400
# Image info
img_file = request.files.get('file')
img_name = img_file.filename
mimetype = img_file.content_type
# Return an error if not a valid mimetype
if not mimetype in valid_mimetypes:
return jsonify({'error': 'bad-type'})
# Write image to static directory and do the hot dog check
img_file.save(os.path.join(app.config['UPLOAD_FOLDER'], img_name))
hot_dog_conf = rekognizer.get_confidence(img_name)
# Delete image when done with analysis
os.remove(os.path.join(app.config['UPLOAD_FOLDER'], img_name))
is_hot_dog = 'false' if hot_dog_conf == 0 else 'true'
return_packet = {
'is_hot_dog': is_hot_dog,
'confidence': hot_dog_conf
}
return jsonify(return_packet)
def create():
form = UserCreateForm.from_json(request.get_json())
if not form.validate():
return jsonify(form.errors), 400
user = User()
user.email = form.data.get('email')
user.first_name = form.data.get('first_name')
user.last_name = form.data.get('last_name')
user.avatar = form.data.get('avatar', None)
user.password = User.make_password(form.data.get('password'))
user.save()
access_token = jwt.jwt_encode_callback(user)
return jsonify({'user': user.serialize(), 'access_token': access_token.decode('utf-8')}), 200
def login():
data = request.get_json()
username = data.get(current_app.config.get('JWT_AUTH_USERNAME_KEY'), None)
password = data.get(current_app.config.get('JWT_AUTH_PASSWORD_KEY'), None)
criterion = [username, password, len(data) == 2]
if not all(criterion):
return jsonify({'message': 'Invalid credentials'}), 401
user = jwt.authentication_callback(username, password)
if user:
if not user.is_active:
return jsonify({'message': 'InActive User'}), 401
access_token = jwt.jwt_encode_callback(user)
return jsonify({'user': user.serialize(), 200
else:
return jsonify({'message': 'Invalid credentials'}), 401
def rundeck_list_groups():
'''
Return the list of groups from all available profiles
'''
resp_obj = {}
awsconfig = aws_config.AwsConfig()
profiles = awsconfig.get_profiles()
for profile in profiles:
session = boto3.Session(profile_name=profile)
iamclient = session.client('iam')
try:
groupinfo = iamclient.list_groups()
except botocore.exceptions.ClientError:
groupinfo['Groups'] = []
for group in groupinfo['Groups']:
grouptext = "(%s) %s" % (profile, group['GroupName'])
resp_obj[grouptext] = group['GroupName']
return jsonify(resp_obj)
def rundeck_list_iam_policies():
'''
Return the list of profiles from all available profiles
'''
resp_obj = {}
awsconfig = aws_config.AwsConfig()
profiles = awsconfig.get_profiles()
for profile in profiles:
session = boto3.Session(profile_name=profile)
iamclient = session.client('iam')
try:
policyinfo = iamclient.list_policies()
except botocore.exceptions.ClientError:
policyinfo['Policies'] = []
for policy in policyinfo['Policies']:
policytext = "(%s) %s" % (profile, policy['PolicyName'])
resp_obj[policytext] = policy['PolicyName']
return jsonify(resp_obj)
def detailed_id():
ili_id = request.args.get('ili_id', None)
rate_hist = fetch_rate_id([ili_id])
comm_hist = fetch_comment_id([ili_id])
users = fetch_allusers()
r_html = ""
for r, u, t in rate_hist[int(ili_id)]:
r_html += '{} ({}): {} <br>'.format(users[u]['userID'], t, r)
c_html = ""
for c, t in comm_hist[int(ili_id)]:
c_html += '{} ({}): {} <br>'.format(users[u]['userID'], c)
html = """
<td colspan="9">
<div style="width: 49%; float:left;">
<h6>ratings</h6>
{}</div>
<div style="width: 49%; float:right;">
<h6>Comments</h6>
{}</div>
</td>""".format(r_html, c_html)
return jsonify(result=html)
def min_omw_concepts(ss=None, ili_id=None):
if ili_id:
ss_ids = f_ss_id_by_ili_id(ili_id)
else:
ss_ids = [ss]
pos = fetch_pos()
langs_id, langs_code = fetch_langs()
ss, senses, defs, exes, links = fetch_ss_basic(ss_ids)
ssrels = fetch_ssrel()
return jsonify(result=render_template('min_omw_concept.html',
pos = pos,
langs = langs_id,
senses=senses,
ss=ss,
links=links,
ssrels=ssrels,
defs=defs,
exes=exes))
def network(interface=None):
if interface:
try:
netifaces.ifaddresses(interface)
interfaces = [interface]
except ValueError:
return jsonify({"message": "interface {} not available".format(interface)}), 404
else:
interfaces = netifaces.interfaces()
data = dict()
for i in interfaces:
try:
data[i] = netifaces.ifaddresses(i)[2]
except KeyError:
data[i] = {"message": "AF_INET data missing"}
return jsonify(data)
def run_selected_case():
# return jsonify(dict(name='selenium'))
data = request.get_json()
start = handle.Now_str()
# ???mongo??case??
db = MongoClient()
case_list = db.get_case_by_name(data.get('case_name'))
obj = apiFunc()
rt = obj.run_tests(case_list, app.config['THREAD_NUM'])
report = obj.writeReport(rt)
html = render_template('testResult.html',failNum=rt['Failed_num'], ignored=rt['ignored'],
successNum=rt['success_num'], total=rt['total'], start=start,
end=handle.Now_str(), cost="{:.2}?".format(handle.delta(start, handle.Now_str())),
fileName=report)
return jsonify(dict(html=html))
def taskstatus(task_id):
task = analyzetweets.AsyncResult(str(task_id))
if task.state == "PENDING":
response = {
"state": task.state,
"current": 0,
"total": NUMBER_OF_TWEETS
}
elif task.state != "FAILURE":
response = {
"state": task.state,
"current": task.info.get("current", 0),
"total": NUMBER_OF_TWEETS
}
if "subjectivityavg" in task.info:
response["subjectivityavg"] = task.info["subjectivityavg"]
if "sentimentavg" in task.info:
response["sentimentavg"] = task.info["sentimentavg"]
else:
response = {
"state": task.state,
"current": 1,
"total": NUMBER_OF_TWEETS
}
return jsonify(response)
def login():
"""Login POST handler.
Only runs when ``/login`` is hit with a POST method. There is no GET method
equivilent,as it is handled by the navigation template. Sets the status
code to ``401`` on login failure.
Returns:
JSON formatted output describing success or failure.
"""
log.debug("Entering login,attempting to authenticate user.")
username = request.form['signin_username']
password = request.form['signin_password']
log.debug("Username: {0}".format(username))
if fe.check_auth(username, password):
log.debug("User authenticated. Trying to set session.")
session_id = fe.set_login_id()
session['logged_in'] = session_id
log.debug("Session ID: {0},returning to user".format(session_id))
return jsonify({ "login": "success" })
log.debug("Username or password not recognized,sending 401.")
response.status = 401
return jsonify({ "login": "Failed" })
def star():
"""Starring/Highlighting handler.
Attempts to toggle a star/highlight on a particular show. The show ID must
be passed in the ``id`` query string. If the user is unauthenticated,the
function is aborted with a ``404`` message to hide the page.
Returns:
JSON formatted output describing success and the ID of the show starred.
"""
log.debug("Entering star,trying to toggle star.")
if fe.check_login_id(escape(session['logged_in'])):
log.debug("Sending show ID {0} to function".format(request.args['id']))
fe.star_show(request.args['id'])
log.debug("Returning to user.")
return jsonify({ "star": "success", "id": request.args['id'] })
log.debug("User cannot be authenticated,send 404 to hide page.")
abort(404)
def scan_scrapers():
"""On demand scrapper scanning handler.
For some reason the scheduler doesn't always work,this endpoint allows for
instant scanning,assuming it's not already occurring. The function is aborted
with a ``404`` message to hide the page if the user is not authenticated.
Scanning can take a long time - 20 to 30 minutes - so it's recommended this
endpoint be called asynchronously.
Returns:
JSON formatted output to identify that scanning has completed or is already
ongoing.
"""
log.debug("Entering scan_scrapers.")
if fe.check_login_id(escape(session['logged_in'])):
log.debug("User is logged in,attempting to begin scan.")
if not fe.scrape_shows():
log.debug("scrape_shows returned false,either the lockfile exists incorrectly or scraping is ongoing.")
return jsonify({"scan":"failure", "reason":"A scan is ongoing"})
log.debug("scrape_shows just returned. Returning success.")
return jsonify({"scan":"success"})
log.debug("User cannot be authenticated,send 404 to hide page.")
abort(404)
def get_role_credentials(api_version, requested_role, junk=None):
try:
role_params = roles.get_role_params_from_ip(
request.remote_addr,
requested_role=requested_role
)
except roles.UnexpectedRoleError:
return '', 403
try:
assumed_role = roles.get_assumed_role_credentials(
role_params=role_params,
api_version=api_version
)
except roles.GetRoleError as e:
return '', e.args[0][0]
return jsonify(assumed_role)
def get_instance_identity_document(api_version):
time_format = "%Y-%m-%dT%H:%M:%sZ"
Now = datetime.datetime.Now(dateutil.tz.tzutc())
ret = {
'privateIp': '127.255.0.1',
'devpayProductCodes': None,
'availabilityZone': 'us-east-1a',
'version': '2010-08-31',
'accountId': '1234',
'instanceId': 'i-{0}'.format(app.config['MOCKED_INSTANCE_ID']),
'billingProducts': None,
'instanceType': 't2.medium',
# This may be a terrible mock for this...
'pendingTime': Now.strftime(time_format),
'imageId': 'ami-1234',
'kernelId': None,
'ramdiskId': None,
'architecture': 'x86_64',
'region': 'us-east-1'
}
return jsonify(ret)
def iam_sts_credentials(api_version, junk=None):
if not _supports_iam(api_version):
return passthrough(request.path)
try:
role_params = roles.get_role_params_from_ip(
request.remote_addr,
requested_role=requested_role
)
except roles.UnexpectedRoleError:
msg = "Role name {0} doesn't match expected role for container"
log.error(msg.format(requested_role))
return '', 404
log.debug('Providing assumed role credentials for {0}'.format(role_params['name']))
assumed_role = roles.get_assumed_role_credentials(
role_params=role_params,
api_version=api_version
)
return jsonify(assumed_role)
def post(self, params=None):
user = SharedSessionMaster.new(
alias=params['client_alias']
)
secret = SharedSessionSecret.new(
shares=params['session_policies']['shares'],
quorum=params['session_policies']['quorum'],
protocol=params['session_policies']['protocol']
)
session = Combinesession.new(
session_id=params.get('session_id'),
master=user,
alias=params['session_alias'],
session_type=params['session_type'],
secret=secret
).store()
return flask.jsonify(
{
"session": session.to_api(auth=user.uuid),
"session_id": str(session.uuid)
}
)
def put(self, session_id, params=None):
session = Combinesession.get(session_id)
if not session:
raise exceptions.ObjectNotFoundException
user = params.get('auth') and session.get_user(params['auth'], alias=str(params['client_alias'])) \
or session.join(params['client_alias'])
if not user:
raise exceptions.ObjectDeniedException
user.session = session
if params.get('share'):
session.secret.add_share(Share(params['share'], str(user.uuid)))
session.update()
return flask.jsonify(
{
"session": session.to_api(auth=str(user.uuid)),
"session_id": str(session.uuid)
}
)
def get(self, params=None):
session = SplitSession.get(session_id, auth=params['auth'])
if not session:
raise exceptions.ObjectNotFoundException
if not session.ttl:
raise exceptions.ObjectExpiredException
if session.current_user.is_shareholder and session.secret.splitted and not \
session.secret.user_have_share(session.current_user):
session.current_user.secret.attach_user_to_share(session.current_user)
session.update()
return flask.jsonify(
{
"session": session.to_api(auth=params['auth']),
"session_id": str(session.uuid)
}
)
def upload():
starttime = time.time()
groupid = None
if request.args.get('groupid'):
groupid = request.args.get('groupid')
else:
groupid = (int(time.time()) * 1000) + random.randint(0,999)
f = request.files.getlist('vidfiles')
savelocation = './videos/{0}'.format(groupid)
if not os.path.exists(savelocation):
os.makedirs(savelocation)
for file in f:
file.save(os.path.join(savelocation,file.filename.lower()))
endtime = time.time()
totaltime = endtime-starttime
thread = threading.Thread(target=video_processing, args=(str(groupid),))
thread.start()
return jsonify(groupid=groupid, time=totaltime)
def get(self, testcase_name): # pylint: disable=no-self-use
""" GET the info of one testcase"""
testcase = Testcase().show(testcase_name)
if not testcase:
return api_utils.result_handler(
status=1,
data="The test case '%s' does not exist or is not supported"
% testcase_name)
testcase_info = api_utils.change_obj_to_dict(testcase)
dependency_dict = api_utils.change_obj_to_dict(
testcase_info.get('dependency'))
testcase_info.pop('name')
testcase_info.pop('dependency')
result = {'testcase': testcase_name}
result.update(testcase_info)
result.update({'dependency': dependency_dict})
return jsonify(result)
def post(self):
check_user = check_login()
if check_user is None:
return jsonify({'status': 'error', 'msg': 'no authority'})
if check_user == -1:
return jsonify({'status': 'error', 'msg': 'user is valid'})
user_id = session.get('user_id')
counts = db.session.query(Message).filter(Message.to_id == user_id).count()
page = int(request.form.get('page', 0))
nums = (page - 1 if page >= 0 else 0) * 5
msgs = db.session.query(Message).filter(Message.to_id == user_id).order_by(Message.id.desc())[nums: nums + 5]
all_page = int(counts / 5) + (1 if counts % 5 != 0 else 0)
return jsonify({'status': 'ok', 'data': [msg.to_json() for msg in msgs], 'len': len(msgs), 'Now': page,
'all': all_page})
def reset_api_key(name):
"""
Reset API-KEY for user.
Returns a Jinja2 template.
"""
if request.method == 'POST':
user = user_repo.get_by_name(name)
if not user:
return abort(404)
ensure_authorized_to('update', user)
user.api_key = model.make_uuid()
user_repo.update(user)
cached_users.delete_user_summary(user.name)
msg = gettext('New API-KEY generated')
flash(msg, 'success')
return redirect_content_type(url_for('account.profile', name=name))
else:
csrf = dict(form=dict(csrf=generate_csrf()))
return jsonify(csrf)
def run_cmd():
"""Execute general (non-host specific) command."""
#Read 'command' argument from query string.
comm = request.args.get('command')
if not comm or comm in EXCLUDED_FUNCTIONS:
abort(400)
try:
#Read 'args' argument from query string.
args = request.args.getlist('args')
#Cast arguments to string.
for i, val in enumerate(args):
args[i] = str(val)
except KeyError:
args = []
#Execute command.
ret = RSPET_API.call_plugin(comm, args)
if ret['code'] == rspet_server.ReturnCodes.OK:
http_code = 200
else:
http_code = 404
return make_response(jsonify(ret), http_code)
def general_help():
"""Return general help."""
hlp_sntx = RSPET_API.help()
#Read 'state' argument from query string.
cur_state = request.args.get('state')
#Remove excluded functions.
for hlp in EXCLUDED_FUNCTIONS:
if hlp in hlp_sntx:
hlp_sntx.pop(hlp)
#Remove out of scope functions.
tmp = {}
for hlp in hlp_sntx:
if cur_state in hlp_sntx[hlp]['states']:
tmp[hlp] = hlp_sntx[hlp]
return jsonify({'commands': [make_public_help(command, hlp_sntx[command])\
for command in tmp]})
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data['access_token']
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
app.logger.debug(_data)
access_token = _data['access_token']
uid = _data['uid']
userData = Get_User_Info(access_token, uid)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data['access_token']
openid = Get_OpenID(access_token)['openid']
userData = Get_User_Info(access_token, openid)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", expires=None)
return resp
else:
return redirect(url_for("login"))
def upload_music():
name = request.form.get('name')
author = request.form.get('author')
authorID = int(request.form.get('authorID'))
date = request.form.get('date')
music_name = request.form.get('musicName')
img_name = request.form.get('imgName')
info = request.form.get('musicInfo')
author_avatar = request.form.get('authorAvatarName')
upload_result = db.music_insert(name, author, authorID, author_avatar, date, music_name, img_name, info)
if upload_result == 0:
dic = {"error": 0, "msg": "OK"}
return jsonify(dic), 200
elif upload_result == 1:
dic = {"error": 21, "msg": "Exist the same name"}
return jsonify(dic), 409
def predict():
import ipdb; ipdb.set_trace(context=20)
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
try:
pokemon_name = predict_mlp(file).capitalize()
pokemon_desc = pokemon_entries.get(pokemon_name)
msg = ""
except Exception as e:
pokemon_name = None
pokemon_desc = None
msg = str(e)
return jsonify({'name': pokemon_name, 'description': pokemon_desc, "msg": msg})
def get_concepts():
keywords = request.args.get('keywords', None)
semanticGroups = request.args.get('semanticGroups', None)
pageSize = int(request.args.get('pageSize', 1))
pageNumber = int(request.args.get('pageNumber', 1))
validatePagination(pageSize, pageNumber)
validateKeywords(keywords)
q = GolrsearchQuery(
term=keywords,
category=build_categories(semanticGroups),
rows=pageSize,
start=getStartIndex(pageNumber, pageSize)
)
results = q.exec()
concepts = []
for d in results['docs']:
concept = parse_concept(d)
concepts.append(concept)
return jsonify(concepts)
def get_types():
frequency = {semanticGroup : 0 for semanticGroup in semantic_mapping.keys()}
results = GolrAssociationQuery(
rows=0,
facet_fields=['subject_category', 'object_category']
).exec()
facet_counts = results['facet_counts']
subject_category = facet_counts['subject_category']
object_category = facet_counts['object_category']
for key in subject_category:
frequency[monarch_to_UMLS(key)] += subject_category[key]
for key in object_category:
frequency[monarch_to_UMLS(key)] += object_category[key]
return jsonify([{'id' : c, 'idmap' : None, 'frequency' : f} for c, f in frequency.items()])
def get_predicates():
"""
I'm not quite sure how to best get at all the predicates and tag them as relations with id's
"""
"""
results = GolrAssociationQuery(
rows=0,
facet_fields=['relation']
).exec()
facet_counts = results['facet_counts']
relations = facet_counts['relation']
return jsonify([{'id' : "biolink:"+c,'name' : c,'deFinition' : None} for key in relations])
"""
# Not yet implemented... don't really kNow how
return jsonify([])
def make_json_app(import_name, **kwargs):
"""
Creates a JSON-oriented Flask app.
All error responses that you don't specifically
manage yourself will have application/json content
type,and will contain JSON like this (just an example):
{ "message": "405: Method Not Allowed" }
"""
def make_json_error(ex):
response = jsonify(message=str(ex))
response.status_code = (ex.code if isinstance(ex, HTTPException) else 500)
return response
app = Flask(import_name, **kwargs)
for code in default_exceptions.iterkeys():
app.register_error_handler(code, make_json_error)
return app
def messages(room_id):
room_member = RoomMember.where('room_id', room_id).where('user_id', g.user['id']).first()
if not room_member:
return jsonify({'message': "UnkNown Room"}), 400
room = Room.where('id', room_id).with_('members.user').first().serialize()
for item in room['members']:
item['user']['username'] = '%s %s' % (item['user']['first_name'],item['user']['last_name'])
messages = Message.select('messages.*', 'u.first_name', 'u.last_name', 'u.avatar') \
.where('room_id', room_id) \
.join('users as u', 'u.id', '=', 'messages.sender_id') \
.order_by('created_at', 'desc') \
.limit(100) \
.get()
return jsonify({'room': room, 'messages': messages.serialize()}), 200
def s_connect():
g.user = get_jwt_user()
if not g.user:
return jsonify({}), 403
# get user rooms
my_rooms = RoomMember.select('room_id').where('user_id', g.user['id']).group_by('room_id').get()
for room in my_rooms:
join_room('room-%s' % room.room_id)
connected_users.append({
'id': g.user['id'],
'sid': request.sid
})
socketio.emit('user_connect',{'id':g.user['id']})
def do_summary(self, *args, **kwargs):
return flask.jsonify(self.df.describe().to_dict())
def _process_json(self, df, **kwargs):
time_frame = None if not kwargs['time_frame'] else kwargs['time_frame']
from_time = None if not kwargs['from_time'] else kwargs['from_time']
to_time = None if not kwargs['to_time'] else kwargs['to_time']
if time_frame and time_frame != 'custom':
from_time, to_time = self._time_frame(time_frame)
if from_time or to_time:
df = df[from_time:to_time]
if kwargs['columns_data']:
return flask.jsonify(self.columns_data(df))
j = df.to_json(date_format='iso', orient=kwargs['orient'], double_precision=2, date_unit='s')
return flask.jsonify(json.loads(j))
def do_describe(self, **kwargs):
ret = {'csv_file': self.handler.data_file,
'last_update': self.last_update,
'samples': len(self.df),
'auto_refresh': self.auto_refresh,
'commands': [x.split('do_')[-1].replace('_', '-') for x in dir(self) if 'do_' in x],
'status': self._status()}
return flask.jsonify(ret)
def do_describe(self,
'started': self.started,
'status': self._status()}
return flask.jsonify(ret)
def index():
'''
Default path to orca server. List all the services available
'''
services = ['s3', 'iam', 'ec2']
resp_obj = {}
resp_obj['services'] = services
return jsonify(resp_obj)
def rundeck_list_resources():
'''
Return a list of S3 and EC2 Resources from all available profiles
'''
resp_obj = {}
awsconfig = aws_config.AwsConfig()
profiles = awsconfig.get_profiles()
# Populate s3 buckets.
for profile in profiles:
session = boto3.Session(profile_name=profile)
s3client = session.client('s3')
try:
s3info = s3client.list_buckets()
except botocore.exceptions.ClientError:
s3info['Buckets'] = []
for bucket in s3info['Buckets']:
bucket_text = "s3: (%s) %s" % (profile, bucket['Name'])
resp_obj[bucket_text] = bucket['Name']
# Populate ec2 instances.
for profile in profiles:
session = boto3.Session(profile_name=profile)
ec2client = session.client('ec2', region_name="us-east-1")
try:
ec2info = ec2client.describe_instances()
except botocore.exceptions.ClientError:
ec2info['Instances'] = []
for reservation in ec2info['Reservations']:
for instance in reservation['Instances']:
instance_text = "ec2: (%s) %s" % \
(profile, instance['InstanceId'])
resp_obj[instance_text] = instance['InstanceId']
return jsonify(resp_obj)
def list_groups(profile):
'''
Return all the groups.
'''
resp_obj = {}
resp_obj['status'] = 'OK'
awsconfig = aws_config.AwsConfig()
profiles = awsconfig.get_profiles()
profile_valid = False
for configuredprofile in profiles:
if profile == configuredprofile:
profile_valid = True
if not profile_valid:
resp_obj['status'] = 'FAIL'
return jsonify(resp_obj)
session = boto3.Session(profile_name=profile)
iamclient = session.client('iam')
try:
groupinfo = iamclient.list_groups()
except botocore.exceptions.ClientError:
groupinfo['Groups'] = []
groups = []
for group in groupinfo['Groups']:
groups.append(group['GroupName'])
resp_obj['groups'] = groups
return jsonify(resp_obj)
def custResponse(code=404, message="Error: Not Found", data=None):
message = {
"status": code,
"message": message
}
if data:
for k, v in data.items():
message[k] = v
resp = jsonify(message)
resp.status_code = code
return resp