Python flask 模块,send_file() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.send_file()。
def get_zip(self, project, ty):
    """Get a ZIP file directly from uploaded directory
    or generate one on the fly and upload it if not existing.

    :param project: project whose export is requested; forwarded to the
        naming, existence, generation and path helpers on ``self``.
    :param ty: export type, forwarded unchanged to the same helpers.
    :returns: an attachment response built with ``send_file`` when
        ``uploader`` is a ``local.LocalUploader``; otherwise a redirect to
        the external ``rackspace`` URL for the file.
    """
    filename = self.download_name(project, ty)
    if not self.zip_existing(project, ty):
        # Call form of print: valid on both Python 2 and Python 3.
        print("Warning: Generating %s on the fly Now!" % filename)
        self._make_zip(project, ty)
    if isinstance(uploader, local.LocalUploader):
        filepath = self._download_path(project)
        res = send_file(filename_or_fp=safe_join(filepath, filename),
                        mimetype='application/octet-stream',
                        as_attachment=True,
                        attachment_filename=filename)
        # fail safe mode for more encoded filenames.
        # It seems Flask and Werkzeug do not support RFC 5987
        # http://greenbytes.de/tech/tc2231/#encoding-2231-char
        # res.headers['Content-disposition'] = 'attachment; filename*=%s' % filename
        return res
    return redirect(url_for('rackspace', filename=filename,
                            container=self._container(project),
                            _external=True))
def collage():
    """Show the collage form on GET; build and return a collage on POST.

    On POST the Last.fm user name and period are read from the submitted
    form, the user's top albums are requested from ``config.url``, the album
    art is turned into a collage and the resulting JPEG is sent back.
    Any other request method falls through and returns ``None``.
    """
    method = request.method
    if method == 'GET':
        return render_template('collage.html', input=True)
    if method != 'POST':
        return None

    username = request.form['lastfm_username']
    period = request.form['duration']
    print(username)
    query = {
        'user': username,
        'api_key': config.LASTFM_API_KEY,
        'method': 'user.gettopalbums',
        'period': period,
        'limit': 9,
        'format': 'json',
    }
    api_response = requests.get(config.url, params=query)
    art_files = get_album_art(api_response.text, username)
    generate_collage(art_files, username)
    collage_path = 'static/' + username + '.jpg'
    return send_file(collage_path, mimetype='image/jpg')
def get_image_file(uuid, image_file_path):
    """Return the image file (GET /images/:uuid/file).

    When ``Nginx_ENABLED`` is truthy the response body is empty and an
    ``x-accel-redirect`` header names the static image URL; otherwise the
    file itself is sent with ``send_file``. The encoded MD5 is attached as
    ``Content-MD5``.
    """
    md5_value = get_image_file_md5(uuid, image_file_path)
    if not Nginx_ENABLED:
        file_response = send_file(image_file_path, add_etags=False, cache_timeout=0)
        response = make_response(file_response)
        # NOTE(review): the listing lost its indentation; Content-Length is
        # assumed to belong to this branch only -- confirm against upstream.
        response.headers['Content-Length'] = get_image_file_size(image_file_path)
    else:
        response = make_response('')
        response.headers['Content-Type'] = 'application/octet-stream'
        redirect_target = '/static/images/%s/file?md5=%s' % (uuid, md5_value)
        response.headers['x-accel-redirect'] = redirect_target
    response.headers['Content-MD5'] = md5_value
    return response
def get_job_output(job_id, filename, as_attach=False, mimetype=None, tail=None, head=None, job=None):
    """Return a job output file, or only its first/last lines.

    :param job_id: not used inside this function.
    :param filename: name of the output file; resolved through
        ``job.relative_path``.
    :param as_attach: when truthy, the response is marked as an attachment.
    :param mimetype: optional content type for the response.
    :param tail: number of trailing lines to return (runs ``tail -n``).
    :param head: number of leading lines to return (runs ``head -n``).
    :param job: object providing ``relative_path(filename)``.
    :returns: a response object, or a ``(message, status)`` tuple: 500 when
        both ``tail`` and ``head`` are given, 404 when anything raises.
    """
    try:
        output_file = job.relative_path(filename)
        if not (tail or head):
            return send_file(output_file, as_attachment=as_attach, mimetype=mimetype)
        if tail and head:
            return "Cannot specify tail AND head", 500
        cmd = "head" if head else "tail"
        # Popen only accepts string arguments; a count supplied as an int
        # used to raise TypeError and surface as a misleading 404.
        count = str(tail or head)
        p = subprocess.Popen([cmd, "-n", count, output_file],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        tail_data, _ = p.communicate()
        resp = make_response(tail_data)
        if as_attach:
            resp.headers["Content-disposition"] = "attachment; filename={}".format(filename)
        if mimetype:
            resp.headers["Content-Type"] = mimetype
        return resp
    except Exception as e:
        # Best-effort handler kept from the original: any failure is
        # reported to the client as a missing file.
        print(e)
        return "File Not Found", 404
def send_file(filename, attachment_filename, mimetype, **kwargs):
    """Send a file as an attachment, coping with non latin-1 file names.

    The file is sent through ``flask_send_file``. If the attachment name can
    be encoded as latin-1 it is used as the plain ``filename`` parameter;
    otherwise a normalized fallback (unencodable characters dropped) is sent
    as ``filename`` together with a URL-quoted ``filename*`` parameter
    prefixed with ``UTF-8''``. Extra keyword arguments are accepted and
    ignored.
    """
    response = flask_send_file(filename, mimetype=mimetype)
    try:
        disposition_params = {'filename': attachment_filename.encode('latin-1')}
    except UnicodeEncodeError:
        normalized = unicodedata.normalize('NFKD', attachment_filename)
        fallback_name = normalized.encode('latin-1', 'ignore')
        extended_name = "UTF-8''{}".format(url_quote(attachment_filename))
        disposition_params = {
            'filename': fallback_name,
            'filename*': extended_name,
        }
    response.headers.set('Content-disposition', 'attachment', **disposition_params)
    return response
def test_send_file_xsendfile(self):
    """With ``use_x_sendfile`` set, ``send_file`` adds an x-sendfile header."""
    app = flask.Flask(__name__)
    app.use_x_sendfile = True
    relative_name = 'static/index.html'
    with app.test_request_context():
        response = flask.send_file(relative_name)
        self.assert_true(response.direct_passthrough)
        self.assert_in('x-sendfile', response.headers)
        expected_path = os.path.join(app.root_path, relative_name)
        self.assert_equal(response.headers['x-sendfile'], expected_path)
        self.assert_equal(response.mimetype, 'text/html')
        response.close()
def serve_static_files(p, index_on_error=True):
    """Securely serve static files for the given path using send_file.

    :param p: requested path, interpreted relative to ``app.static_folder``.
    :param index_on_error: accepted for interface compatibility; not used
        by this function.
    :returns: a ``send_file`` response for the requested file, or for the
        file named by the ``STATIC_FILE_ON_404`` config entry when the
        request is outside the static folder or is not a regular file.
        Aborts with 404 when no fallback is configured.
    """
    # Canonicalise both sides so symlinks cannot fool the comparison.
    static_root = os.path.realpath(app.static_folder)
    full_path = os.path.realpath(os.path.join(static_root, p))

    # os.path.commonprefix compares character by character, so a sibling
    # such as "<static>_private/x" would share the prefix "<static>".
    # Requiring the root plus a trailing separator closes that hole;
    # joining with '' yields exactly one trailing separator.
    root_prefix = os.path.join(static_root, '')
    inside_root = full_path.startswith(root_prefix)

    if not inside_root or not os.path.isfile(full_path):
        file_to_return = app.config.get('STATIC_FILE_ON_404', None)
        if file_to_return is None:
            return abort(404)
        full_path = os.path.realpath(os.path.join(static_root, file_to_return))
    return send_file(full_path)
def shrug():
    """Serve the configured shrug image on GET; post it to a channel on POST.

    The image path is the current working directory joined with the
    ``misc``/``shinoa_shrug`` setting. A POST that fails ``isValidCommand``
    gets an error string instead. Other methods fall through and return
    ``None``.
    """
    import os
    import os.path
    from tools.slack_posting import postimage

    image_path = os.path.join(os.getcwd(),
                              tools.settings.getValue('misc', 'shinoa_shrug'))
    method = request.method
    if method == 'GET':
        # The text after the last dot of the path is used as the image subtype.
        extension = image_path.split('.')[-1]
        return send_file(image_path, mimetype='image/{}'.format(extension))
    if method == 'POST':
        if not isValidCommand(request, 'shrug'):
            return "You dun goof'd."
        postimage(image_path, request.form['channel_id'], None, "shrug", "*shrug*")
        return ""
def pages(page_path):
    """Serve a custom page: an existing file as-is, otherwise parsed content.

    Raises ``ApiException`` when the path is not allowed, cannot be
    recognized, or no page exists for it. Parsed pages are returned as a
    dict and cached for two minutes under ``'api-handler.' + rel_url``.
    """
    if not validate_custom_page_path(page_path):
        raise ApiException(error=Error.NOT_ALLOWED, message='The visit of path "{}" is not allowed.'.format(page_path))

    rel_url, exists = storage.fix_relative_url('page', page_path)
    if exists:
        return send_file(rel_url)
    if rel_url is None:  # pragma: no cover,it seems impossible to make this happen,see code of 'fix_relative_url'
        raise ApiException(error=Error.BAD_PATH, message='The path "{}" cannot be recognized.'.format(page_path))

    cached_page = cache.get('api-handler.' + rel_url)
    if cached_page is not None:
        return cached_page  # pragma: no cover,here just get the cached dict

    page = storage.get_page(rel_url, include_draft=False)
    if page is None:
        raise ApiException(error=Error.RESOURCE_NOT_EXISTS)

    page_dict = page.to_dict()
    # The raw source is replaced by its parsed form before it is returned.
    del page_dict['raw_content']
    page_dict['content'] = get_parser(page.format).parse_whole(page.raw_content)
    cache.set('api-handler.' + rel_url, page_dict, timeout=2 * 60)
    return page_dict
def report_get(task_id, report_format="json"):
    """Send the report of a finished task in the requested format.

    Returns a JSON error for an unknown task (404), a deleted task (404),
    an unfinished task (420) or a missing report file (404).
    """
    task = Task.query.get(task_id)
    if not task:
        return json_error(404, "Task not found")

    if task.status == Task.DELETED:
        return json_error(404, "Task report has been deleted")
    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")

    task_dir = "%d" % task_id
    report_name = "report.%s" % report_format
    report_path = os.path.join(settings.reports_directory, task_dir, report_name)
    if os.path.isfile(report_path):
        return send_file(report_path)
    return json_error(404, "Report format not found")
def pcap_get(task_id):
    """Send the ``dump.pcap`` capture of a finished task.

    Returns a JSON error for an unknown task (404), a deleted task (404),
    an unfinished task (420) or a missing pcap file (404).
    """
    task = Task.query.get(task_id)
    if not task:
        # A missing task used to be reported with the "deleted" message.
        return json_error(404, "Task not found")
    if task.status == Task.DELETED:
        return json_error(404, "Task files has been deleted")
    if task.status != Task.FINISHED:
        return json_error(420, "Task not finished yet")
    pcap_path = os.path.join(settings.reports_directory,
                             "%s" % task_id, "dump.pcap")
    if not os.path.isfile(pcap_path):
        return json_error(404, "Pcap file not found")
    return send_file(pcap_path)
def shakemap_overlay(shakemap_id):
    """Send the intensity overlay image for the newest version of a ShakeMap.

    :param shakemap_id: identifier used both to query ``ShakeMap`` rows and
        to build the directory path of the overlay.
    :returns: a ``send_file`` response with the ``ii_overlay.png`` of the
        highest ``shakemap_version``, or the static ``sc_logo.png`` when no
        matching ShakeMap exists.
    """
    session = Session()
    try:
        shakemap = (session.query(ShakeMap)
                    .filter(ShakeMap.shakemap_id == shakemap_id)
                    .order_by(desc(ShakeMap.shakemap_version))
                    .limit(1)).first()
        if shakemap is not None:
            img = os.path.join(app.config['EARTHquakeS'],
                               shakemap_id,
                               shakemap_id + '-' + str(shakemap.shakemap_version),
                               'ii_overlay.png')
        else:
            # send_file needs a path; app.send_static_file() would return a
            # finished Response object, which send_file cannot open.
            img = os.path.join(app.static_folder, 'sc_logo.png')
    finally:
        # Release the session even if the query raises.
        Session.remove()
    # Both candidate files are PNGs, so advertise them as such.
    return send_file(img, mimetype='image/png')
def index():
    """Extract one frame of a video, upload it as a thumbnail and return it.

    Query parameters:
      - ``path``: blob path of the video; only its base name is used for
        the local copy under ``FILE_DIR``.
      - ``frame``: integer frame number handed to ffmpeg's ``select`` filter.
      - ``id``: used in the uploaded thumbnail's blob name.

    :returns: the extracted JPEG via ``send_file``, or a ``(message, 400)``
        tuple when ``path`` is missing or ``frame`` is not an integer.
    """
    path = request.args.get('path')
    frame = request.args.get('frame')
    frame_id = request.args.get('id')

    if not path:
        return "Missing 'path' parameter", 400
    try:
        frame_number = int(frame)
    except (TypeError, ValueError):
        return "'frame' must be an integer", 400

    basename = os.path.split(path)[1]
    video_path = '{}/{}'.format(FILE_DIR, basename)
    if not os.path.isfile(video_path):
        blob = bucket.get_blob(path)
        with open(video_path, 'wb') as f:
            blob.download_to_file(f)

    # Explicit argument list: request values can no longer be split into
    # extra ffmpeg arguments the way they could through shlex.split().
    sp.check_call([
        'ffmpeg', '-y',
        '-i', video_path,
        '-vf', 'select=eq(n\\,{})'.format(frame_number),
        '-frames:v', '1',
        '{}/%05d.jpg'.format(FILE_DIR),
    ])

    blob = bucket.blob('public/thumbnails/tvnews/frame_{}.jpg'.format(frame_id))
    img_path = '{}/00001.jpg'.format(FILE_DIR)
    blob.upload_from_filename(img_path)
    return send_file(img_path)
def media_file(filename):
    """Retrieve a media file from ``media_storage`` and send it as PNG."""
    stored_file = media_storage.get_file(filename)
    return send_file(stored_file, mimetype='image/png')
def showHtml(filename):
    """Send ``filename`` from the directory configured as ``REPORT``."""
    report_path = os.path.join(app.config['REPORT'], filename)
    return send_file(report_path)
# NOTE: the original comment was lost to an encoding error; the function below sends a homework file.
def download_homework(filename):
    """Send ``filename`` from the directory configured as ``HOMEWORK``."""
    homework_path = os.path.join(app.config['HOMEWORK'], filename)
    return send_file(homework_path)
def fonts(route):
    """Send the font at ``route`` from ``ui/cached_dist/fonts``."""
    font_path = 'ui/cached_dist/fonts/{}'.format(route)
    return send_file(font_path)
def route(route):
    """Send the UI's ``index.html`` regardless of the requested ``route``."""
    index_page = 'ui/cached_dist/index.html'
    return send_file(index_page)
def test_send_file_regular(self):
    """``send_file`` on a path returns the file's bytes with its mimetype."""
    app = flask.Flask(__name__)
    resource_name = 'static/index.html'
    with app.test_request_context():
        response = flask.send_file(resource_name)
        self.assert_true(response.direct_passthrough)
        self.assert_equal(response.mimetype, 'text/html')
        with app.open_resource(resource_name) as expected_file:
            # Passthrough is switched off before the body is read via .data.
            response.direct_passthrough = False
            self.assert_equal(response.data, expected_file.read())
        response.close()
def test_attachment(self):
    """``as_attachment=True`` yields an attachment Content-disposition header.

    Covers three sources: an open file object, a file path, and an
    in-memory ``StringIO`` with an explicit ``attachment_filename``.
    """
    app = flask.Flask(__name__)
    with catch_warnings() as captured:
        with app.test_request_context():
            f = open(os.path.join(app.root_path, 'static/index.html'))
            rv = flask.send_file(f, as_attachment=True)
            value, options = parse_options_header(rv.headers['Content-disposition'])
            self.assert_equal(value, 'attachment')
            rv.close()
        # mimetypes + etag
        self.assert_equal(len(captured), 2)

    with app.test_request_context():
        # `options` still holds the header parsed from the file-object case.
        self.assert_equal(options['filename'], 'index.html')
        rv = flask.send_file('static/index.html', as_attachment=True)
        value, options = parse_options_header(rv.headers['Content-disposition'])
        self.assert_equal(value, 'attachment')
        self.assert_equal(options['filename'], 'index.html')
        rv.close()

    with app.test_request_context():
        rv = flask.send_file(StringIO('Test'), as_attachment=True,
                             attachment_filename='index.txt',
                             add_etags=False)
        self.assert_equal(rv.mimetype, 'text/plain')
        # Restored: the listing had only a dangling "value, 'index.txt')"
        # fragment where the header parse and its assertions belong.
        value, options = parse_options_header(rv.headers['Content-disposition'])
        self.assert_equal(value, 'attachment')
        self.assert_equal(options['filename'], 'index.txt')
        rv.close()
def test_static_file(self):
    """Cache-Control max-age follows the default, the config and an override.

    The same pair of checks (static file handler, then ``flask.send_file``)
    is run against a default app, an app with
    ``SEND_FILE_MAX_AGE_DEFAULT`` set, and a subclass overriding
    ``get_send_file_max_age``.
    """
    def check_max_age(expected_seconds):
        with app.test_request_context():
            # Test with static file handler.
            rv = app.send_static_file('index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, expected_seconds)
            rv.close()
            # Test again with direct use of send_file utility.
            rv = flask.send_file('static/index.html')
            cc = parse_cache_control_header(rv.headers['Cache-Control'])
            self.assert_equal(cc.max_age, expected_seconds)
            rv.close()

    app = flask.Flask(__name__)
    # default cache timeout is 12 hours
    check_max_age(12 * 60 * 60)

    app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 3600
    check_max_age(3600)

    class StaticFileApp(flask.Flask):
        def get_send_file_max_age(self, filename):
            return 10

    app = StaticFileApp(__name__)
    check_max_age(10)
def document():
    """Send the file named by the ``url`` query parameter.

    Aborts with 400 when the parameter is missing or does not name a
    regular file.
    """
    url = request.args.get('url')
    # NOTE(review): `url` is a client-chosen filesystem path, so any file
    # readable by the process can be requested. Restrict it to a known base
    # directory once the intended root is confirmed.
    # isfile() rather than exists(): a directory exists but cannot be sent.
    if not url or not os.path.isfile(url):
        return abort(400, "File %s not found " % url)
    return send_file(url)
def api_request_basic():
    """Establishes the endpoint that is used for the basic fetch mode

    POST body should be json formated.

    required field(s):
        - url : actual url that is requested to be fetched by the minion

    Optional field(s):
        - user-agent : user agent to be used during the request of the url
                       if this argument is not provided,then the default
                       user agent in the config file will be used.

    Returns the job's result file via ``send_file``; a 400 response for a
    missing JSON body, ``BadRequest`` or ``ValueError``; a 500 response for
    any other exception.
    """
    try:
        data = request.get_json()
        if data is None:
            raise BadRequest
        file_path = run_job(data, "basic")
        return send_file(file_path)
    except (BadRequest, ValueError) as e:
        return prepare_400("api_request_basic", str(e))
    except Exception as e:
        # Call form of print: valid on both Python 2 and Python 3.
        print(type(e))
        return prepare_500("api_request_basic", str(e))
def api_request_website():
    """Establishes the endpoint that is used for the website fetch mode

    POST body should be json formated.

    required field(s):
        - url : actual url that is requested to be fetched by the minion

    Optional field(s):
        - user-agent : user agent to be used during the request of the url
                       if this argument is not provided,then the default
                       user agent in the config file will be used.

    Returns the job's result file via ``send_file``; a 400 response for a
    missing JSON body, ``BadRequest`` or ``ValueError``; a 500 response for
    any other exception.
    """
    try:
        data = request.get_json()
        if data is None:
            raise BadRequest
        file_path = run_job(data, "website")
        return send_file(file_path)
    except (BadRequest, ValueError) as e:
        return prepare_400("api_request_website", str(e))
    except Exception as e:
        # Call form of print: valid on both Python 2 and Python 3.
        print(type(e))
        return prepare_500("api_request_website", str(e))
def api_basic_results(job_id):
    """Establishes endpoint for retrieval of historical boomerang results

    :param job_id: id of the specific job that you wish to look up
    :returns: the stored result file, or a 400/404/500 response built by
        the ``prepare_*`` helpers. A 400 is also returned when result
        storage is switched off in the minion config.
    """
    if not minion.config['boomerang_STORE_RESULTS']:
        return prepare_400("api_basic_results", error="Config set to not store results")

    try:
        results_path = get_job_results(job_id)
        return send_file(results_path)
    except BadRequest as exc:
        return prepare_400("api_basic_results", str(exc))
    except ValueError:
        return prepare_404('api_basic_results')
    except Exception as exc:
        return prepare_500("api_basic_results", str(exc))
def GET_v1_compile_job_id_hex(job_id):
    """Download a compiled firmware.

    Responds with a 404 error when the job is unknown and a 422 error when
    the job's result has no firmware.
    """
    job = get_job_Metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    firmware = job['result']['firmware']
    if not firmware:
        return error("Compile job not finished or other error.", 422)

    return send_file(firmware,
                     mimetype='application/octet-stream',
                     attachment_filename=job['result']['firmware_filename'])
def GET_v1_compile_job_id_src(job_id):
    """Download the source archive of a completed compile job.

    Responds with a 404 error when the job is unknown and a 422 error when
    the job's result has no firmware.
    """
    job = get_job_Metadata(job_id)
    if not job:
        return error("Compile job not found", 404)

    if not job['result']['firmware']:
        return error("Compile job not finished or other error.", 422)

    # The storage key is "<id>/<source_archive>", filled from the result dict.
    storage_key = '%(id)s/%(source_archive)s' % job['result']
    source_zip = qmk_storage.get(storage_key)
    return send_file(source_zip, attachment_filename=job['result']['source_archive'])
def cat_picture():
    """Send the image named by the ``image_name`` query parameter.

    The name is resolved relative to the current working directory.
    Returns a ``("Not Found", 404)`` tuple when the parameter is missing or
    the resolved path lies outside that directory.
    """
    image_name = request.args.get('image_name')
    if not image_name:
        # A missing parameter used to crash on None.replace().
        return "Not Found", 404
    image_name = image_name.replace('..', '')

    base_dir = os.path.realpath(os.getcwd())
    image_path = os.path.realpath(os.path.join(base_dir, image_name))
    # Stripping '..' is not enough: os.path.join() discards base_dir when
    # image_name is absolute. Check the resolved path instead; joining with
    # '' gives base_dir exactly one trailing separator.
    if not image_path.startswith(os.path.join(base_dir, '')):
        return "Not Found", 404
    return send_file(image_path)
def cat_picture():
    """Send the image named by the ``image_name`` query parameter.

    The name is resolved relative to the current working directory.
    Returns a ``("Not Found", 404)`` tuple when the parameter is missing,
    contains ``..``, or resolves outside that directory.
    """
    image_name = request.args.get('image_name')
    # The original test was inverted: it rejected every name WITHOUT '..'
    # and served the ones containing it. A bare `return 404` is also not a
    # valid Flask response, hence the (body, status) tuple.
    if not image_name or '..' in image_name:
        return "Not Found", 404

    base_dir = os.path.realpath(os.getcwd())
    image_path = os.path.realpath(os.path.join(base_dir, image_name))
    # os.path.join() discards base_dir for absolute names, so verify the
    # resolved path; joining with '' gives exactly one trailing separator.
    if not image_path.startswith(os.path.join(base_dir, '')):
        return "Not Found", 404
    return send_file(image_path)
def cat_picture():
    """Send the image named by the ``image_name`` query parameter.

    The name is resolved relative to the current working directory.
    Returns a ``("Not Found", 404)`` tuple when the parameter is missing or
    the resolved path lies outside that directory.
    """
    image_name = request.args.get('image_name')
    if not image_name:
        # A bare `return 404` is not a valid Flask response.
        return "Not Found", 404

    base_dir = os.path.realpath(os.getcwd())
    image_path = os.path.realpath(os.path.join(base_dir, image_name))
    # Without this check '../' sequences and absolute names reach any file
    # the process can read. Joining with '' gives base_dir exactly one
    # trailing separator.
    if not image_path.startswith(os.path.join(base_dir, '')):
        return "Not Found", 404
    return send_file(image_path)
def api_top_hits():
    """Send the file registered as ``'top-hits-1k'`` in ``common_filepaths``."""
    top_hits_path = common_filepaths['top-hits-1k']
    return send_file(top_hits_path)
def test_attachment(self):
    """``as_attachment=True`` yields an attachment Content-disposition header.

    NOTE(review): this snippet arrived truncated (unbalanced parenthesis,
    missing body). It is rebuilt from the surviving fragments and the
    upstream Flask test of the same name -- confirm against the project.
    """
    app = flask.Flask(__name__)
    with catch_warnings() as captured:
        with app.test_request_context():
            f = open(os.path.join(app.root_path, 'static/index.html'))
            rv = flask.send_file(f, as_attachment=True)
            value, options = parse_options_header(rv.headers['Content-disposition'])
            self.assert_equal(value, 'attachment')
            rv.close()
        # mimetypes + etag
        self.assert_equal(len(captured), 2)

    with app.test_request_context():
        # `options` still holds the header parsed from the file-object case.
        self.assert_equal(options['filename'], 'index.html')
        rv = flask.send_file('static/index.html', as_attachment=True)
        value, options = parse_options_header(rv.headers['Content-disposition'])
        self.assert_equal(value, 'attachment')
        self.assert_equal(options['filename'], 'index.html')
        rv.close()

    with app.test_request_context():
        rv = flask.send_file(StringIO('Test'), as_attachment=True,
                             attachment_filename='index.txt',
                             add_etags=False)
        self.assert_equal(rv.mimetype, 'text/plain')
        value, options = parse_options_header(rv.headers['Content-disposition'])
        self.assert_equal(value, 'attachment')
        self.assert_equal(options['filename'], 'index.txt')
        rv.close()
def test_static_file(self):
    """The static file handler uses the default Cache-Control max-age."""
    app = flask.Flask(__name__)
    # default cache timeout is 12 hours
    with app.test_request_context():
        # Test with static file handler.
        rv = app.send_static_file('index.html')
        cc = parse_cache_control_header(rv.headers['Cache-Control'])
        # The snippet asserted 10, which contradicts the stated 12-hour
        # default for an unconfigured app.
        self.assert_equal(cc.max_age, 12 * 60 * 60)
        rv.close()
def api_family_export_sampleszip(family_id, tlp_level):
    """Generate and send an archive of a family's samples.

    Returns an empty string when no archive path is produced; otherwise the
    archive is sent under the name ``export.tar.gz``.
    """
    family = api.get_elem_by_type("family", family_id)
    archive_path = api.familycontrol.generate_samples_zip_file(family, tlp_level)
    if archive_path is not None:
        return send_file("../" + archive_path,
                         attachment_filename="export.tar.gz")
    return ""
def download_family_file(family_id, file_id):
    """
    Family attachment download endpoint.

    Looks up the ``family_file`` element by ``file_id`` and sends its file
    as an attachment named after the file's base name. Aborts with 404 when
    the file does not exist. ``family_id`` is not used in the body.
    """
    family_file = api.get_elem_by_type("family_file", file_id)
    stored_path = family_file.filepath
    if not os.path.exists(stored_path):
        abort(404)
    download_name = os.path.basename(stored_path)
    return send_file('../' + stored_path,
                     as_attachment=True,
                     attachment_filename=download_name)
def api_get_sample_file(sid):
    """
    Return the sample binary.

    The sample is looked up by ``sid`` and its ``storage_file`` is sent
    with the file's base name as the attachment filename.
    """
    sample = api.get_elem_by_type("sample", sid)
    stored_path = sample.storage_file
    download_name = os.path.basename(stored_path)
    return send_file('../' + stored_path, attachment_filename=download_name)
def cache(filename):
    """Serve a thumbnail, from the local image cache when caching is enabled.

    With ``config.CACHE_IMAGES`` off, the thumbnail data comes straight from
    ``g.plex``. Otherwise ``<filename>.jpg`` is served from the cache
    directory, after being cached on demand if absent. If caching fails,
    the static poster placeholder is sent.
    """
    if not config.CACHE_IMAGES:
        return g.plex.get_thumb_data(filename)

    cache_dir = os.path.join(config.data_dir, "cache")
    cached_jpg = os.path.join(cache_dir, filename) + ".jpg"

    # Short-circuit: the helper only runs when the file is not cached yet.
    if os.path.exists(cached_jpg) or helper.cache_file(filename, g.plex):
        return send_from_directory(cache_dir, filename + ".jpg")
    return send_file('static/images/poster.png')
def image_jpg():
    """Capture an image from ``myCamera``, save it to disk and send it."""
    image_path = '/home/pi/boot/image.jpg'
    myCamera.getimage().save(image_path)
    return send_file(image_path, attachment_filename='image.jpg')
def home():
    """Send ``index.html`` from the application's static folder."""
    index_path = os.path.join(app.static_folder, 'index.html')
    return send_file(os.path.realpath(index_path))
def font():
    """Send the ``SFPixelate-Bold.ttf`` font, resolved to a canonical path."""
    # The relative name is resolved by os.path.realpath.
    font_path = os.path.realpath('SFPixelate-Bold.ttf')
    return send_file(font_path)
def get_vxstream_download(sha256, eid, ftype):
    """Fetch a VxStream result for a known sample and send it as a download.

    Responds 404 (via ``first_or_404``) when no ``Sample`` has the given
    hash. The attachment is named ``<sha256>.<ftype>``; for the xml, html,
    bin and pcap types ``.gz`` is appended to the extension.
    """
    Sample.query.filter_by(sha256=sha256).first_or_404()

    request_headers = {
        'Accept': 'text/html',
        'User-Agent': 'VxStream SandBox API Client'}
    query = {'type': ftype, 'environmentId': eid}
    payload = vxstream.api.get('result/{}'.format(sha256),
                               params=query, headers=request_headers)

    if ftype in ('xml', 'html', 'bin', 'pcap'):
        ftype += '.gz'
    download_name = '{}.{}'.format(sha256, ftype)
    return send_file(BytesIO(payload),
                     attachment_filename=download_name,
                     as_attachment=True)
def get_fireeye_download(sha256, ftype):
    """FireEye result download -- not implemented.

    Always raises ``ApiException({}, 501)``.

    :param sha256: sample hash (unused until the endpoint is implemented).
    :param ftype: requested result type (unused until implemented).
    """
    # The statements that followed this raise were unreachable, garbled in
    # the listing and referenced an undefined name (`eid`), so they are
    # dropped.
    # TODO(review): implement against `fireeye.api` when the endpoint is
    # enabled.
    raise ApiException({}, 501)
def download_file(file_id):
    """Download file

    **Example request**:

    .. sourcecode:: http

        GET /api/1.0/files/1 HTTP/1.1
        Host: do.cert.europa.eu
        Accept: application/json

    **Example response**:

    .. sourcecode:: http

        HTTP/1.0 200 OK
        Content-disposition: attachment; filename=CIMBL-244-EU.zip
        Content-Length: 55277
        Content-Type: application/zip

    :param file_id: file's unique ID

    :reqheader Accept: Content type(s) accepted by the client
    :resheader Content-Type: this depends on `Accept` header or request

    :status 200: File found
    :status 404: Resource not found
    """
    deliverable = DeliverableFile.query.filter_by(id=file_id).first_or_404()
    uploads_dir = current_app.config['APP_UPLOADS']
    stored_path = os.path.join(uploads_dir, deliverable.name)
    return send_file(stored_path,
                     attachment_filename=deliverable.name,
                     as_attachment=True)