Problem Description
While serving a FastAPI request, I have a CPU-bound task to do on every element of a list. I'd like to do this processing on multiple CPU cores.
What's the proper way to do this within FastAPI? Can I use the standard multiprocessing module? All the tutorials/questions I found so far only cover I/O-bound tasks like web requests.
Recommended Answer
async def endpoint
You could use loop.run_in_executor with a ProcessPoolExecutor to run the function in a separate process.
@app.post("/async-endpoint")
async def test_endpoint():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_func) # wait result
def endpoint
Since def endpoints are run implicitly in a separate thread, you can use the full power of the multiprocessing and concurrent.futures modules. Note that await may not be used inside a def function. Samples:
@app.post("/def-endpoint")
def test_endpoint():
...
with multiprocessing.Pool(3) as p:
result = p.map(f, [1, 2, 3])
@app.post("/def-endpoint/")
def test_endpoint():
...
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(f, [1, 2, 3])
Note: keep in mind that creating a process pool inside an endpoint, as well as creating a large number of threads, can lead to slower responses as the number of requests grows.
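As a purely illustrative sketch (not part of the original answer), one way to avoid paying that per-request cost in a def endpoint is to create a single pool once at module level and reuse it; the function f and the pool size of 3 here are placeholders:

import concurrent.futures

from fastapi import FastAPI

app = FastAPI()

def f(x: int) -> int:
    # Placeholder for the real CPU-bound work.
    return x * x

# Created once at import time and reused by every request.
shared_pool = concurrent.futures.ProcessPoolExecutor(max_workers=3)

@app.post("/def-endpoint-shared/")
def test_endpoint():
    results = list(shared_pool.map(f, [1, 2, 3]))
    return {"results": results}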
The easiest and most native way to execute a function in a separate process and immediately wait for the results is to use loop.run_in_executor with a ProcessPoolExecutor.
A pool, as in the example below, can be created when the application starts; do not forget to shut it down on application exit. The number of processes used in the pool can be set with the max_workers parameter of the ProcessPoolExecutor constructor. If max_workers is None or not given, it defaults to the number of processors on the machine.
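For example (the value 4 here is arbitrary):

from concurrent.futures import ProcessPoolExecutor

# Cap the pool at 4 worker processes; ProcessPoolExecutor() would default to os.cpu_count() workers.
executor = ProcessPoolExecutor(max_workers=4)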
The disadvantage of this approach is that the request handler (path operation) waits for the computation to complete in a separate process while the client connection remains open, and if the connection is lost for some reason, the results will have nowhere to be returned.
import asyncio
from concurrent.futures.process import ProcessPoolExecutor

from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()

async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result

@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}

@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()

@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
Move to background
Usually, CPU-bound tasks are executed in the background. FastAPI offers the ability to run background tasks after returning a response, inside which you can start and asynchronously wait for the result of your CPU-bound task.
In this case, for example, you can immediately return a response of "Accepted" (HTTP code 202) together with a unique task ID, continue the calculations in the background, and the client can later request the status of the task using this ID.
BackgroundTasks provide some useful features: in particular, you can run several of them (including in dependencies), and inside them you can use resources obtained in dependencies, which are cleaned up only when all tasks are completed, while exceptions can still be handled correctly. This can be seen more clearly in this diagram.
Below is an example that performs minimal task tracking. A single running instance of the application is assumed.
import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func

class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: Optional[int] = None

app = FastAPI()
jobs: Dict[UUID, Job] = {}

async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result

async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"

@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task

@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]

@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()

@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
More powerful solutions
All of the examples above are fairly simple, but if you need a more powerful system for heavy distributed computing, you can look at message brokers such as RabbitMQ, Kafka, NATS, etc., and libraries that use them, such as Celery.
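As a minimal, illustrative Celery sketch (not from the original answer; the broker URL, route paths, and the body of cpu_bound_task are placeholders):

from celery import Celery

from fastapi import FastAPI

celery_app = Celery("worker", broker="amqp://guest@localhost//", backend="rpc://")
app = FastAPI()

@celery_app.task
def cpu_bound_task(param: int) -> int:
    # Placeholder for the real CPU-bound work.
    return param * param

@app.post("/celery-task/{param}", status_code=202)
async def submit(param: int):
    task = cpu_bound_task.delay(param)  # enqueue the job on the broker
    return {"task_id": task.id}

@app.get("/celery-task/{task_id}")
async def status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    return {"state": result.state, "result": result.result if result.ready() else None}

The Celery worker process runs separately from the FastAPI application (started with the celery worker command), so the heavy computation never blocks the web server.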