Python flask 模块,url_for() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.url_for()。
def index():
code = request.args.get("code", "")
app.logger.debug("code: %s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data.get('access_token')
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html',userData=userData)
#resp.set_cookie(key="logged_in",value='true',expires=None)
resp = jsonify(userData)
resp.set_cookie(key="logged_in", value='true', expires=None)
return resp
else:
return redirect(url_for("login"))
def twittercallback():
verification = request.args["oauth_verifier"]
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
try:
auth.request_token = session["request_token"]
except KeyError:
flash("Please login again", "danger")
return redirect(url_for("bp.home"))
try:
auth.get_access_token(verification)
except tweepy.TweepError:
flash("Failed to get access token", "danger")
return redirect(url_for("bp.home"))
session["access_token"] = auth.access_token
session["access_token_secret"] = auth.access_token_secret
return render_template("twittercallback.html", form=HashtagForm())
def test_inject_blueprint_url_defaults(self):
app = flask.Flask(__name__)
bp = flask.Blueprint('foo.bar.baz', __name__,
template_folder='template')
@bp.url_defaults
def bp_defaults(endpoint, values):
values['page'] = 'login'
@bp.route('/<page>')
def view(page): pass
app.register_blueprint(bp)
values = dict()
app.inject_url_defaults('foo.bar.baz.view', values)
expected = dict(page='login')
self.assert_equal(values, expected)
with app.test_request_context('/somepage'):
url = flask.url_for('foo.bar.baz.view')
expected = '/login'
self.assert_equal(url, expected)
def get_chapter(book_id):
# chapters = Chapter.query.filter_by(book_id=book_id).all()
page = request.args.get('page', 1, type=int)
pagination = Chapter.query.filter_by(book_id=book_id).paginate(
page, per_page=current_app.config['CHAPTER_PER_PAGE'],
error_out=False
)
chapters = pagination.items
prev = None
if pagination.has_prev:
prev = url_for('api.get_chapter', book_id=book_id, page=page-1, _external=True)
next = None
if pagination.has_next:
next = url_for('api.get_chapter', page=page+1, _external=True)
return jsonify({
'chapters': [chapter.to_json() for chapter in chapters],
'prev': prev,
'next': next
})
def get(self):
if request.cookies.get('save_id'):
resp = make_response(redirect(url_for('.exit')))
resp.set_cookie('user_name', expires=0)
resp.set_cookie('login_time', expires=0)
resp.set_cookie('save_id', expires=0)
return resp
if session.get('name'):
session.pop('name')
if session.get('show_name'):
session.pop('show_name')
if session.get('user_id'):
session.pop('user_id')
return redirect(url_for('.login'))
# ?config.json ???? is_register ?false??????? ??????????????
def get_zip(self, project, ty):
"""Get a ZIP file directly from uploaded directory
or generate one on the fly and upload it if not existing."""
filename = self.download_name(project, ty)
if not self.zip_existing(project, ty):
print "Warning: Generating %s on the fly Now!" % filename
self._make_zip(project, ty)
if isinstance(uploader, local.LocalUploader):
filepath = self._download_path(project)
res = send_file(filename_or_fp=safe_join(filepath, filename),
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=filename)
# fail safe mode for more encoded filenames.
# It seems Flask and Werkzeug do not support RFC 5987 http://greenbytes.de/tech/tc2231/#encoding-2231-char
# res.headers['Content-disposition'] = 'attachment; filename*=%s' % filename
return res
else:
return redirect(url_for('rackspace', filename=filename,
container=self._container(project),
_external=True))
def build_sitemap():
from redBerry.models import RedPost, RedCategory
from apesmit import Sitemap
sm = Sitemap(changefreq='weekly')
for post in RedPost.all_published():
sm.add(url_for('redBerry.show_post', slug=post.slug, _external=True), lastmod=post.updated_at.date())
for category in RedCategory.query.all():
sm.add(url_for('redBerry.show_category', category_slug=category.slug, lastmod=category.updated_at.date())
with open(os.path.join(REDBerry_ROOT, 'static', 'redBerry', 'sitemap.xml'), 'w') as f:
sm.write(f)
flash("Sitemap created.", 'success')
return redirect(url_for('redBerry.home'))
##############
# ADMIN ROUTES
##############
def index():
form = NameForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.name.data).first()
if user is None:
user = User(username=form.name.data)
db.session.add(user)
session['kNown'] = False
if app.config['FLASKY_ADMIN']:
send_email(app.config['FLASKY_ADMIN'], 'New User',
'mail/new_user', user=user)
else:
session['kNown'] = True
session['name'] = form.name.data
from.name.data = ''
return redirect(url_for('index'))
return render_template('index.html', form=form, name=session.get('name'),
kNown=session.get('kNown', False))
def login():
""" This login function checks if the username & password
match the admin.db; if the authentication is successful,
it passes the id of the user into login_user() """
if request.method == "POST" and \
"username" in request.form and \
"password" in request.form:
username = request.form["username"]
password = request.form["password"]
user = User.get(username)
# If we found a user based on username then compare that the submitted
# password matches the password in the database. The password is stored
# is a slated hash format,so you must hash the password before comparing it.
if user and hash_pass(password) == user.password:
login_user(user, remember=True)
# FIXME! Get this to work properly...
# return redirect(request.args.get("next") or url_for("index"))
return redirect(url_for("index"))
else:
flash(u"Invalid username,please try again.")
return render_template("login.html")
def changepass():
if request.method == 'POST':
# process password change
if request.form['pass1'] == request.form['pass2']:
change_password(session['username'], request.form['pass1'])
log_action(session['uid'], 8)
session.pop('logged_in', None)
session.pop('uid', None)
session.pop('priv', None)
session.pop('username', None)
flash('Your password has been changed. Please login using your new password.')
return redirect(url_for('home'))
else:
flash('The passwords you entered do not match. Please try again.')
return render_template('changepass.html')
return render_template('changepass.html')
#
# EDIT USER PAGE
#
def save_config(self):
if not self.is_authenticated():
return redirect(url_for('login'))
if (config['CONfig_PATH'] is not None and
os.path.isfile(config['CONfig_PATH'])):
config_path = config['CONfig_PATH']
else:
config_path = os.path.join(config['ROOT_PATH'], 'config.json')
with open(config_path, 'w') as f:
data = {'GOOGLEMAPS_KEY': config['GOOGLEMAPS_KEY'],
'LOCALE': config['LOCALE'],
'CONfig_PASSWORD': config['CONfig_PASSWORD'],
'SCAN_LOCATIONS': self.scan_config.SCAN_LOCATIONS.values(),
'ACCOUNTS': config['ACCOUNTS']}
f.write(json.dumps(data))
def article():
site_info = site_get()
article_id = request.args.get('article_id',0)
if article_id != 0:
article = Article.query.filter_by(id = article_id).first()
if article is not None:
article = article.__dict__
article_id = article['id']
title = article['title']
packet_id = article['packet_id']
show = article['show']
timestamp = article['timestamp']
body = article['body'][:-1]
else:
return redirect(url_for('main.index'))
return render_template('article.html', **locals())
def test_url_with_method(self):
from flask.views import MethodView
app = flask.Flask(__name__)
class MyView(MethodView):
def get(self, id=None):
if id is None:
return 'List'
return 'Get %d' % id
def post(self):
return 'Create'
myview = MyView.as_view('myview')
app.add_url_rule('/myview/', methods=['GET'],
view_func=myview)
app.add_url_rule('/myview/<int:id>',
view_func=myview)
app.add_url_rule('/myview/create', methods=['POST'],
view_func=myview)
with app.test_request_context():
self.assert_equal(flask.url_for('myview', _method='GET'),
'/myview/')
self.assert_equal(flask.url_for('myview', id=42,
'/myview/42')
self.assert_equal(flask.url_for('myview', _method='POST'),
'/myview/create')
def test_dotted_names(self):
frontend = flask.Blueprint('myapp.frontend', __name__)
backend = flask.Blueprint('myapp.backend', __name__)
@frontend.route('/fe')
def frontend_index():
return flask.url_for('myapp.backend.backend_index')
@frontend.route('/fe2')
def frontend_page2():
return flask.url_for('.frontend_index')
@backend.route('/be')
def backend_index():
return flask.url_for('myapp.frontend.frontend_index')
app = flask.Flask(__name__)
app.register_blueprint(frontend)
app.register_blueprint(backend)
c = app.test_client()
self.assert_equal(c.get('/fe').data.strip(), b'/be')
self.assert_equal(c.get('/fe2').data.strip(), b'/fe')
self.assert_equal(c.get('/be').data.strip(), b'/fe')
def test_build_error_handler(self):
app = flask.Flask(__name__)
# Test base case,a URL which results in a BuildError.
with app.test_request_context():
self.assertRaises(BuildError, flask.url_for, 'spam')
# Verify the error is re-raised if not the current exception.
try:
with app.test_request_context():
flask.url_for('spam')
except BuildError as err:
error = err
try:
raise RuntimeError('Test case where BuildError is not current.')
except RuntimeError:
self.assertRaises(BuildError, app.handle_url_build_error, error, 'spam', {})
# Test a custom handler.
def handler(error, endpoint, values):
# Just a test.
return '/test_handler/'
app.url_build_error_handlers.append(handler)
with app.test_request_context():
self.assert_equal(flask.url_for('spam'), '/test_handler/')
def test_aborting(self):
class Foo(Exception):
whatever = 42
app = flask.Flask(__name__)
app.testing = True
@app.errorhandler(Foo)
def handle_foo(e):
return str(e.whatever)
@app.route('/')
def index():
raise flask.abort(flask.redirect(flask.url_for('test')))
@app.route('/test')
def test():
raise Foo()
with app.test_client() as c:
rv = c.get('/')
self.assertEqual(rv.headers['Location'], 'http://localhost/test')
rv = c.get('/test')
self.assertEqual(rv.data, b'42')
def signup():
from forms import SignupForm
form = SignupForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data.lower()).first()
if user is not None:
form.email.errors.append("The Email address is already taken.")
return render_template('signup.html', form=form)
newuser = User(form.firstname.data,form.lastname.data,form.email.data,form.password.data)
db.session.add(newuser)
db.session.commit()
session['email'] = newuser.email
return redirect(url_for('login'))
return render_template('signup.html', form=form)
def login():
if g.user is not None and g.user.is_authenticated:
return redirect(url_for('index'))
from app.forms import LoginForm
form = LoginForm()
if form.validate_on_submit():
session['remember_me'] = form.remember_me.data
user = User.query.filter_by(email=form.email.data.lower()).first()
if user and user.check_password(form.password.data):
session['email'] = form.email.data
login_user(user,remember=session['remember_me'])
return redirect(url_for('index'))
else:
return render_template('login.html',form=form,Failed_auth=True)
return render_template('login.html',form=form)
def _lookup_url(self, values):
"""Return Rackspace URL for object."""
try:
# Create failover urls for avatars
if '_avatar' in values['filename']:
failover_url = url_for('static',
filename='img/placeholder.user.png')
else:
failover_url = url_for('static',
filename='img/placeholder.project.png')
cont = self.get_container(values['container'])
if cont.cdn_enabled:
return "%s/%s" % (cont.cdn_ssl_uri, values['filename'])
else:
msg = ("Rackspace Container %s was not public"
% values['container'])
current_app.logger.warning(msg)
cont.make_public()
return "%s/%s" % (cont.cdn_ssl_uri, values['filename'])
except:
current_app.logger.error(traceback.print_exc())
return failover_url
def confirm_email():
"""Send email to confirm user email."""
acc_conf_dis = current_app.config.get('ACCOUNT_CONFIRMATION_disABLED')
if acc_conf_dis:
return abort(404)
if current_user.valid_email is False:
user = user_repo.get(current_user.id)
account = dict(fullname=current_user.fullname, name=current_user.name,
email_addr=current_user.email_addr)
confirm_url = get_email_confirmation_url(account)
subject = ('Verify your email in %s' % current_app.config.get('BRAND'))
msg = dict(subject=subject,
recipients=[current_user.email_addr],
body=render_template('/account/email/validate_email.md',
user=account, confirm_url=confirm_url))
msg['html'] = render_template('/account/email/validate_email.html',
user=account, confirm_url=confirm_url)
mail_queue.enqueue(send_mail, msg)
msg = gettext("An e-mail has been sent to \
validate your e-mail address.")
flash(msg, 'info')
user.confirmation_email_sent = True
user_repo.update(user)
return redirect_content_type(url_for('.profile', name=current_user.name))
def reset_api_key(name):
"""
Reset API-KEY for user.
Returns a Jinja2 template.
"""
if request.method == 'POST':
user = user_repo.get_by_name(name)
if not user:
return abort(404)
ensure_authorized_to('update', user)
user.api_key = model.make_uuid()
user_repo.update(user)
cached_users.delete_user_summary(user.name)
msg = gettext('New API-KEY generated')
flash(msg, 'success')
return redirect_content_type(url_for('account.profile', name=name))
else:
csrf = dict(form=dict(csrf=generate_csrf()))
return jsonify(csrf)
def manage_user_login(user, user_data, next_url):
"""Manage user login."""
if user is None:
user = user_repo.get_by_name(user_data['screen_name'])
msg, method = get_user_signup_method(user)
flash(msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
login_user(user, remember=True)
flash("Welcome back %s" % user.fullname, 'success')
if ((user.email_addr != user.name) and user.newsletter_prompted is False
and newsletter.is_initialized()):
return redirect(url_for('account.newsletter_subscribe',
next=next_url))
if user.email_addr != user.name:
return redirect(next_url)
else:
flash("Please update your e-mail address in your profile page")
return redirect(url_for('account.update_profile', name=user.name))
def manage_user_login(user, next_url):
"""Manage user login."""
if user is None:
# Give a hint for the user
user = user_repo.get_by(email_addr=user_data.get('email'))
if user is not None:
msg, method = get_user_signup_method(user)
flash(msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
else:
return redirect(url_for('account.signin'))
else:
login_user(user, remember=True)
flash("Welcome back %s" % user.fullname, 'success')
request_email = (user.email_addr == user.name)
if request_email:
flash("Please update your e-mail address in your profile page")
return redirect(url_for('account.update_profile', name=user.name))
if (not request_email and user.newsletter_prompted is False
and newsletter.is_initialized()):
return redirect(url_for('account.newsletter_subscribe', next=next_url))
return redirect(next_url)
def manage_user_login(user, next_url):
"""Manage user login."""
if user is None:
# Give a hint for the user
user = user_repo.get_by(email_addr=user_data['email'])
if user is None:
name = username_from_full_name(user_data['name'])
user = user_repo.get_by_name(name)
msg, 'info')
if method == 'local':
return redirect(url_for('account.forgot_password'))
else:
return redirect(url_for('account.signin'))
else:
login_user(user, 'success')
if user.newsletter_prompted is False and newsletter.is_initialized():
return redirect(url_for('account.newsletter_subscribe',
next=next_url))
return redirect(next_url)
def delete_autoimporter(short_name):
pro = pro_features()
if not pro['autoimporter_enabled']:
raise abort(403)
project = project_by_shortname(short_name)[0]
ensure_authorized_to('read', project)
ensure_authorized_to('update', project)
if project.has_autoimporter():
autoimporter = project.get_autoimporter()
project.delete_autoimporter()
project_repo.save(project)
auditlogger.log_event(project, current_user, 'delete', 'autoimporter',
json.dumps(autoimporter), 'nothing')
return redirect(url_for('.tasks', short_name=project.short_name))
def publish(short_name):
(project, owner, n_tasks, n_task_runs,
overall_progress, last_activity,
n_results) = project_by_shortname(short_name)
#### shruthi
if("sched" in project.info.keys() and project.info["sched"]=="FRG"):
if(project.owner_id==current_user.id and not cached_users.is_quiz_created(current_user.id, project)):
flash("You did not created quiz.Please create the quiz","danger")
return redirect(url_for('quiz.create_quiz', short_name=project.short_name))
#### end
pro = pro_features()
ensure_authorized_to('publish', project)
if request.method == 'GET':
return render_template('projects/publish.html',
project=project,
pro_features=pro)
project.published = True
project_repo.save(project)
task_repo.delete_taskruns_from_project(project)
result_repo.delete_results_from_project(project)
webhook_repo.delete_entries_from_project(project)
auditlogger.log_event(project, 'update', 'published', False, True)
flash(gettext('Project published! Volunteers will Now be able to help you!'))
return redirect(url_for('.details', short_name=project.short_name))
def reset_secret_key(short_name):
"""
Reset Project key.
"""
(project,
n_results) = project_by_shortname(short_name)
title = project_title(project, "Results")
ensure_authorized_to('update', project)
project.secret_key = make_uuid()
project_repo.update(project)
cached_projects.delete_project(short_name)
msg = gettext('New secret key generated')
flash(msg, 'success')
return redirect_content_type(url_for('.update', short_name=short_name))
def add_admin(user_id=None):
"""Add admin flag for user_id."""
try:
if user_id:
user = user_repo.get(user_id)
if user:
ensure_authorized_to('update', user)
user.admin = True
user_repo.update(user)
return redirect_content_type(url_for(".users"))
else:
msg = "User not found"
return format_error(msg, 404)
except Exception as e: # pragma: no cover
current_app.logger.error(e)
return abort(500)
def create_post():
post_data = {
'title': request.form.get('title'),
'content': request.form.get('content'),
}
post = Post()
post.set(post_data)
post = markdown(post)
upload_image = request.files.get('featured_image')
if upload_image.filename != '' and allowed_file(upload_image.filename):
f = Attachment(upload_image.filename, data=upload_image.stream)
post.set('featured_image', f)
post.save()
tag_names = request.form.get('tags').lower().strip()
tags = [get_tag_by_name(x) for x in split_tag_names(tag_names)]
map_tags_to_post(tags, post)
return redirect(url_for('show_post', post_id=post.id))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data['access_token']
userData = Get_User_Info(access_token)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
app.logger.debug(_data)
access_token = _data['access_token']
uid = _data['uid']
userData = Get_User_Info(access_token, uid)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", "")
#app.logger.debug("code:%s" %code)
#app.logger.debug(request.args)
if g.signin:
return "logged_in"
elif code:
_data = Get_Access_Token(code)
access_token = _data['access_token']
openid = Get_OpenID(access_token)['openid']
userData = Get_User_Info(access_token, openid)
app.logger.debug(userData)
#resp = render_template('info.html', expires=None)
return resp
else:
return redirect(url_for("login"))
def index():
code = request.args.get("code", expires=None)
return resp
else:
return redirect(url_for("login"))
def login():
if current_user.is_authenticated:
return redirect(url_for('index'))
form = LoginForm()
if not form.validate_on_submit():
status_code = Unauthorized.code if form.is_submitted() else 200
return render_template('login.html',
title='Login',
form=form,
User=User,
password_length={'min': TRACKER_PASSWORD_LENGTH_MIN,
'max': TRACKER_PASSWORD_LENGTH_MAX}), status_code
user = user_assign_new_token(form.user)
user.is_authenticated = True
login_user(user)
return redirect(url_for('index'))
def admin_login_required(method):
def is_admin(user):
if isinstance(user.is_admin, bool):
return user.is_admin
else:
return user.is_admin()
@functools.wraps(method)
def wrapper(*args, **kwargs):
if not current_user.is_authenticated:
flash("This section is for logged in users only.", 'warning')
return redirect(url_for('redBerry.home'))
if not hasattr(current_user, 'is_admin'):
flash("RedBerry expects your user instance to implement an `is_admin` boolean attribute "
"or an `is_admin()` method.", 'warning')
return redirect(url_for('redBerry.home'))
if not is_admin(current_user):
flash("This section is for admin users only.", 'warning')
return redirect(url_for('redBerry.home'))
return method(*args, **kwargs)
return wrapper
############
# CMS ROUTES
############
def show_category(category_slug):
from redBerry.models import RedCategory
category = RedCategory.query.filter_by(slug=category_slug).first()
if not category:
flash("Category not found!", 'danger')
return redirect(url_for('redBerry.home'))
return render_redBerry('redBerry/category.html', category=category)
def new_record(model_name):
from redBerry.models import RedCategory, RedPost
from redBerry.forms import CategoryForm, PostForm
if model_name == 'category':
form = CategoryForm()
new_record = RedCategory()
elif model_name == 'post':
form = PostForm()
new_record = RedPost()
# Convert category ids into objects for saving in the relationship.
if form.categories.data:
form.categories.data = RedCategory.query.filter(RedCategory.id.in_(form.categories.data)).all()
form.categories.choices = [(c, c.title) for c in RedCategory.sorted()]
else:
form.categories.choices = [(c.id, c.title) for c in RedCategory.sorted()]
if form.validate_on_submit():
form.populate_obj(new_record)
cms.config['db'].session.add(new_record)
cms.config['db'].session.flush()
build_sitemap()
flash("Saved %s %s" % (model_name, new_record.id), 'success')
return redirect(url_for('redBerry.admin', model_name=model_name))
return render_template('redBerry/admin/form.html', model_name=model_name)
def dev_login(user_id):
if ENVIRONMENT == 'dev':
login_user(db.session.query(User).get(user_id))
return redirect(url_for('index'))
def index():
bot_links = [
{
'title': 'LINE',
'url': 'https://line.me/R/ti/p/W1MINAEbHE',
'external': True,
},
{
'title': 'Facebook (web)',
'url': 'https://www.facebook.com/pycontwchatbot/',
{
'title': 'Facebook (app)',
'url': 'fb://page/299082580532144',
]
misc_links = [
{
'title': '???',
'url': url_for('leaderboard'),
'external': False,
]
return render_template(
'index.html',
bot_links=bot_links, misc_links=misc_links,
)
def logout():
logout_user()
return redirect(url_for("index"))
################################################################################
################################################################################
# SET UP CONNECTION WITH DATABASES
################################################################################
def teacher_session():
if '/teacher/' in request.path:
if 'credentials' not in flask.session:
return flask.redirect(flask.url_for('index'))
elif not flask.session['is_teacher']:
return flask.redirect(flask.url_for('register'))
def student_session():
if '/student/' in request.path:
if 'credentials' not in flask.session:
return flask.redirect(flask.url_for('index'))
elif not flask.session['is_student']:
return flask.redirect(flask.url_for('register'))
# make sure user is authenticated w/ live session on every request
def manage_session():
# want to go through oauth flow for this route specifically
# not get stuck in redirect loop
if request.path == '/oauth/callback':
return
# allow all users to visit the index page without a session
if request.path == '/' or request.path == '/oauth/logout':
return
# validate that user has valid session
# add the google user info into session
if 'credentials' not in flask.session:
flask.session['redirect'] = request.path
return flask.redirect(flask.url_for('oauth2callback'))
def switch_type():
im = index_model.Index(flask.session['id'])
if request.form['type'] == 'teacher':
if im.is_teacher():
return flask.redirect(flask.url_for('main_teacher'))
else:
return flask.redirect(flask.url_for('register'))
elif request.form['type'] == 'student':
if im.is_student():
return flask.redirect(flask.url_for('main_student'))
else:
return flask.redirect(flask.url_for('register'))
def login():
im = index_model.Index(flask.session['id'])
if im.is_student():
print flask.url_for('main_student')
return flask.redirect(flask.url_for('main_student'))
elif im.is_teacher():
return flask.redirect(flask.url_for('main_teacher'))
else:
return render_template('login.html', not_registered=True)
def remove_class():
tm = teachers_model.Teachers(flask.session['id'])
# show potential courses to remove on get request
if request.method == 'GET':
courses = tm.get_courses()
context = dict(data=courses)
return render_template('remove_class.html', **context)
# remove course by cid
elif request.method == 'POST':
cid = request.form['cid']
tm.remove_course(cid)
return flask.redirect(flask.url_for('main_teacher'))
def oauth2callback():
flow = oauth2client.client.flow_from_clientsecrets(
'client_secrets_oauth.json',
scope=[
'https://www.googleapis.com/auth/userinfo.email',
'https://www.googleapis.com/auth/userinfo.profile'],
redirect_uri=flask.url_for('oauth2callback', _external=True))
if 'code' not in flask.request.args:
auth_uri = flow.step1_get_authorize_url()
return flask.redirect(auth_uri)
else:
auth_code = flask.request.args.get('code')
credentials = flow.step2_exchange(auth_code)
flask.session['credentials'] = credentials.to_json()
# use token to get user profile from google oauth api
http_auth = credentials.authorize(httplib2.Http())
userinfo_client = apiclient.discovery.build('oauth2', 'v2', http_auth)
user = userinfo_client.userinfo().v2().me().get().execute()
# Todo only allow columbia.edu emails
# if 'columbia.edu' not in user['email']:
# return flask.redirect(flask.url_for('bademail'))
um = users_model.Users()
flask.session['google_user'] = user
flask.session['id'] = um.get_or_create_user(user)
# Now add is_student and is_teacher to flask.session
im = index_model.Index(flask.session['id'])
flask.session['is_student'] = True if im.is_student() else False
flask.session['is_teacher'] = True if im.is_teacher() else False
redirect = flask.session['redirect']
flask.session.pop('redirect', None)
return flask.redirect(redirect)