SQLAlchemy 2.0への移行を検討しているものの、特に非同期セッション管理の実装方法で手が止まっていませんか。従来のバージョンから大きく変わったセッション管理のパラダイムは、多くのエンジニアにとって一つの大きな壁です。
本記事では、実装例を交えながら、SQLAlchemy 2.0の非同期セッション管理への移行をステップバイステップで解説します。コピペで実行可能なコード例も用意しているので、今日から移行を始められます。
SQLAlchemy 2.0のセッション管理が変わった理由
SQLAlchemy 2.0は、Python非同期処理の標準化に伴い、セッション管理を根本的に刷新しました。従来の1.4系では、非同期対応が部分的であったのに対し、2.0では完全な非同期ファーストアーキテクチャへと移行しています。
この変更により、以下のメリットが生まれています。
- 高スケーラビリティ:限られたリソースで大量の並行リクエストを処理可能
- 型安全性の向上:SQLAlchemy 2.0の型ヒント完全サポート
- コードの可読性:async/awaitの明示的な記述で意図が明確化
- パフォーマンス:I/O待機時間の有効活用による整体スループットの向上
おすすめ書籍・ガジェット
- SQLAlchemy 2.0入門ガイド:公式ドキュメントを日本語で体系的に学べる実務向け参考書
- Python非同期実践ガイド:asyncioとasync/awaitの実装パターンが網羅的に学べる
- Logicool MX Keys:長時間のコーディングセッションで快適性を実現する機械式キーボード
SQLAlchemy 1.4から2.0への移行ステップ
ステップ1:依存関係の更新
まず、requirements.txtまたはpyproject.tomlを更新します。SQLAlchemy 2.0では、asyncio対応ドライバーの使用が前提です。
# requirements.txt(更新例)
SQLAlchemy==2.0.23
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0 # PostgreSQL用非同期ドライバー
# または aiosqlite==0.19.0(SQLite用)
環境に応じて非同期ドライバーを選択してください。PostgreSQLの場合はasyncpg、SQLiteはaiosqliteが標準です。
ステップ2:エンジンの初期化を非同期対応に変更
従来のエンジン初期化とは異なり、SQLAlchemy 2.0では明示的にAsyncEngineを使用します。
# sqlalchemy_config.py(SQLAlchemy 1.4)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
"postgresql://user:password@localhost/dbname",
echo=True
)
SessionLocal = sessionmaker(bind=engine)
# sqlalchemy_config.py(SQLAlchemy 2.0:非同期対応)
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/dbname",
echo=True
)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
async_engine=True
)
重要なポイントは、URLスキーム前に非同期ドライバー名(asyncpg、aiosqlite等)を付与することです。この指定がないとエラーが発生します。
ステップ3:セッション依存性の実装
FastAPIなどのフレームワークを使用する場合、セッション管理をDependencyInjectionで統一します。
# database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from typing import AsyncGenerator
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=False)
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with async_session() as session:
try:
yield session
finally:
await session.close()
# main.py(FastAPI例)
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
app = FastAPI()
@app.get("/users")
async def list_users(session: AsyncSession = Depends(get_db)):
result = await session.execute(select(User))
users = result.scalars().all()
return users
AsyncSessionをDependencyとして注入することで、リクエストごとにセッションが自動的に管理されます。
非同期セッション管理の実装例
基本的なCRUD操作
SQLAlchemy 2.0では、全てのデータベース操作が非同期化されます。以下の例をご覧ください。
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True)
# crud.py(SQLAlchemy 2.0)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
async def create_user(
session: AsyncSession,
name: str,
email: str
) -> User:
user = User(name=name, email=email)
session.add(user)
await session.commit()
await session.refresh(user)
return user
async def get_user_by_id(
session: AsyncSession,
user_id: int
) -> User | None:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def update_user(
session: AsyncSession,
user_id: int,
**kwargs
) -> User | None:
user = await get_user_by_id(session, user_id)
if user:
for key, value in kwargs.items():
setattr(user, key, value)
await session.commit()
await session.refresh(user)
return user
async def delete_user(
session: AsyncSession,
user_id: int
) -> bool:
user = await get_user_by_id(session, user_id)
if user:
await session.delete(user)
await session.commit()
return True
return False
全ての関数がasync関数となり、データベース操作の前にawaitキーワードが必須です。これにより、I/O待機中に他のタスクが実行される仕組みが確立されます。
複雑なクエリと関連レコードの取得
リレーション管理も非同期対応が必要です。
# models.py(リレーション拡張版)
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(50))
posts = relationship("Post", back_populates="author")
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(200))
user_id = Column(Integer, ForeignKey("users.id"))
author = relationship("User", back_populates="posts")
# crud.py(関連レコード取得)
from sqlalchemy.orm import selectinload
async def get_user_with_posts(
session: AsyncSession,
user_id: int
) -> User | None:
result = await session.execute(
select(User)
.where(User.id == user_id)
.options(selectinload(User.posts))
)
return result.scalar_one_or_none()
selectinloadを使用することで、N+1クエリ問題を回避できます。これはSQLAlchemy 2.0でも同様に有効です。
トランザクション管理
複数の操作を一つのトランザクションで実行する場合、以下のようにコンテキストマネージャーを使用します。
async def transfer_posts(
session: AsyncSession,
from_user_id: int,
to_user_id: int
) -> bool:
try:
async with session.begin():
from_user = await get_user_by_id(session, from_user_id)
to_user = await get_user_by_id(session, to_user_id)
if not from_user or not to_user:
return False
result = await session.execute(
select(Post).where(Post.user_id == from_user_id)
)
posts = result.scalars().all()
for post in posts:
post.user_id = to_user_id
await session.commit()
return True
except Exception as e:
await session.rollback()
print(f"Transaction failed: {e}")
return False
async with session.begin()により、エラー時の自動ロールバックが保証されます。