| title | category | tags | difficulty | description | demonstrates |
|---|---|---|---|---|---|
| LLM Output Replacement | pipeline-llm | | intermediate | Replaces Deepseek thinking tags with custom messages for TTS | |
This example shows how to replace Deepseek thinking tags (`<think>` and `</think>`) with custom messages before the text is sent to TTS. This keeps the TTS engine from reading the raw tags aloud. Note that only the tags themselves are rewritten; the reasoning text between them still passes through (the notes at the end sketch how to drop it entirely).
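For intuition, here is the core string transform the agent applies to each streaming chunk later in this example; the sample text is invented for illustration:

```python
# Invented sample of Deepseek-style output; the real model streams this in chunks.
raw = "<think>The user said hi, so greet them back.</think>Hello! How can I help?"

# The same replacement the agent performs per chunk in llm_node below.
processed = raw.replace("<think>", "").replace("</think>", "Okay, I'm ready to respond.")
print(processed)
# The user said hi, so greet them back.Okay, I'm ready to respond.Hello! How can I help?
```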
- Add a `.env` in this directory with your LiveKit and API credentials:

  ```
  LIVEKIT_URL=your_livekit_url
  LIVEKIT_API_KEY=your_api_key
  LIVEKIT_API_SECRET=your_api_secret
  GROQ_API_KEY=your_groq_api_key
  DEEPGRAM_API_KEY=your_deepgram_api_key
  OPENAI_API_KEY=your_openai_api_key
  ```

- Install dependencies:

  ```
  pip install "livekit-agents[silero,openai,deepgram]" python-dotenv
  ```
Set up dotenv, logging, and initialize the AgentServer.
```python
import logging

from dotenv import load_dotenv

from livekit.agents import JobContext, JobProcess, AgentServer, cli, Agent, AgentSession
from livekit.plugins import openai, deepgram, silero

load_dotenv()

logger = logging.getLogger("replacing-llm-output")
logger.setLevel(logging.INFO)

server = AgentServer()
```

Create an agent that uses a custom `llm_node` to intercept and process the LLM output stream. The agent stores its own LLM instance and overrides the `llm_node` method to filter out thinking tags.
```python
class SimpleAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="You are a helpful agent."
        )
        self._llm = openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b")

    async def on_enter(self):
        self.session.generate_reply()
```

Override the `llm_node` method to intercept the LLM stream. Process each chunk, replacing `<think>` with nothing and `</think>` with a transition phrase.
```python
    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            async with self._llm.chat(chat_ctx=chat_ctx, tools=tools, tool_choice=None) as stream:
                async for chunk in stream:
                    if chunk is None:
                        continue

                    # Chunks may be ChatChunk objects (with a .delta) or plain strings.
                    content = getattr(chunk.delta, 'content', None) if hasattr(chunk, 'delta') else str(chunk)
                    if content is None:
                        yield chunk
                        continue

                    # Strip the opening tag; turn the closing tag into a spoken transition.
                    processed_content = content.replace("<think>", "").replace("</think>", "Okay, I'm ready to respond.")
                    print(f"Original: {content}, Processed: {processed_content}")

                    if processed_content != content:
                        if hasattr(chunk, 'delta') and hasattr(chunk.delta, 'content'):
                            chunk.delta.content = processed_content
                        else:
                            chunk = processed_content

                    yield chunk

        return process_stream()
```

Preload the VAD model once per process to reduce connection latency.
```python
def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()

server.setup_fnc = prewarm
```

Create the session with Deepgram STT, OpenAI TTS, and the prewarmed VAD. The LLM is handled internally by the agent's custom `llm_node`.
```python
@server.rtc_session()
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=deepgram.STT(),
        tts=openai.TTS(),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    await session.start(agent=SimpleAgent(), room=ctx.room)
    await ctx.connect()
```

Start the agent server with the CLI runner.
```python
if __name__ == "__main__":
    cli.run_app(server)
```

Run the example in console mode:

```
python replacing_llm_output.py console
```

- The agent uses Groq's API with a Deepseek model (`deepseek-r1-distill-llama-70b`), which emits `<think>` tags during reasoning.
- The custom `llm_node` intercepts the streaming LLM output before it reaches TTS.
- Thinking tags are stripped (`<think>`) or replaced with a transition phrase (`</think>` becomes "Okay, I'm ready to respond.").
- The processed stream is passed to TTS, so the raw tags are never spoken aloud; the reasoning text between them still passes through.
- This pattern can be adapted to filter any model-specific output formatting; one adaptation is sketched below.
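As one such adaptation, here is a standalone sketch of a stateful filter that drops the reasoning text itself, and handles tags split across chunk boundaries (which the simple per-chunk `.replace(...)` above would miss). `ThinkFilter` is a hypothetical helper, not part of livekit-agents; inside the agent it would replace the `.replace(...)` calls in `process_stream()`:

```python
class ThinkFilter:
    """Drops <think>...</think> spans from streamed text, even when a tag
    is split across chunk boundaries. Hypothetical sketch, not a library API."""

    OPEN = "<think>"
    CLOSE = "</think>"

    def __init__(self) -> None:
        self._buf = ""
        self._thinking = False

    def feed(self, text: str) -> str:
        """Absorb one chunk; return whatever text is safe to speak so far."""
        self._buf += text
        out = ""
        while self._buf:
            if self._thinking:
                end = self._buf.find(self.CLOSE)
                if end == -1:
                    return out  # closing tag not complete yet: hold everything back
                self._buf = self._buf[end + len(self.CLOSE):]
                self._thinking = False
            else:
                start = self._buf.find(self.OPEN)
                if start == -1:
                    # Emit all but a tail that could be the start of a split tag.
                    keep = 0
                    for i in range(1, len(self.OPEN)):
                        if self._buf.endswith(self.OPEN[:i]):
                            keep = i
                    out += self._buf[:len(self._buf) - keep]
                    self._buf = self._buf[len(self._buf) - keep:]
                    return out
                out += self._buf[:start]
                self._buf = self._buf[start + len(self.OPEN):]
                self._thinking = True
        return out


f = ThinkFilter()
for piece in ["<thi", "nk>plan the greeting</th", "ink>Hello there!"]:
    print(repr(f.feed(piece)))
# prints '', '', 'Hello there!'
```

The trade-off is latency: text is held back until the filter is sure it is not part of a thinking span, so nothing inside `<think>...</think>` is ever spoken.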
Full code:

```python
import logging

from dotenv import load_dotenv

from livekit.agents import JobContext, JobProcess, AgentServer, cli, Agent, AgentSession
from livekit.plugins import openai, deepgram, silero

load_dotenv()

logger = logging.getLogger("replacing-llm-output")
logger.setLevel(logging.INFO)


class SimpleAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="You are a helpful agent."
        )
        self._llm = openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b")

    async def on_enter(self):
        self.session.generate_reply()

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            async with self._llm.chat(chat_ctx=chat_ctx, tools=tools, tool_choice=None) as stream:
                async for chunk in stream:
                    if chunk is None:
                        continue

                    content = getattr(chunk.delta, 'content', None) if hasattr(chunk, 'delta') else str(chunk)
                    if content is None:
                        yield chunk
                        continue

                    processed_content = content.replace("<think>", "").replace("</think>", "Okay, I'm ready to respond.")
                    print(f"Original: {content}, Processed: {processed_content}")

                    if processed_content != content:
                        if hasattr(chunk, 'delta') and hasattr(chunk.delta, 'content'):
                            chunk.delta.content = processed_content
                        else:
                            chunk = processed_content

                    yield chunk

        return process_stream()


server = AgentServer()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=deepgram.STT(),
        tts=openai.TTS(),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    await session.start(agent=SimpleAgent(), room=ctx.room)
    await ctx.connect()


if __name__ == "__main__":
    cli.run_app(server)
```