问题描述
我通过POST接受文件。当我将其保存在本地时,可以使用file.read()读取内容,但是会显示通过file.name不正确的名称(16)。当我尝试使用此名称查找它时,出现错误。可能是什么问题?
我的代码:
@router.post(
path="/po/{id_po}/upload",response_model=schema.ContentUploadedResponse,)
async def upload_file(
id_po: int,background_tasks: BackgroundTasks,uploaded_file: UploadFile = File(...)):
"""pass"""
uploaded_file.file.rollover()
uploaded_file.file.flush()
#shutil.copy(uploaded_file.file.name,f'/home/fyzzy/Desktop/api/{uploaded_file.filename}')
background_tasks.add_task(s3_upload,uploaded_file=fp)
return schema.ContentUploadedResponse()
解决方法
UploadFile
只是SpooledTemporaryFile
的包装,可以作为UploadFile.file
来访问。
SpooledTemporaryFile()[...]函数与TemporaryFile()完全一样操作
Given for TemporaryFile
:
返回一个类似文件的对象,该对象可用作临时存储区域。 [..]它会在关闭后立即销毁(包括垃圾回收对象时的隐式关闭)。在Unix下,文件的目录条目要么根本不创建,要么在创建文件后立即删除。其他平台不支持此功能。 您的代码不应依赖使用此功能创建的临时文件,该文件在文件系统中是否具有可见名称。
您应该使用UploadFile的以下异步methods:write
,read
,seek
和close
。它们在线程池中执行并异步等待。
更新:此外,我想引用topic(内部使用shutil.copyfileobj
和UploadFile.file
使用几个有用的实用工具功能:
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable
from fastapi import UploadFile
def save_upload_file(upload_file: UploadFile,destination: Path) -> None:
try:
with destination.open("wb") as buffer:
shutil.copyfileobj(upload_file.file,buffer)
finally:
upload_file.file.close()
def save_upload_file_tmp(upload_file: UploadFile) -> Path:
try:
suffix = Path(upload_file.filename).suffix
with NamedTemporaryFile(delete=False,suffix=suffix) as tmp:
shutil.copyfileobj(upload_file.file,tmp)
tmp_path = Path(tmp.name)
finally:
upload_file.file.close()
return tmp_path
def handle_upload_file(
upload_file: UploadFile,handler: Callable[[Path],None]
) -> None:
tmp_path = save_upload_file_tmp(upload_file)
try:
handler(tmp_path) # Do something with the saved temp file
finally:
tmp_path.unlink() # Delete the temp file
,注意:您想在
def
端点而不是async def
端点内使用上述函数,因为它们利用了阻塞API。
您可以通过这种方式保存上传的文件,
from fastapi import FastAPI,File,UploadFile
app = FastAPI()
@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
file_location = f"files/{uploaded_file.filename}"
with open(file_location,"wb+") as file_object:
file_object.write(uploaded_file.file.read())
return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
这几乎与shutil.copyfileobj(...)
方法的用法相同。
因此,以上功能可以重写为
import shutil
from fastapi import FastAPI,UploadFile
app = FastAPI()
@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
file_location = f"files/{uploaded_file.filename}"
with open(file_location,"wb+") as file_object:
shutil.copyfileobj(uploaded_file.file,file_object)
return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
,
就我而言,我需要处理大文件,所以我必须避免将它们全部读入内存。我想要的是将它们以块的形式异步保存到磁盘。
我正在试验这个,它似乎可以完成这项工作(CHUNK_SIZE 是随意选择的,需要进一步测试才能找到最佳大小):
import os
import logging
from fastapi import FastAPI,BackgroundTasks,UploadFile
log = logging.getLogger(__name__)
app = FastAPI()
DESTINATION = "/"
CHUNK_SIZE = 2 ** 20 # 1MB
async def chunked_copy(src,dst):
await src.seek(0)
with open(dst,"wb") as buffer:
while True:
contents = await src.read(CHUNK_SIZE)
if not contents:
log.info(f"Src completely consumed\n")
break
log.info(f"Consumed {len(contents)} bytes from Src file\n")
buffer.write(contents)
@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
fullpath = os.path.join(DESTINATION,file.filename)
await chunked_copy(file,fullpath)
return {"File saved to disk at": fullpath}
但是,我很快意识到在完全接收文件之前不会调用 create_upload_file
。因此,如果此代码片段是正确的,它可能对性能有益,但不会启用任何功能,例如向客户端提供有关上传进度的反馈,并且它将在服务器中执行完整的数据复制。不能只访问原始 UploadFile 临时文件,将其刷新并将其移动到其他地方,从而避免复制,这似乎很愚蠢。