Claude APIのストリーミング実装をPython×FastAPIで完全解説|リアルタイム応答の実装方法

AI・ChatGPT活用

AIアプリケーション開発をしていると、ユーザーに長めの回答をリアルタイムで表示したいニーズが出てきますよね。Claude APIを使っているなら、なおさら「なぜかAPIからの応答が遅く感じる」「一文字ずつ表示したい」という悩みが生まれます。その理由は、ストリーミング機能を活用していないからかもしれません。

本記事では、Claude APIのストリーミング機能をPythonとFastAPIで実装する方法を、コード例を交えながら完全に解説します。リアルタイムでトークンを受け取り、ユーザーに段階的に回答を表示する仕組みは、モダンなAIアプリケーション開発の必須スキルです。

Claude APIストリーミングとは

Claude APIのストリーミングは、APIからの応答をトークン単位で分割して、リアルタイムに受け取る機能です。通常の非ストリーミングAPIでは、Claude全体の回答が完成してからまとめて返されます。対してストリーミングでは、1トークン生成されるたびにクライアントへ送信されます。

これにより以下のメリットが生まれます。

  • ユーザー体験の向上:回答が「生成されている」リアルタイム感が得られる
  • 応答時間の短縮:最初のトークンが届くまでの時間が短くなる
  • 帯域幅の効率化:大容量応答の場合、段階的に処理できる
  • UIの豊かさ:チャットアプリなど、より自然なUXが実現できる

前提条件と環境構築

必要なライブラリのインストール

まず、Python環境にClaudeクライアントとFastAPIをインストールしましょう。

pip install anthropic fastapi uvicorn python-dotenv

バージョン確認として、次のコマンドで正しくインストールされたか確認できます。

python -c "import anthropic; print(anthropic.__version__)"

APIキーの設定

AnthropicのコンソールからAPI キーを取得し、環境変数として設定します。

export ANTHROPIC_API_KEY="your-api-key-here"

Pythonコード内では、python-dotenvを使って読み込みます。

from dotenv import load_dotenv
import os

load_dotenv()
api_key = os.getenv("ANTHROPIC_API_KEY")

ステップ1:基本的なストリーミング実装

シンプルなストリーミング例

まずは、最もシンプルなストリーミング実装から始めましょう。以下のコードは、Claude APIのストリーミングを使用して、プロンプトに対する回答をトークン単位で取得します。

from anthropic import Anthropic

client = Anthropic()

def stream_response(user_message: str):
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": user_message}
        ]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        print()

if __name__ == "__main__":
    stream_response("Pythonの非同期プログラミングについて簡潔に説明してください。")

このコードの重要なポイントはwith client.messages.stream()です。これは、APIのストリーミングレスポンスをコンテキストマネージャで管理し、自動的にリソースをクローズします。

ステップ2:FastAPIでのストリーミングエンドポイント実装

基本的なエンドポイント設計

FastAPIを使って、ストリーミング対応のエンドポイントを実装します。クライアント側で段階的にデータを受け取るため、StreamingResponseを活用します。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

def generate_stream(user_message: str):
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[
            {"role": "user", "content": user_message}
        ]
    ) as stream:
        for text in stream.text_stream:
            yield text

@app.post("/chat/stream")
async def chat_stream(request: dict):
    user_message = request.get("message", "")
    return StreamingResponse(
        generate_stream(user_message),
        media_type="text/plain; charset=utf-8"
    )

このエンドポイントにPOSTリクエストを送信すると、ストリーミングで回答が返されます。

フロントエンド側での受け取り方

JavaScriptでのfetch APIを使った受け取り例を示します。

async function streamChatResponse(message) {
    const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({ message: message })
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        console.log(chunk);
        document.getElementById('response').textContent += chunk;
    }
}

ステップ3:会話履歴を保持したストリーミング実装

マルチターンの会話対応

実務では、複数ターンの会話履歴を管理する必要があります。以下は、セッション管理を含めた実装です。

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from anthropic import Anthropic
from typing import List
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

conversations = {}

def generate_stream_with_history(session_id: str, user_message: str):
    if session_id not in conversations:
        conversations[session_id] = []
    
    conversations[session_id].append({
        "role": "user",
        "content": user_message
    })
    
    with client.messages.stream(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=conversations[session_id]
    ) as stream:
        full_response = ""
        for text in stream.text_stream:
            full_response += text
            yield text
        
        conversations[session_id].append({
            "role": "assistant",
            "content": full_response
        })

@app.post("/chat/{session_id}/stream")
async def chat_with_history(session_id: str, request: dict):
    user_message = request.get("message", "")
    return StreamingResponse(
        generate_stream_with_history(session_id, user_message),
        media_type="text/plain; charset=utf-8"
    )

このアプローチにより、同じセッションIDを使い続けることで、会話の文脈を保持できます。

ステップ4:エラーハンドリングと本番対応

適切なエラーハンドリング

本番環境では、API呼び出しが失敗することも想定する必要があります。以下は堅牢なエラーハンドリング実装です。

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from anthropic import Anthropic, APIError, APIConnectionError
import os
import logging
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()
client = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
logger = logging.getLogger(__name__)

def generate_stream_with_error_handling(user_message: str):
    try:
        with client.messages.stream(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[
                {"role": "user", "content": user_message}
            ]
        ) as stream:
            for text in stream.text_stream:
                yield text
    except APIConnectionError as e:
        logger.error(f"Connection error: {e}")
        yield "申し訳ありません。APIへの接続に失敗しました。"
    except APIError as e:
        logger.error(f"API error: {e}")
        yield "申し訳ありません。APIでエラーが発生しました。"

@app.post("/chat/stream")
async def chat_stream(request: dict):
    user_message = request.get("message", "")
    if not user_message or not user_message.strip():
        raise HTTPException(status_code=400, detail="Message cannot be empty")
    
    return StreamingResponse(
        generate_stream_with_error_handling(user_message),
        media_type="text/plain; charset=utf-8"
    )

レート制限への対応

Claude APIはレート制限を設けています。本番運用では、リトライロジックを実装しましょう。

import asyncio
from anthropic import APIError, RateLimitError

async def stream_with_retry(user_message: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            with client.messages.stream(
                model="claude-3-5-sonnet-20241022",
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": user_message}
                ]
            ) as stream:
                for text in stream.text_stream:
                    yield text
            return
        except RateLimitError as e:
            if attempt < max_retries - 1:
                wait_time = (2 ** attempt) * 5
                logger.warning(f"Rate limited. Retrying after {wait_time}s")
                await asyncio.sleep(wait_time)
            else:
                yield "APIレート制限に達しました。しばらく後にお試しください。"

おすすめ書籍・ガジェット

🤖 このブログはAIで自動運営しています。 同じ仕組みを御社にも導入できます。 無料相談はこちら
タイトルとURLをコピーしました