IVYXON
記事一覧に戻る
一般Tips上級🧪 Recipe

FastAPIで非同期処理とバックグラウンドタスク、楽に動かしたいんだけど?

FastAPIとSQLAlchemy(async)で、非同期処理やバックグラウンドタスクを実装する実録を紹介するよ。

2026年6月1日6分で読めます

FastAPIでバックグラウンド処理や複数の外部APIを並行で叩くのって、ちょっと面倒に感じるよね。特にDBも非同期にしたいとなると、書き方迷いがち。

でも、Claude Codeに雑に投げるだけで、I/O待ちを並行化するコードが爆速で手に入るんだ。うちのプロジェクトでもこれでガンガン回してるから、そのやり方を教えてあげる。

一番雑な投げ方

まずはコレ、コピペしてみて。

FastAPIでSQLAlchemy(async)を使ってて、`/api/system/refresh`エンドポイントから外部APIを複数叩いてDBを更新するバックグラウンドタスクを実装して。データ取得は非同期で並行処理してほしい。

これだけで、BackgroundTasksを使った非同期処理の骨格と、asyncio.gatherを使った並行取得のコードがだいたい出来上がるはず。

もうちょい具体的に投げるパターン

もうちょっと細かく指示したいときは、こんな感じで投げてみて。

外部API並行取得とDB更新をバックグラウンドで

複数の外部データソースから情報を取ってきて、それをまとめてDBに書き込む、みたいな処理ってよくあるじゃん?そういうのを全部バックグラウンドで非同期にやってほしいときに使うんだ。

FastAPIの`/api/system/refresh`エンドポイントで、複数の外部市場データAPI(yfinanceとニュースAPIを想定)からデータを非同期で並行取得して、SQLAlchemy(async)でDBに更新するタスクを作って。このDB更新処理はユーザーを待たせないようにバックグラウンドで実行してほしい。

特定の処理をAPSchedulerで定期実行するパターン

「毎日夜中に自動でデータ更新したい!」なんて時は、APSchedulerを組み合わせるのが便利だよ。FastAPIアプリの起動時にスケジューラをセットアップさせるんだ。

FastAPIアプリ起動時にAPSchedulerを初期化して、毎日深夜3時に`src/services/market_data.py`にある`fetch_and_update_all_stocks`関数を非同期で実行するよう設定して。これはDBを更新する処理だよ。

複数の非同期処理をまとめて待つパターン

APIエンドポイント内で、複数の非同期処理の結果を待ってからレスポンスを返したい、という場合はasyncio.gatherを使うよう明確に指示しちゃえばいい。

FastAPIのエンドポイントで、銘柄の現在価格取得と、過去のテクニカル指標計算、ニュース記事の取得の3つの非同期処理を並行で実行して、全ての結果が揃ってからレスポンスを返してほしい。`asyncio.gather`を使ってこれらの処理を待機して。

実践例 / 実録

うちの「US大型株サーベイアプリ」で実際にどう動かしてるか話すね。

このアプリ、40銘柄くらいのデータを毎日自動で取ってきて、スコアリングして推奨を出すんだ。FastAPIがバックエンドで動いてて、SQLAlchemy(async)でSQLiteにデータ突っ込んでる。

例えば、管理画面から「データ更新!」ってボタンを押すと、POST /api/system/refreshエンドポイントが呼ばれるんだよ。するとClaude Codeが生成してくれたコードが、BackgroundTasksを使って裏側でデータ更新を始めてくれる。ユーザーは更新が終わるまで待たなくていいから、UIが固まったりしないってわけ。

データ取得は、src/services/market_data.pyってファイルにまとめてるんだけど、ここで40銘柄分の株価データとかニュースとかを外部APIから取ってくる。これ全部同期で取ってたらAPIが固まっちゃうから、asyncio.gatherを使って全銘柄の取得を並行でやってるんだ。マジで爆速。

あと、「毎日深夜3時にデータを自動更新する」ってやつもやってるよ。これはFastAPIアプリの起動時にAPSchedulerを仕込んでおけば、毎日決まった時間に勝手にデータ取ってきてDBに突っ込んでくれる。手動でやる必要ないから、運用がまじで楽になった。

イメージとしてはこんな感じ。

# main.py (FastAPIアプリのエントリーポイント)
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager

from .data.database import get_async_session
from .services.market_data import fetch_and_update_all_stocks

scheduler = AsyncIOScheduler()

@asynccontextmanager
async def lifespan(app: FastAPI):
    # アプリ起動時にスケジューラを開始
    scheduler.add_job(
        fetch_and_update_all_stocks,
        'cron',
        hour=3,
        minute=0,
        args=[get_async_session], # セッションプロバイダを渡す
        name="Daily stock data refresh"
    )
    scheduler.start()
    yield
    # アプリ終了時にスケジューラをシャットダウン
    scheduler.shutdown()

app = FastAPI(lifespan=lifespan)

@app.post("/api/system/refresh")
async def trigger_data_refresh(
    background_tasks: BackgroundTasks,
    db_session: AsyncSession = Depends(get_async_session)
):
    background_tasks.add_task(fetch_and_update_all_stocks, db_session)
    return {"message": "データ更新をバックグラウンドで開始したよ"}

こんな感じでプロンプトとCLAUDE.mdをちゃんと書けば、FastAPIの非同期処理やバックグラウンドタスクもいい感じに生成してくれるよ。

つまずきポイント

いくつかハマりやすいポイントがあるから、知っておくといいよ。

非同期DBセッションの管理

FastAPIの依存性注入でAsyncSessionを使うとき、ちゃんとyieldでセッションをクローズするの忘れがち。じゃないとコネクションがリークしちゃうことがあるから気をつけてね。

# data/database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

ASYNC_DATABASE_URL = "sqlite+aiosqlite:///./sql_app.db"
engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_async_session():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

同期ブロッキング処理と非同期処理の共存

yfinanceみたいな外部ライブラリは、ほとんどが同期処理なんだ。そのままawaitなしで呼ぶと、結局FastAPIのイベントループをブロックしちゃって、非同期の意味がなくなることがある。そういう時はrun_in_threadpoolを使うといいよ。

# FastAPIが同期処理を別スレッドで実行してくれる
from fastapi import FastAPI
from starlette.concurrency import run_in_threadpool
import yfinance as yf

@app.get("/sync_blocking_data")
async def get_blocking_data(ticker: str):
    # yfinanceの同期処理を別スレッドで実行
    data = await run_in_threadpool(yf.Ticker(ticker).history, period="1d")
    return {"data": data.to_dict()}

あと、yfinanceを使うときは、以前記事にした「yfinanceで株価データ取ってきてって言ったのにバグるんだけど何これ?」も参考にしてみて。データ取得の細かい罠とか、キャッシュの使い方が解説されてるよ。

APSchedulerのライフサイクル

APSchedulerは、FastAPIアプリの起動時に開始して、アプリ終了時にはきっちりシャットダウンしないと、プロセスが残っちゃうことがあるんだ。lifespanコンテキストマネージャー(FastAPI 0.100.0以降)を使うのが一番スマートだよ。古いバージョンなら@app.on_eventデコレータを使えばいい。