ChatGPT の優れた応答精度を自分のアプリケーションに統合したいのに、テキスト入出力だけでは物足りない。リアルタイムで音声を扱える AI エージェントを構築したいけれど、どうやって実装すればよいか分からない。そんなエンジニアの悩みは尽きません。
朗報です。OpenAI が提供する Realtime API を使えば、低遅延の音声対応 AI エージェントを Python で簡単に構築できます。この記事では、セットアップから本番運用まで、実践的なステップを網羅します。
OpenAI Realtime API とは何か
OpenAI Realtime API は、WebSocket を通じて音声と テキストをリアルタイムで処理できる API です。従来の REST API と異なり、双方向通信により遅延を最小化します。
主な特徴は以下の通りです。
- 低遅延(200〜500ms)での音声入出力
- クライアント側での音声処理オプション
- ストリーミング対応による自然な会話
- 複数の音声フォーマット対応
- セッション内での継続的な文脈保持
テキスト API と比較すると、音声エージェントの開発難度が大幅に低下します。
事前準備:環境構築と認証設定
Python で Realtime API を使用するには、いくつかの準備が必要です。
1. API キーの取得
OpenAI 公式サイトから API キーを取得してください。
- OpenAI Platform にログイン
- 「API keys」セクションから新規キーを生成
- 環境変数に設定:
export OPENAI_API_KEY="your-key"
2. 必要なライブラリのインストール
以下を実行してください。
pip install openai websockets python-dotenv pyaudio
各ライブラリの役割は下記の通りです。
- openai:OpenAI 公式 SDK
- websockets:WebSocket 通信用
- python-dotenv:環境変数管理
- pyaudio:マイク・スピーカーアクセス
3. 環境ファイルの作成
プロジェクト直下に .env ファイルを作成します。
OPENAI_API_KEY=your_actual_api_key_here
REALTIME_API_URL=wss://api.openai.com/v1/realtime
MODEL=gpt-4-realtime-preview
最初の実装:シンプルな音声エージェント
まずは基本的な音声チャットエージェントを実装します。
WebSocket 接続の実装
import asyncio
import websockets
import json
import os
from dotenv import load_dotenv
load_dotenv()
API_KEY = os.getenv("OPENAI_API_KEY")
REALTIME_URL = os.getenv("REALTIME_API_URL")
async def connect_realtime():
"""Realtime API への WebSocket 接続"""
headers = {
"Authorization": f"Bearer {API_KEY}",
"OpenAI-Beta": "realtime=v1"
}
async with websockets.connect(REALTIME_URL, subprotocols=["realtime"]) as websocket:
# セッション開始メッセージを送信
session_config = {
"type": "session.update",
"session": {
"model": "gpt-4-realtime-preview",
"instructions": "You are a helpful AI assistant. Respond naturally in Japanese."
}
}
await websocket.send(json.dumps(session_config))
# サーバー応答を受け取る
response = await websocket.recv()
print("Session started:", json.loads(response))
# 実行
asyncio.run(connect_realtime())
このコードは WebSocket 接続を確立し、セッション設定を送信します。
音声入力の処理
マイクから音声を取得する実装を追加します。
import pyaudio
import base64
async def send_audio_input(websocket):
"""マイクから音声を取得して送信"""
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 24000
p = pyaudio.PyAudio()
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK
)
print("録音開始...")
# 3秒間録音
for _ in range(0, int(RATE / CHUNK * 3)):
data = stream.read(CHUNK)
audio_base64 = base64.b64encode(data).decode('utf-8')
message = {
"type": "input_audio_buffer.append",
"audio": audio_base64
}
await websocket.send(json.dumps(message))
# 入力終了を通知
await websocket.send(json.dumps({"type": "input_audio_buffer.commit"}))
stream.stop_stream()
stream.close()
p.terminate()
print("録音終了")
応答の受け取りと音声出力
AI からの応答を処理し、スピーカーから出力します。
async def receive_and_play_audio(websocket):
"""AI の応答を受け取って音声出力"""
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=24000,
output=True
)
while True:
try:
response = await websocket.recv()
data = json.loads(response)
if data.get("type") == "response.audio.delta":
# 音声デルタ(差分)を受け取る
audio_data = base64.b64decode(data.get("delta", ""))
stream.write(audio_data)
elif data.get("type") == "response.text.delta":
# テキスト応答も同時に表示
print(f"AI: {data.get('delta', '')}", end="", flush=True)
elif data.get("type") == "response.done":
print("\n応答完了")
break
except asyncio.TimeoutError:
break
stream.stop_stream()
stream.close()
p.terminate()
実装のポイント:エラーハンドリング
Python 非同期処理でエラーが頻発する理由 の記事でも触れられている通り、非同期処理ではエラー処理が重要です。
Realtime API でも同様にタイムアウトや接続切断を適切に処理しましょう。
async def robust_realtime_chat():
"""エラーハンドリング付きの実装"""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
async with websockets.connect(
REALTIME_URL,
subprotocols=["realtime"],
ping_interval=20,
ping_timeout=10
) as websocket:
# セッション初期化
await websocket.send(json.dumps({
"type": "session.update",
"session": {"model": "gpt-4-realtime-preview"}
}))
# 音声入力・出力処理
await asyncio.gather(
send_audio_input(websocket),
receive_and_play_audio(websocket)
)
break # 成功時はループを抜ける
except websockets.exceptions.ConnectionClosed:
retry_count += 1
print(f"接続切断。再試行 {retry_count}/{max_retries}")
await asyncio.sleep(2 ** retry_count) # 指数バックオフ
except Exception as e:
print(f"エラー発生: {e}")
retry_count += 1
await asyncio.sleep(2)
高度な設定:AIエージェントのカスタマイズ
Realtime API では、エージェントの動作を細かくカスタマイズできます。
システムプロンプトの設定
AI の振る舞いを制御するため、詳細なシステムプロンプトを設定します。
system_instructions = """
あなたはカスタマーサポート AI です。以下のルールに従ってください:
1. 敬語を使用し、丁寧な対応をする
2. 不明な点は確認してから回答する
3. 問題が解決しない場合は人間へのエスカレーションを提案する
4. 回答は 3 文以内で簡潔にまとめる
5. 日本語のみで応答する
サポート対象製品:
- 製品 A(機能 1、機能 2)
- 製品 B(機能 3、機能 4)
"""
session_config = {
"type": "session.update",
"session": {
"model": "gpt-4-realtime-preview",
"instructions": system_instructions,
"voice": "alloy", # 音声の選択(alloy, echo, fable, onyx など)
"input_audio_format": "pcm16",
"output_audio_format": "pcm16"
}
}
await websocket.send(json.dumps(session_config))
会話履歴の管理
複数ターンの会話で文脈を保持するには、会話履歴を管理する必要があります。
class ConversationManager:
def __init__(self):
self.conversation_history = []
def add_user_message(self, content):
"""ユーザーメッセージを追加"""
self.conversation_history.append({
"role": "user",
"content": content
})
def add_assistant_message(self, content):
"""アシスタントメッセージを追加"""
self.conversation_history.append({
"role": "assistant",
"content": content
})
def get_context(self):
"""会話文脈を取得(直近 5 ターン)"""
return self.conversation_history[-10:]
# 使用例
conv_manager = ConversationManager()
conv_manager.add_user_message("このサービスの料金は?")
conv_manager.add_assistant_message("料金は月額 5000 円です")
本番環境での配備とベストプラクティス
開発から本番へ移行する際の留意点を説明します。
レート制限への対応
OpenAI API にはレート制限があります。適切に対応しましょう。
- 1 分あたりの リクエスト数を監視
- リトライロジックに指数バックオフを実装
- キャッシング戦略を検討
- 複数 API キーでの負荷分散
import time
class RateLimiter:
def __init__(self, max_requests=100, time_window=60):
self.max_requests = max_requests
self.time_window = time_window
self.requests = []
async def wait_if_needed(self):
"""レート制限に基づいて待機"""
now = time.time()
# 古いリクエストを削除
self.requests = [t for t in self.requests if now - t < self.time_window]
if len(self.requests) >= self.max_requests:
sleep_time = self.time_window - (now - self.requests[0])
await asyncio.sleep(sleep_time)
self.requests.append(now)
limiter = RateLimiter()
await limiter.wait_if_needed()
セキュリティ対策
本番環境では以下の対策が必須です。
- API キーを環境変数で管理(コードに埋め込まない)
- HTTPS/WebSocket Secure(wss://)を使用
- ユーザー入力のサニタイズ
- ログには個人情報を記録しない
- 定期的な依存ライブラリの更新
よくあるトラブルシューティング
実装時に遭遇しやすい問題と対策を紹介します。
音声が途切れる場合
バッファサイズやネットワーク遅延が原因の場合が多いです。
# バッファサイズを増加
CHUNK = 2048 # デフォルトの 1024 から増加
RATE = 24000
# WebSocket のキープアライブ設定
async with websockets.connect(
REALTIME_URL,
ping_interval=20, # 20 秒ごとに ping
ping_timeout=10,
max_size=10_000_000 # 最大メッセージサイズを拡大
) as websocket:
pass
接続がすぐに切れる
認証やセッション設定に問題がある可能性があります。
- API キーが有効か確認
- リクエストヘッダの
Authorizationフォーマットを確認 - セッション設定メッセージを正しく送信しているか確認
- API のレート制限に達していないか確認
おすすめ書籍・ガジェット
- ChatGPT API 活用ガイド — OpenAI API 全般の実装パターンを網羅した実践書
- Python 非同期プログラミング asyncio 完全解説 — 本記事の非同期処理を深く理解するなら必読
- HHKB Pro キーボード — 長時間のコーディングに最適なエンジニア向けガジェット
まとめ:実装の次のステップ
OpenAI Realtime API を使えば、Python で音声対応 AI エージェントを短期間で構築できます。この記事で紹介したコード例を参考に、まずはシンプルな実装から始めることをお勧めします。
さらに高度な実装に進む際は、Claude Computer Use で始めるブラウザ自動化 のような外部システムとの統合も視野に入れましょう。複数の AI ツールを組み合わせることで、より強力なエージェントが実現できます。
また、デュアルモニター縦置き設定 で開発環境を整備することも、生産性向上に役立ちます。
OpenAI Realtime API と通常の API の違いは何ですか?
通常の OpenAI API(REST ベース)はリクエスト送信後にレスポンスを受け取る方式ですが、Realtime API は WebSocket を使った双方向通信です。これにより音声の低遅延処理が可能になり、会話のような自然なインタラクションが実現できます。また、Realtime API はストリーミング応答により、テキストや音声をリアルタイムで段階的に受け取れます。
Python 初心者でも Realtime API を実装できますか?
はい、本記事で紹介したコード例を参考にすれば実装可能です。ただし、非同期プログラミング(async/await)の基本的な理解があると、より応用的な実装ができます。WebSocket の概念や JSON フォーマットの理解も役立ちますが、これらは実装しながら習得できます。
Realtime API の利用コストはいくらですか?
OpenAI Realtime API の料金は入力と出力で異なります。通常、1 分あたりの音声処理に対して課金される方式です。詳細な価格は OpenAI の公式プライシングページを確認してください。本番運用では API キーごとの使用額を監視し、予期しない高額請求を防ぐことが重要です。
音声フォーマットは pcm16 以外にも対応していますか?
Realtime API は主に pcm16(16 ビット PCM)フォーマットに対応しています。ただし、将来的に他のフォーマット対応が追加される可能性があるため、OpenAI の最新ドキュメントを確認することをお勧めします。PCM 以外のフォーマットを使用する場合は、事前に変換処理を挟む必要があります。
複数ユーザーの同時接続に対応できますか?
はい、各ユーザーで独立した WebSocket 接続を確立することで対応できます。本記事のコード例では単一接続を示していますが、本番環境では Flask や FastAPI などの Web フレームワークを使用して、複数クライアントからの接続を管理するのが一般的です。その際はレート制限やコネクション数の上限に注意が必要です。