Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds a new Python package for voice-based agent interactions using Azure Voice Live SDK, enabling real-time voice conversations with streaming audio, voice activity detection, and function calling support.
Key Changes
- Introduces `VoiceLiveAgent` class for real-time voice interactions with Azure OpenAI
- Implements streaming audio capabilities with server-side voice activity detection (VAD)
- Adds WebSocket handler for browser-based voice interfaces
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 31 comments.
Show a summary per file
| File | Description |
|---|---|
| pyproject.toml | Package configuration defining dependencies and metadata for the azure-voice-live package |
| streaming_voice_chat.py | Example demonstrating streaming voice chat with microphone input, audio playback, and interruption support |
| websocket_handler.py | WebSocket handler bridging browser connections to Azure Voice Live SDK |
| _web/__init__.py | Web module initialization exposing VoiceWebSocketHandler |
| _voice_live_session.py | Session management for Azure Voice Live WebSocket connections |
| _voice_live_agent.py | Core VoiceLiveAgent implementation with streaming and function calling support |
| _types.py | Type definitions for AudioContent and VoiceOptions |
| _event_processor.py | Event processor converting Azure Voice Live events to Agent Framework updates |
| _audio_utils.py | Audio encoding/decoding utilities for PCM16 format handling |
| __init__.py | Package initialization exporting main public classes |
| README.md | Documentation describing package features and installation instructions |
| except Exception as e: | ||
| print(f"\n⚠️ Error playing audio: {e}") |
There was a problem hiding this comment.
The error handling uses a broad 'except Exception' clause that catches all exceptions. The error is printed but not logged, and execution continues, potentially masking critical issues.
| print(f"\n[DEBUG] Function call detected: {props.get('name')}") | ||
| # Store pending function call to execute after response_done | ||
| pending_function_call = { | ||
| "call_id": props.get("call_id"), | ||
| "name": props.get("name"), | ||
| "arguments": props.get("arguments") | ||
| } | ||
|
|
||
| # When response is done, execute any pending function call | ||
| elif event_type == "response_complete" and pending_function_call: | ||
| print(f"[DEBUG] Response done, executing pending function call") | ||
| # Execute function and create new response | ||
| asyncio.create_task(self._handle_function_call_after_response( | ||
| pending_function_call["call_id"], | ||
| pending_function_call["name"], | ||
| pending_function_call["arguments"] | ||
| )) | ||
| pending_function_call = None | ||
|
|
||
| yield update | ||
|
|
||
| async def _handle_function_call_after_response(self, call_id: str, name: str, arguments: str) -> None: | ||
| """Handle function call execution after response is done, then trigger new response. | ||
|
|
||
| Args: | ||
| call_id: Function call ID | ||
| name: Function name | ||
| arguments: JSON string of arguments | ||
| """ | ||
| import json | ||
|
|
||
| print(f"[DEBUG] Executing function: {name} with call_id={call_id}, args={arguments}") | ||
|
|
||
| try: | ||
| # Parse arguments | ||
| args_dict = json.loads(arguments) if arguments else {} | ||
|
|
||
| # Find the function | ||
| function = None | ||
| for tool in self._tools: | ||
| if tool.name == name: | ||
| function = tool | ||
| break | ||
|
|
||
| if not function: | ||
| result = f"Error: Function '{name}' not found" | ||
| print(f"[DEBUG] Function not found: {name}") | ||
| else: | ||
| # Execute the function | ||
| print(f"[DEBUG] Calling function {name} with args: {args_dict}") | ||
| result = await function(**args_dict) | ||
| print(f"[DEBUG] Function {name} returned: {result}") | ||
|
|
||
| # Send result back | ||
| print(f"[DEBUG] Sending function result for call_id={call_id}") | ||
| await self._session.send_function_result(call_id, str(result)) | ||
| print(f"[DEBUG] Function result sent successfully") | ||
|
|
||
| # Now trigger a new response to process the function result | ||
| # This is safe because we waited for RESPONSE_DONE | ||
| print(f"[DEBUG] Creating new response to process function result") | ||
| await self._session.create_response() | ||
| print(f"[DEBUG] New response created") | ||
|
|
||
| except Exception as e: | ||
| error_msg = f"Error executing {name}: {e}" | ||
| print(f"[DEBUG] Exception in function execution: {e}") | ||
| import traceback | ||
| traceback.print_exc() | ||
| try: | ||
| await self._session.send_function_result(call_id, error_msg) | ||
| await self._session.create_response() | ||
| except Exception as e2: | ||
| print(f"[DEBUG] Failed to send error result: {e2}") | ||
|
|
||
async def cancel_response(self) -> None:
    """Cancel the ongoing agent response.

    Used for interruption handling: when the user starts speaking while the
    agent is still responding, call this to stop the agent's response.

    Raises:
        RuntimeError: If session is not connected

    Example:
        ```python
        await agent.connect()

        # If user interrupts, cancel the response
        if user_started_speaking and agent_is_speaking:
            await agent.cancel_response()

        await agent.disconnect()
        ```
    """
    session = self._session
    if not session:
        raise RuntimeError("Must call connect() before canceling response")
    await session.cancel_response()
|
|
||
def _build_session_config(self) -> Any:
    """Build Azure Voice Live session configuration.

    Returns:
        RequestSession configuration object
    """
    from azure.ai.voicelive.models import (
        AzureStandardVoice,
        InputAudioFormat,
        Modality,
        OutputAudioFormat,
        RequestSession,
        ServerVad,
    )

    # Server-side voice activity detection is optional.
    turn_detection = (
        ServerVad(
            threshold=self._vad_threshold,
            prefix_padding_ms=self._vad_prefix_padding_ms,
            silence_duration_ms=self._vad_silence_duration_ms,
        )
        if self._enable_vad
        else None
    )

    # Input transcription (Whisper) is optional as well.
    transcription = {"model": "whisper-1"} if self._input_audio_transcription else None

    # Assemble the full session configuration (PCM16 audio both ways).
    return RequestSession(
        modalities=[Modality.TEXT, Modality.AUDIO],
        instructions=self._instructions,
        voice=AzureStandardVoice(name=self._voice),
        input_audio_format=InputAudioFormat.PCM16,
        output_audio_format=OutputAudioFormat.PCM16,
        input_audio_transcription=transcription,
        turn_detection=turn_detection,
        tools=self._convert_tools_to_azure_format(),
        temperature=self._temperature,
        max_response_output_tokens=self._max_response_tokens,
    )
|
|
||
def _convert_tools_to_azure_format(self) -> list[Any]:
    """Convert AIFunction tools to Azure Voice Live format.

    Returns:
        List of FunctionTool objects in Azure format
    """
    import logging

    from azure.ai.voicelive.models import FunctionTool

    logger = logging.getLogger(__name__)
    azure_tools = []

    for tool in self._tools:
        # FunctionTool supports dict-style assignment; 'type' is common to
        # both conversion paths.
        azure_tool = FunctionTool()
        azure_tool['type'] = 'function'

        if hasattr(tool, 'to_json_schema_spec'):
            # AIFunction tools expose a JSON schema spec; extract the
            # function details from it.
            func_spec = tool.to_json_schema_spec().get('function', {})
            azure_tool['name'] = func_spec.get('name', tool.name)
            azure_tool['description'] = func_spec.get('description', '')
            azure_tool['parameters'] = func_spec.get('parameters', {})
            logger.debug("Tool converted: %s -> %s", tool.name, dict(azure_tool))
        else:
            # Fallback for non-AIFunction tools: read attributes defensively.
            azure_tool['name'] = getattr(tool, 'name', 'unknown')
            azure_tool['description'] = getattr(tool, 'description', '')
            azure_tool['parameters'] = tool.parameters() if callable(getattr(tool, 'parameters', None)) else {}
            logger.debug("Tool converted (fallback): %s", dict(azure_tool))

        azure_tools.append(azure_tool)

    logger.debug("Total tools converted: %d", len(azure_tools))
    return azure_tools
There was a problem hiding this comment.
Debug print statements should be removed or replaced with proper logging before production use. These print statements are scattered throughout the codebase and can clutter output.
| self, session: VoiceLiveSession, input: str | bytes | ||
| ) -> AsyncIterable[AgentRunResponseUpdate]: | ||
| """Internal method to stream with a given session.""" | ||
| # Send input | ||
| if isinstance(input, str): | ||
| await session.send_text(input) | ||
| await session.create_response() | ||
| else: | ||
| await session.send_audio(input, commit=self._enable_vad) |
There was a problem hiding this comment.
The function accepts 'input' as a parameter name which shadows the built-in Python function. Consider renaming to 'user_input', 'query', or 'message' to avoid confusion.
| self, session: VoiceLiveSession, input: str | bytes | |
| ) -> AsyncIterable[AgentRunResponseUpdate]: | |
| """Internal method to stream with a given session.""" | |
| # Send input | |
| if isinstance(input, str): | |
| await session.send_text(input) | |
| await session.create_response() | |
| else: | |
| await session.send_audio(input, commit=self._enable_vad) | |
| self, session: VoiceLiveSession, user_input: str | bytes | |
| ) -> AsyncIterable[AgentRunResponseUpdate]: | |
| """Internal method to stream with a given session.""" | |
| # Send input | |
| if isinstance(user_input, str): | |
| await session.send_text(user_input) | |
| await session.create_response() | |
| else: | |
| await session.send_audio(user_input, commit=self._enable_vad) |
| except Exception as e: | ||
| error_msg = f"Error executing {name}: {e}" | ||
| print(f"[DEBUG] Exception in function execution: {e}") | ||
| import traceback | ||
| traceback.print_exc() | ||
| try: | ||
| await self._session.send_function_result(call_id, error_msg) | ||
| await self._session.create_response() | ||
| except Exception as e2: | ||
| print(f"[DEBUG] Failed to send error result: {e2}") |
There was a problem hiding this comment.
The error handling uses a broad 'except Exception' clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
| except Exception as e: | ||
| error_msg = f"Expert agent error: {str(e)}" | ||
| print(f" ❌ [Expert Agent] {error_msg}") | ||
| import traceback | ||
| traceback.print_exc() | ||
| return error_msg |
There was a problem hiding this comment.
The error handling uses a broad 'except Exception' clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
| class AudioPlayer: | ||
| """Handles real-time audio playback with interruption support.""" | ||
|
|
||
def __init__(self):
    """Initialize the PyAudio engine and playback state; no output stream is opened yet."""
    self.audio = pyaudio.PyAudio()  # PyAudio engine instance (owns access to audio devices)
    self.stream = None  # output stream; presumably opened lazily elsewhere — not visible here
    self.queue = queue.Queue()  # thread-safe queue of audio chunks awaiting playback
    self.playing = False  # True while playback is in progress
    self.chunks_played = 0  # count of chunks played so far (diagnostics — TODO confirm usage)
|
|
There was a problem hiding this comment.
The class lacks proper documentation. According to guidelines, all public classes should have docstrings explaining their purpose and usage.
| print(f"Error receiving from browser: {e}") | ||
|
|
||
| async def _send_to_browser(self, websocket: Any, session: VoiceLiveSession) -> None: | ||
| """Receive from Azure and forward to browser. | ||
|
|
||
| Args: | ||
| websocket: FastAPI WebSocket instance | ||
| session: VoiceLiveSession instance | ||
| """ | ||
| try: | ||
| from azure.ai.voicelive.models import ServerEventType | ||
|
|
||
| async for event in session._connection: | ||
| event_type = event.type | ||
|
|
||
| if event_type == ServerEventType.RESPONSE_AUDIO_DELTA: | ||
| # Send audio chunk to browser | ||
| if hasattr(event, "delta") and event.delta: | ||
| await websocket.send_bytes(event.delta) | ||
|
|
||
| elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA: | ||
| # Send transcript delta | ||
| if hasattr(event, "delta") and event.delta: | ||
| await websocket.send_json({"type": "transcript", "text": event.delta}) | ||
|
|
||
| elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: | ||
| # User started speaking | ||
| await websocket.send_json({"type": "speech_started"}) | ||
|
|
||
| elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: | ||
| # User stopped speaking | ||
| await websocket.send_json({"type": "speech_stopped"}) | ||
|
|
||
| elif event_type == ServerEventType.RESPONSE_CREATED: | ||
| # Response started | ||
| response_id = event.response.id if hasattr(event.response, "id") else None | ||
| await websocket.send_json({"type": "response_started", "response_id": response_id}) | ||
|
|
||
| elif event_type == ServerEventType.RESPONSE_DONE: | ||
| # Response complete | ||
| await websocket.send_json({"type": "response_complete"}) | ||
|
|
||
| elif event_type == ServerEventType.ERROR: | ||
| # Error event | ||
| error_msg = str(event.error) if hasattr(event, "error") else "Unknown error" | ||
| await websocket.send_json({"type": "error", "message": error_msg}) | ||
|
|
||
| except Exception as e: | ||
| print(f"Error sending to browser: {e}") |
There was a problem hiding this comment.
Debug print statements should be removed or replaced with proper logging before production use. These print statements are scattered throughout the codebase and can clutter output.
| except Exception as e: | ||
| print(f"\n❌ Playback loop error: {e}") | ||
| import traceback | ||
| traceback.print_exc() |
There was a problem hiding this comment.
The error handling uses a broad 'except Exception' clause that catches and prints all exceptions with traceback. This should use proper logging and potentially re-raise critical exceptions rather than continuing execution.
|
|
||
| dependencies = [ | ||
| "agent-framework>=0.1.0", | ||
| "azure-ai-voicelive", # Azure Voice Live SDK (preview) |
There was a problem hiding this comment.
The comment uses an incorrect comment format. Python sample code should use standard comment syntax without special markdown formatting.
| "azure-ai-voicelive", # Azure Voice Live SDK (preview) | |
| "azure-ai-voicelive", |
|
|
||
| import base64 | ||
| import wave | ||
| from typing import BinaryIO |
There was a problem hiding this comment.
Import of 'BinaryIO' is not used.
| from typing import BinaryIO |
|
@szhaomsft while we appreciate the initiative to add this, we are working on a more broadly compatible design for voice live and other realtime services, see the WIP here: https://github.com/eavanvalkenburg/agent-framework/blob/voice_agents/docs/decisions/00XX-realtime-agents.md so we first want to decide how we want to abstract this as a whole before we ship something that we then have to change significantly. |
Motivation and Context
add voice live agent