diff --git a/src/data/languages/languageData.ts b/src/data/languages/languageData.ts index 10710cf4a0..dda7e90139 100644 --- a/src/data/languages/languageData.ts +++ b/src/data/languages/languageData.ts @@ -44,6 +44,7 @@ export default { }, aiTransport: { javascript: '2.19', + react: '2.19', java: '1.6', python: '3.1', swift: '1.2', diff --git a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx index b8e6ea6026..de522c13f1 100644 --- a/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx +++ b/src/pages/docs/ai-transport/token-streaming/message-per-response.mdx @@ -365,6 +365,20 @@ For more details on rate limits and rollup behavior, see [Token streaming limits ## Subscribing to token streams + + + + Subscribers receive different message actions depending on when they join and how they're retrieving messages. Each message has an `action` field that indicates how to process it, and a `serial` field that identifies which message the action relates to: - `message.create`: Indicates a new response has started (i.e. a new message was created). The message `data` contains the initial content (often empty or the first token). Store this as the beginning of a new response using `serial` as the identifier. @@ -449,6 +463,31 @@ channel.subscribe(message -> { } }); ``` +```react +const [responses, setResponses] = useState(new Map()); + +// Subscribe to live messages +useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => { + setResponses((prev) => { + const next = new Map(prev); + switch (message.action) { + case 'message.create': + // New response started + next.set(message.serial, message.data); + break; + case 'message.append': + // Append token to existing response + next.set(message.serial, (next.get(message.serial) || '') + message.data); + break; + case 'message.update': + // Replace entire response content + next.set(message.serial, message.data); + break; + } + return next; + }); +}); +``` ## Client hydration @@ -549,6 +588,31 @@ channel.subscribe(message -> { } }); ``` +```react +// Ensure the outer ChannelProvider has options={{ params: { rewind: '2m' } }} + +const [responses, setResponses] = useState(new Map()); + +// Receive both recent historical (via rewind) and live messages +useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => { + setResponses((prev) => { + const next = new Map(prev); + switch (message.action) { + case 'message.create': + next.set(message.serial, message.data); + break; + case 'message.append': + const current = next.get(message.serial) || ''; + next.set(message.serial, current + message.data); + break; + case 'message.update': + next.set(message.serial, message.data); + break; + } + return next; + }); +}); +``` Rewind supports two formats: @@ -678,6 +742,46 @@ while (page != null) { page = page.hasNext() ? page.next() : null; } ``` +```react +const [responses, setResponses] = useState(new Map()); +const hydrated = useRef(false); + +// Subscribe to live messages and get the history function +const { history } = useChannel('ai:{{RANDOM_CHANNEL_NAME}}', (message) => { + setResponses((prev) => { + const next = new Map(prev); + switch (message.action) { + case 'message.create': + next.set(message.serial, message.data); + break; + case 'message.append': + next.set(message.serial, (next.get(message.serial) || '') + message.data); + break; + case 'message.update': + next.set(message.serial, message.data); + break; + } + return next; + }); +}); + +// Fetch history on mount +useEffect(() => { + if (hydrated.current) return; + hydrated.current = true; + + (async () => { + let page = await history({ untilAttach: true }); + while (page) { + for (const message of page.items) { + // message.data contains the full concatenated text + setResponses((prev) => new Map(prev).set(message.serial, message.data)); + } + page = page.hasNext() ? await page.next() : null; + } + })(); +}, [history]); +``` ### Hydrating an in-progress response @@ -911,6 +1015,40 @@ channel.subscribe(message -> { } }); ``` +```react +// Ensure the outer ChannelProvider has options={{ params: { rewind: '2m' } }} + +// Load completed responses from your database (Set of responseIds) +const completedResponses = useCompletedResponses(); + +const [inProgressResponses, setInProgressResponses] = useState(new Map()); + +// Receive both recent historical and live messages +useChannel('ai:responses', (message) => { + const responseId = message.extras?.headers?.responseId; + + if (!responseId) return; + + // Skip messages for responses already loaded from database + if (completedResponses.has(responseId)) return; + + setInProgressResponses((prev) => { + const next = new Map(prev); + switch (message.action) { + case 'message.create': + next.set(responseId, message.data); + break; + case 'message.append': + next.set(responseId, (next.get(responseId) || '') + message.data); + break; + case 'message.update': + next.set(responseId, message.data); + break; + } + return next; + }); +}); +```