AIアプリケーション開発をしていると、ユーザーに長めの回答をリアルタイムで表示したいニーズが出てきますよね。Claude APIを使っているなら、なおさら「なぜかAPIからの応答が遅く感じる」「一文字ずつ表示したい」という悩みが生まれます。その理由は、ストリーミング機能を活用していないからかもしれません。
本記事では、Claude APIのストリーミング機能をPythonとFastAPIで実装する方法を、コード例を交えながら完全に解説します。リアルタイムでトークンを受け取り、ユーザーに段階的に回答を表示する仕組みは、モダンなAIアプリケーション開発の必須スキルです。
Claude APIストリーミングとは
Claude APIのストリーミングは、APIからの応答をトークン単位で分割して、リアルタイムに受け取る機能です。通常の非ストリーミングAPIでは、Claude全体の回答が完成してからまとめて返されます。対してストリーミングでは、1トークン生成されるたびにクライアントへ送信されます。
これにより以下のメリットが生まれます。
- ユーザー体験の向上:回答が「生成されている」リアルタイム感が得られる
- 応答時間の短縮:最初のトークンが届くまでの時間が短くなる
- 帯域幅の効率化:大容量応答の場合、段階的に処理できる
- UIの豊かさ:チャットアプリなど、より自然なUXが実現できる
前提条件と環境構築
必要なライブラリのインストール
まず、Python環境にClaudeクライアントとFastAPIをインストールしましょう。
pip install anthropic fastapi uvicorn python-dotenv
バージョン確認として、次のコマンドで正しくインストールされたか確認できます。
python -c "import anthropic; print(anthropic.__version__)"
APIキーの設定
AnthropicのコンソールからAPI キーを取得し、環境変数として設定します。
export ANTHROPIC_API_KEY="your-api-key-here"
Pythonコード内では、python-dotenvを使って読み込みます。
from dotenv import load_dotenv
import os
load_dotenv()
api_key = os.getenv("ANTHROPIC_API_KEY")
ステップ1:基本的なストリーミング実装
シンプルなストリーミング例
まずは、最もシンプルなストリーミング実装から始めましょう。以下のコードは、Claude APIのストリーミングを使用して、プロンプトに対する回答をトークン単位で取得します。
from anthropic import Anthropic
client = Anthropic()
def stream_response(user_message: str):
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": user_message}
]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print()
if __name__ == "__main__":
stream_response("Pythonの非同期プログラミングについて簡潔に説明してください。")
このコードの重要なポイントはwith client.messages.stream()です。これは、APIのストリーミングレスポンスをコンテキストマネージャで管理し、自動的にリソースをクローズします。
ステップ2:FastAPIでのストリーミングエンドポイント実装
基本的なエンドポイント設計
FastAPIを使って、ストリーミング対応のエンドポイントを実装します。クライアント側で段階的にデータを受け取るため、StreamingResponseを活用します。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
def generate_stream(user_message: str):
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": user_message}
]
) as stream:
for text in stream.text_stream:
yield text
@app.post("/chat/stream")
async def chat_stream(request: dict):
user_message = request.get("message", "")
return StreamingResponse(
generate_stream(user_message),
media_type="text/plain; charset=utf-8"
)
このエンドポイントにPOSTリクエストを送信すると、ストリーミングで回答が返されます。
フロントエンド側での受け取り方
JavaScriptでのfetch APIを使った受け取り例を示します。
async function streamChatResponse(message) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({ message: message })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
console.log(chunk);
document.getElementById('response').textContent += chunk;
}
}
ステップ3:会話履歴を保持したストリーミング実装
マルチターンの会話対応
実務では、複数ターンの会話履歴を管理する必要があります。以下は、セッション管理を含めた実装です。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic
from typing import List
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
conversations = {}
def generate_stream_with_history(session_id: str, user_message: str):
if session_id not in conversations:
conversations[session_id] = []
conversations[session_id].append({
"role": "user",
"content": user_message
})
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=conversations[session_id]
) as stream:
full_response = ""
for text in stream.text_stream:
full_response += text
yield text
conversations[session_id].append({
"role": "assistant",
"content": full_response
})
@app.post("/chat/{session_id}/stream")
async def chat_with_history(session_id: str, request: dict):
user_message = request.get("message", "")
return StreamingResponse(
generate_stream_with_history(session_id, user_message),
media_type="text/plain; charset=utf-8"
)
このアプローチにより、同じセッションIDを使い続けることで、会話の文脈を保持できます。
ステップ4:エラーハンドリングと本番対応
適切なエラーハンドリング
本番環境では、API呼び出しが失敗することも想定する必要があります。以下は堅牢なエラーハンドリング実装です。
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from anthropic import Anthropic, APIError, APIConnectionError
import os
import logging
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
logger = logging.getLogger(__name__)
def generate_stream_with_error_handling(user_message: str):
try:
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": user_message}
]
) as stream:
for text in stream.text_stream:
yield text
except APIConnectionError as e:
logger.error(f"Connection error: {e}")
yield "申し訳ありません。APIへの接続に失敗しました。"
except APIError as e:
logger.error(f"API error: {e}")
yield "申し訳ありません。APIでエラーが発生しました。"
@app.post("/chat/stream")
async def chat_stream(request: dict):
user_message = request.get("message", "")
if not user_message or not user_message.strip():
raise HTTPException(status_code=400, detail="Message cannot be empty")
return StreamingResponse(
generate_stream_with_error_handling(user_message),
media_type="text/plain; charset=utf-8"
)
レート制限への対応
Claude APIはレート制限を設けています。本番運用では、リトライロジックを実装しましょう。
import asyncio
from anthropic import APIError, RateLimitError
async def stream_with_retry(user_message: str, max_retries: int = 3):
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": user_message}
]
) as stream:
for text in stream.text_stream:
yield text
return
except RateLimitError as e:
if attempt < max_retries - 1:
wait_time = (2 ** attempt) * 5
logger.warning(f"Rate limited. Retrying after {wait_time}s")
await asyncio.sleep(wait_time)
else:
yield "APIレート制限に達しました。しばらく後にお試しください。"
おすすめ書籍・ガジェット
- FastAPI実践ガイド — 非同期APIの設計からデプ