你的 async 真的是 async 嗎?FastAPI 平行處理踩坑紀錄
#開發筆記
前言
這是一個 RTSP 攝影機串流模擬工具。主要功能是當收到 API 的請求後,用 ffmpeg 將原本錄製好的 mp4 檔案透過 RTSP 推出去,模擬多台攝影機同時輸入的情境。
但當我同時送出兩個 API request,我發現第二個請求一定會等到第一個請求跑完才開始處理。可是程式碼裡面我明明宣告了 async def,為什麼沒有同時平行處理呢?
ThreadPoolExecutor 是什麼
Python 的程式預設是單執行緒,也就是說同一個時間只會有一條執行路徑在執行。ThreadPoolExecutor 是標準函式庫 concurrent.futures 所提供的工具,讓我們可以同時間執行多個執行緒來處理工作。
基本概念
from concurrent.futures import ThreadPoolExecutor
def do_work(n):
return n * 2
with ThreadPoolExecutor(max_workers=3) as executor:
future = executor.submit(do_work, 10)
result = future.result() # 等待結果,回傳 20
執行 executor.submit 時,會將任務放進 thread pool 並且立即回傳一個 Future 物件,我們可以透過這個物件取得執行的結果,如果工作還沒跑完,程式就會停在這裡。
為什麼使用 ThreadPoolExecutor
這裡先理解當時為什麼使用 ThreadPoolExecutor,而又為什麼在 async 裡會出問題。
在 Python 程式裡面有一個限制 GIL (Global Interpreter Lock),GIL 保證同一個時間內只會有一條執行緒在執行 Python bytecode,因此 ThreadPoolExecutor 對於 CPU 大量運算的任務,基本上沒有加速的效果。但對於 IO 密集的工作(例如:網路傳輸、檔案讀寫等),當執行緒在等待 I/O 的時候會釋放 GIL,讓其他執行緒有機會被執行到,因此當我們同時要求多個網路請求,或者用 ThreadPoolExecutor 同時執行多個 subprocess 時,就可以提升執行的效率。
Event Loop 是什麼
Python 的 asyncio 程式只有一條執行緒,這意味著同一個時間只能做一件事情,但 asyncio 使用 event loop 讓執行緒能夠看起來同時間處理很多事情。
Event Loop 的工作方式
我們可以把 event loop 當作一個銀行行員,坐在辦公室裡處理面前客戶的要求:
- 面對第一個客戶請求
- 執行客戶的需求,直到客戶說「需要等待其他結果,可以先去做別的」
- 行員會請客戶到等待區等候
- 接著先處理下一個客戶的請求
await 就是表示目前需要等待其他結果,讓行員先去處理其他客戶的要求。
async def handle_request():
result = await call_external_api() # 這裡讓出控制權
return result
當執行到 await 的時候,這個 coroutine 把控制權交還給 event loop,event loop 就去處理其他排隊的工作。等 call_external_api() 有結果了,event loop 再回來繼續跑 handle_request()。
回到前言提到的問題
async def start_stream_parallel(request: StreamRequest):
...
with ThreadPoolExecutor() as executor:
futures = [executor.submit(process_cam_folder, ...)]
for future in futures:
future.result() # blocking
future.result() 沒有 await,它是同步等待。Event loop 執行到這一行就被停在這裡,直到 process_cam_folder 跑完才能動。這段時間進來的所有 request 全部排隊,因為 event loop 根本沒有機會去處理它們。
修正後換成:await asyncio.gather(*tasks)
具體怎麼改,我們在後面「修改程式碼」的段落會完整說明,這裡先理解方向:把 ThreadPoolExecutor + future.result() 的同步等待,換成 asyncio.gather 的非同步等待。
asyncio.gather 是真正的 async 等待。每個 task 碰到自己內部的 await 就讓出控制權,event loop 可以在多個 task 之間來回調度,這才是並行真正發生的地方。
先撰寫測試,再修改程式
在修改程式碼前,我們必須先寫一個測試來確認它能抓到這個問題。如果先改程式碼再補測試,我們就無法確定這個測試案例是否真的有能力偵測到問題。
這個測試案例的邏輯很簡單:我們同時發送多個 request,並且計算總耗費時間,如果是依序執行,則花費時間應該會是兩倍的延遲,但如果是並行執行的話,花費時間應該是一倍的延遲。
使用 monkeypatch 隔離外部依賴
由於我們只是要驗證程式是否能非同步處理請求,因此我們並不需要真的執行 ffmpeg,也不需要使用 RTSP server 接收推播串流,我們可以使用 pytest 提供的 monkeypatch fixture 將 run_ffmpeg 換成一個假的實作。
monkeypatch.setattr(module, name, value) 的用途是:在測試執行期間,把模組裡的某個屬性(函式、變數、類別)替換成你指定的值,測試結束後自動還原,不會影響其他測試。
# 把 streams_router 模組裡的 run_ffmpeg 換掉
monkeypatch.setattr(sr, "run_ffmpeg", slow_ffmpeg)
# 也可以替換模組層級的變數
monkeypatch.setattr(sr, "VIDEO_BASE_DIR", tmp_path)
這跟直接寫 sr.run_ffmpeg = slow_ffmpeg 差在哪?monkeypatch 會在 fixture scope 結束後自動復原,不用手動 teardown。async fixture 裡這點特別重要,因為手動復原在例外發生時很容易被跳過。
在這個測試裡,我們替換了兩樣東西:VIDEO_BASE_DIR(指向 tmp_path 暫存目錄,不碰真實檔案系統)和 run_ffmpeg(用一個帶 asyncio.sleep 的假函式取代真正的 ffmpeg 呼叫)。這樣一來,我們的測試就只會驗證「平行處理的行為」本身,而不會受到 ffmpeg 有沒有安裝、或者影片檔案存不存在等外部因素的影響。
測試的 fixture
先建一個 slow_client fixture。重點是 mock 的 run_ffmpeg 用 asyncio.sleep(0.5) 模擬耗時的工作:
@pytest_asyncio.fixture
async def slow_client(monkeypatch, tmp_path):
# ... 建立暫存目錄和假的 mp4 檔案 ...
async def slow_ffmpeg(*args, **kwargs):
await asyncio.sleep(0.5) # 模擬耗時
return "dummy ffmpeg command"
monkeypatch.setattr(sr, "run_ffmpeg", slow_ffmpeg)
app = FastAPI()
app.include_router(sr.router, prefix="/api/v1/streams")
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
yield ac
這裡用 asyncio.sleep 而不是 time.sleep,因為 asyncio.sleep 會在 await 時讓出控制權給 event loop,這才是正確的非同步行為。只要程式碼裡面正確用了 async,兩個 request 就能在 sleep 期間交替執行。
測試本體
@pytest.mark.asyncio
async def test_concurrent_requests_not_blocked(slow_client):
payload = {"cam_ids": ["cam01", "cam02"]} # 模擬請求的資料
start = time.monotonic()
resp1, resp2 = await asyncio.gather(
slow_client.post("/api/v1/streams", json=payload),
slow_client.post("/api/v1/streams", json=payload),
)
elapsed = time.monotonic() - start
assert resp1.status_code == 200
assert resp2.status_code == 200
assert elapsed < 0.8, f"took {elapsed:.2f}s, expected < 0.8s"
測試案例設計重點
asyncio.gather同時發兩個 request,只要 event loop 沒有卡住,那麼兩個 request 會交錯執行,而花費時間應該會小於兩倍的延遲時間。time.monotonic()用來計時花費的時間,不使用time.time()的原因是 monotonic clock 不受系統校時影響,保證只會往前走,適合拿來測量花費時間。- 門檻值 0.8 秒是假設每個 request 的 mock 延遲是 0.5 秒。同時執行的話,總共會花費約 0.5 秒;但如果依序執行的話,則會約 1.0 秒。我們取 0.8 秒當作中間值,是為了要留一些時間給系統開銷,讓測試案例不至於在較慢的機器上面被誤判。
修正前跑一次:確認測試能抓到問題
還沒改 production code 之前先跑一次,確認測試確實會失敗:
FAILED test_concurrent_requests_not_blocked
AssertionError: took 1.02s, expected < 0.8s
1.02 秒 — 幾乎剛好是 0.5 × 2,確認兩個 request 被依序執行了。測試有抓到問題。
修改程式碼
我們的目標是將所有 blocking 呼叫換成真正的 async 版本。
- 第一步,
run_ffmpeg從subprocess.run改成asyncio.create_subprocess_exec:
async def run_ffmpeg(input_file, output_url, stream_loop=None):
cmd = [FFMPEG_PATH, ...]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"ffmpeg failed: {stderr.decode()}")
await asyncio.create_subprocess_exec() 啟動 ffmpeg 子行程後就讓出控制權。ffmpeg 在背景跑,event loop 可以去處理其他 request。等 ffmpeg 跑完,await process.communicate() 收到通知,再繼續往下走。
- 第二步,
start_stream_parallel從ThreadPoolExecutor改成asyncio.gather:
async def start_stream_parallel(request: StreamRequest):
...
tasks = [
process_cam_folder(cam, rtsp_url, ...)
for cam in cam_folders
]
results = await asyncio.gather(*tasks)
asyncio.gather 把多個 coroutine 一起丟進 event loop。每個 process_cam_folder 碰到 await 就會讓出控制權,讓其他 task 有機會執行。
修正後跑測試
4 passed in 0.67s
全部通過。並行測試在 0.67 秒內跑完(含 fixture 建立),遠低於 0.8 秒的門檻。
踩到的坑
mock 要跟著改成 async,原本的 mock 是同步的 lambda,改完之後要一起換成 async:
修正前:同步 mock
monkeypatch.setattr(sr, "run_ffmpeg", lambda *a, **k: "dummy")
修正後:async mock
async def _dummy_run_ffmpeg(*args, **kwargs):
return "dummy ffmpeg command"
monkeypatch.setattr(sr, "run_ffmpeg", _dummy_run_ffmpeg)
- 當程式碼改成
await run_ffmpeg(...)之後,如果 mock 還是回傳普通字串而不是 coroutine,await就會炸掉。mock 的回傳型別一定要跟原本的函式一致。 - slow mock 要用
asyncio.sleep,不是time.sleep
並行測試的 mock 用了 await asyncio.sleep(0.5) 來模擬耗時。如果改用 time.sleep(0.5),就算 production code 已經改好,測試還是會失敗——因為 time.sleep 是同步的 blocking 呼叫,直接把 event loop 卡死。 這不是 production code 的問題,而是測試本身的問題。寫 async 測試時,mock 的行為也要是 async 的,不然測試結果不可信。
結論
使用 async 的時候,要小心處理其他地方的 blocking 呼叫。只要有任何地方(包含程式碼和測試框架本身)誤用了 blocking 的函式,就會使得平行處理失效。在修改程式碼的時候,可以先新增測試案例再修改程式碼,這樣除了驗證功能是否正確,也能夠驗證程式的行為是否正常。
補充
asyncio.gather 的錯誤處理
預設情況下,asyncio.gather 只要其中一個 task 拋出例外,整個 gather 就會立即拋出那個例外,但其他還在跑的 task 不會被取消,它們會繼續在背景執行完畢,只是你拿不到它們的結果。
async def start_stream_parallel(request: StreamRequest):
tasks = [
process_cam_folder(cam, rtsp_url, ...)
for cam in cam_folders
]
results = await asyncio.gather(*tasks) # 任一 task 失敗就直接炸
如果我們想要知道每一個 task 的結果(不管成功或失敗),可以加上 return_exceptions=True:
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {i} failed: {result}")
else:
logger.info(f"Task {i} succeeded: {result}")
加了 return_exceptions=True 之後,gather 不會因為某個 task 失敗就中斷,而是把例外當作回傳值放進 results 裡面。這在我們的情境很實用——假設同時推 10 台攝影機的串流,其中 1 台的 mp4 檔案損毀導致 ffmpeg 失敗,我們不希望因為這 1 台就讓其他 9 台也拿不到結果。
需要注意的是,return_exceptions=True 只是改變了例外的傳遞方式,失敗的 task 本身還是會拋出例外,只是被 gather 攔截並包裝成回傳值。如果 task 內部有 try/except 做了自己的錯誤處理,那這些邏輯還是會正常執行。
process.communicate() 的記憶體問題
process.communicate() 會把子行程的 stdout 和 stderr 全部讀進記憶體,等子行程結束後一次回傳。對於短時間執行的指令這沒什麼問題,但如果 ffmpeg 要跑很久(例如推播一段 2 小時的影片),stderr 的 log 可能會累積到很大。
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate() # 全部讀進記憶體
如果你只需要知道 ffmpeg 有沒有成功,不需要完整的 log,可以考慮把 stderr 導向 DEVNULL,避免記憶體持續增長:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL
)
await process.wait()
if process.returncode != 0:
raise RuntimeError(f"ffmpeg exited with code {process.returncode}")
或者,如果需要保留 log 但又不想佔記憶體,可以逐行讀取 stderr 並記錄到 log:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE
)
async for line in process.stderr:
logger.debug(line.decode().strip())
await process.wait()
這裡用 async for 逐行讀取,每一行處理完就可以釋放,不會讓整個 stderr 內容堆在記憶體裡。而且 async for 在等待下一行的時候會讓出控制權,不會阻塞 event loop。
時間門檻的脆弱性
在測試案例裡我們用了 assert elapsed < 0.8 來判斷是否為並行執行,這種基於時間的斷言本質上是脆弱的。在 CI 環境中,如果機器負載較高、CPU 被其他 job 搶走,即使程式碼是正確的並行,也可能因為系統延遲而超過 0.8 秒,造成測試不穩定(flaky test)。
幾個減輕脆弱性的做法:
第一,拉大時間差距。把 mock 延遲設長一點,讓並行和依序執行的差距更明顯:
async def slow_ffmpeg(*args, **kwargs):
await asyncio.sleep(2.0) # 延遲從 0.5 改成 2 秒
return "dummy"
# 並行 ≈ 2 秒,依序 ≈ 4 秒,門檻設 3 秒
assert elapsed < 3.0, f"took {elapsed:.2f}s, expected < 3.0s"
延遲越長,系統開銷佔的比例就越小,測試就越不容易被機器效能影響。但缺點是測試跑得更慢。
第二,用比例而非絕對值判斷。如果你知道依序執行的理論時間,可以用比例來判斷:
sequential_time = 0.5 * 2 # 兩個 task 依序執行的理論時間
assert elapsed < sequential_time * 0.8, (
f"took {elapsed:.2f}s, expected < {sequential_time * 0.8:.2f}s (80% of sequential)"
)
第三,加上重試機制。如果 CI 環境真的不穩定,可以用 pytest-rerunfailures 讓時間相關的測試失敗後自動重跑:
pytest --reruns 2 --reruns-delay 1 -k test_concurrent
這不是最漂亮的解法,但在實務上可以有效降低 flaky test 對開發流程的干擾。最重要的是在測試案例裡面留下註解,說明為什麼選擇這個門檻值、以及在什麼條件下可能需要調整,讓後面維護的人不用猜。