From 129d79489f92fcd24978f0777c3de06b247bf06f Mon Sep 17 00:00:00 2001
From: RulaKhaled
Date: Mon, 2 Feb 2026 15:47:53 +0100
Subject: [PATCH] fix(core): Intercept .withResponse() to preserve OpenAI
 stream instrumentation

---
 .../tracing/openai/scenario-with-response.mjs | 153 +++++++++++++++
 .../suites/tracing/openai/test.ts             |  38 ++++
 packages/core/src/tracing/openai/index.ts     | 185 +++++++++++++-----
 3 files changed, 325 insertions(+), 51 deletions(-)
 create mode 100644 dev-packages/node-integration-tests/suites/tracing/openai/scenario-with-response.mjs

diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/scenario-with-response.mjs b/dev-packages/node-integration-tests/suites/tracing/openai/scenario-with-response.mjs
new file mode 100644
index 000000000000..2923f6ca6c4f
--- /dev/null
+++ b/dev-packages/node-integration-tests/suites/tracing/openai/scenario-with-response.mjs
@@ -0,0 +1,153 @@
+import * as Sentry from '@sentry/node';
+import express from 'express';
+import OpenAI from 'openai';
+
+function startMockServer() {
+  const app = express();
+  app.use(express.json());
+
+  app.post('/openai/chat/completions', (req, res) => {
+    const { model } = req.body;
+
+    res.set({
+      'x-request-id': 'req_withresponse_test',
+      'openai-organization': 'test-org',
+      'openai-processing-ms': '150',
+      'openai-version': '2020-10-01',
+    });
+
+    res.send({
+      id: 'chatcmpl-withresponse',
+      object: 'chat.completion',
+      created: 1677652288,
+      model: model,
+      choices: [
+        {
+          index: 0,
+          message: {
+            role: 'assistant',
+            content: 'Testing .withResponse() method!',
+          },
+          finish_reason: 'stop',
+        },
+      ],
+      usage: {
+        prompt_tokens: 8,
+        completion_tokens: 12,
+        total_tokens: 20,
+      },
+    });
+  });
+
+  return new Promise(resolve => {
+    const server = app.listen(0, () => {
+      resolve(server);
+    });
+  });
+}
+
+async function run() {
+  const server = await startMockServer();
+
+  await Sentry.startSpan({ op: 'function', name: 'main' }, async () => {
+    const client = new OpenAI({
+      baseURL: `http://localhost:${server.address().port}/openai`,
+      apiKey: 'mock-api-key',
+    });
+
+    // Test 1: Verify .withResponse() method exists and can be called
+    const result = client.chat.completions.create({
+      model: 'gpt-4',
+      messages: [{ role: 'user', content: 'Test withResponse' }],
+    });
+
+    // Verify method exists
+    if (typeof result.withResponse !== 'function') {
+      throw new Error('.withResponse() method does not exist');
+    }
+
+    // Call .withResponse() and verify structure
+    const withResponseResult = await result.withResponse();
+
+    // Verify all three properties exist
+    if (!withResponseResult.data) {
+      throw new Error('.withResponse() did not return data');
+    }
+    if (!withResponseResult.response) {
+      throw new Error('.withResponse() did not return response');
+    }
+    if (withResponseResult.request_id === undefined) {
+      throw new Error('.withResponse() did not return request_id');
+    }
+
+    // Verify data structure matches expected OpenAI response
+    const { data } = withResponseResult;
+    if (data.id !== 'chatcmpl-withresponse') {
+      throw new Error(`Expected data.id to be 'chatcmpl-withresponse', got '${data.id}'`);
+    }
+    if (data.choices[0].message.content !== 'Testing .withResponse() method!') {
+      throw new Error(`Expected specific content, got '${data.choices[0].message.content}'`);
+    }
+    if (data.usage.total_tokens !== 20) {
+      throw new Error(`Expected 20 total tokens, got ${data.usage.total_tokens}`);
+    }
+
+    // Verify response is a Response object with correct headers
+    if (!(withResponseResult.response instanceof Response)) {
+      throw new Error('response is not a Response object');
+    }
+    if (withResponseResult.response.headers.get('x-request-id') !== 'req_withresponse_test') {
+      throw new Error(
+        `Expected x-request-id header 'req_withresponse_test', got '${withResponseResult.response.headers.get('x-request-id')}'`,
+      );
+    }
+
+    // Verify request_id matches the header
+    if (withResponseResult.request_id !== 'req_withresponse_test') {
+      throw new Error(`Expected request_id 'req_withresponse_test', got '${withResponseResult.request_id}'`);
+    }
+
+    // Test 2: Verify .asResponse() method works
+    const result2 = client.chat.completions.create({
+      model: 'gpt-4',
+      messages: [{ role: 'user', content: 'Test asResponse' }],
+    });
+
+    // Verify method exists
+    if (typeof result2.asResponse !== 'function') {
+      throw new Error('.asResponse() method does not exist');
+    }
+
+    // Call .asResponse() and verify it returns the raw Response
+    const rawResponse = await result2.asResponse();
+
+    if (!(rawResponse instanceof Response)) {
+      throw new Error('.asResponse() did not return a Response object');
+    }
+
+    // Verify response has correct status
+    if (rawResponse.status !== 200) {
+      throw new Error(`Expected status 200, got ${rawResponse.status}`);
+    }
+
+    // Verify response headers
+    if (rawResponse.headers.get('x-request-id') !== 'req_withresponse_test') {
+      throw new Error(
+        `Expected x-request-id header 'req_withresponse_test', got '${rawResponse.headers.get('x-request-id')}'`,
+      );
+    }
+
+    // Verify we can manually parse the body
+    const body = await rawResponse.json();
+    if (body.id !== 'chatcmpl-withresponse') {
+      throw new Error(`Expected body.id 'chatcmpl-withresponse', got '${body.id}'`);
+    }
+    if (body.choices[0].message.content !== 'Testing .withResponse() method!') {
+      throw new Error(`Expected specific content in body, got '${body.choices[0].message.content}'`);
+    }
+  });
+
+  server.close();
+}

+run();
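Aside: the scenario above exercises the OpenAI Node SDK's lazy APIPromise, whose .withResponse() resolves to { data, response, request_id } and whose .asResponse() skips parsing entirely. A minimal sketch of that surface, not part of the patch (client setup and prompts are placeholder values):

import OpenAI from 'openai';

const client = new OpenAI({ apiKey: 'mock-api-key' });

async function demo() {
  // Plain await yields only the parsed completion.
  const completion = await client.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: 'hi' }],
  });

  // .withResponse() additionally exposes the raw fetch Response and request id.
  const { data, response, request_id } = await client.chat.completions
    .create({ model: 'gpt-4', messages: [{ role: 'user', content: 'hi' }] })
    .withResponse();

  console.log(completion.id, data.id, response.status, request_id);
}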
diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts
index df432d292bba..5753a494fa31 100644
--- a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts
+++ b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts
@@ -945,4 +945,42 @@ describe('OpenAI integration', () => {
       });
     },
   );
+
+  createEsmAndCjsTests(__dirname, 'scenario-with-response.mjs', 'instrument.mjs', (createRunner, test) => {
+    test('preserves .withResponse() method and works correctly', async () => {
+      await createRunner()
+        .ignore('event')
+        .expect({
+          transaction: {
+            transaction: 'main',
+            spans: expect.arrayContaining([
+              // First call using .withResponse()
+              expect.objectContaining({
+                data: expect.objectContaining({
+                  [GEN_AI_OPERATION_NAME_ATTRIBUTE]: 'chat',
+                  [GEN_AI_REQUEST_MODEL_ATTRIBUTE]: 'gpt-4',
+                  [GEN_AI_RESPONSE_ID_ATTRIBUTE]: 'chatcmpl-withresponse',
+                }),
+                description: 'chat gpt-4',
+                op: 'gen_ai.chat',
+                status: 'ok',
+              }),
+              // Second call using .asResponse()
+              expect.objectContaining({
+                data: expect.objectContaining({
+                  [GEN_AI_OPERATION_NAME_ATTRIBUTE]: 'chat',
+                  [GEN_AI_REQUEST_MODEL_ATTRIBUTE]: 'gpt-4',
+                  [GEN_AI_RESPONSE_ID_ATTRIBUTE]: 'chatcmpl-withresponse',
+                }),
+                description: 'chat gpt-4',
+                op: 'gen_ai.chat',
+                status: 'ok',
+              }),
+            ]),
+          },
+        })
+        .start()
+        .completed();
+    });
+  });
 });
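Aside: the failure mode the core change below addresses, in isolation. Wrapping the SDK call in an async function always returns a plain native Promise, so extra methods on the SDK's thenable (like .withResponse()) become unreachable. A minimal sketch of the bug, not part of the patch (ApiLike and naiveWrap are illustrative names):

// A thenable with an extra method, shaped like the SDK's APIPromise.
type ApiLike<T> = Promise<T> & { withResponse(): Promise<{ data: T }> };

function naiveWrap<T>(p: ApiLike<T>): Promise<T> {
  // Resolves to the same value, but the returned object is a plain Promise:
  // p.withResponse is no longer reachable through the wrapper.
  return (async () => await p)();
}

This is why instrumentedMethod below stops being an async function and instead proxies the original result.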
diff --git a/packages/core/src/tracing/openai/index.ts b/packages/core/src/tracing/openai/index.ts
index b0d26f92c36c..9568567227df 100644
--- a/packages/core/src/tracing/openai/index.ts
+++ b/packages/core/src/tracing/openai/index.ts
@@ -1,9 +1,12 @@
 import { getClient } from '../../currentScopes';
+import { DEBUG_BUILD } from '../../debug-build';
 import { captureException } from '../../exports';
 import { SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN } from '../../semanticAttributes';
 import { SPAN_STATUS_ERROR } from '../../tracing';
 import { startSpan, startSpanManual } from '../../tracing/trace';
 import type { Span, SpanAttributeValue } from '../../types-hoist/span';
+import { debug } from '../../utils/debug-logger';
+import { isThenable } from '../../utils/is';
 import {
   GEN_AI_EMBEDDINGS_INPUT_ATTRIBUTE,
   GEN_AI_INPUT_MESSAGES_ATTRIBUTE,
@@ -53,7 +56,16 @@ function extractAvailableTools(params: Record<string, unknown>): string | undefi
     : [];
 
   const availableTools = [...tools, ...webSearchOptions];
-  return availableTools.length > 0 ? JSON.stringify(availableTools) : undefined;
+  if (availableTools.length === 0) {
+    return undefined;
+  }
+
+  try {
+    return JSON.stringify(availableTools);
+  } catch (error) {
+    DEBUG_BUILD && debug.error('Failed to serialize OpenAI tools:', error);
+    return undefined;
+  }
 }
 
 /**
@@ -161,6 +173,75 @@ function addRequestAttributes(span: Span, params: Record<string, unknown>, opera
   }
 }
 
+/**
+ * Creates a wrapped version of .withResponse() that replaces the data field
+ * with the instrumented result while preserving metadata (response, request_id).
+ */
+async function createWithResponseWrapper(
+  originalWithResponse: Promise<unknown>,
+  instrumentedPromise: Promise<unknown>,
+): Promise<unknown> {
+  // Attach a catch handler to originalWithResponse immediately to prevent an unhandled
+  // rejection: if instrumentedPromise rejects first, this promise still needs a handler.
+  const safeOriginalWithResponse = originalWithResponse.catch(error => {
+    captureException(error, {
+      mechanism: {
+        handled: false,
+        type: 'auto.ai.openai',
+      },
+    });
+    throw error;
+  });
+
+  const instrumentedResult = await instrumentedPromise;
+  const originalWrapper = await safeOriginalWithResponse;
+
+  // Combine the instrumented result with the original metadata
+  if (originalWrapper && typeof originalWrapper === 'object' && 'data' in originalWrapper) {
+    return {
+      ...originalWrapper,
+      data: instrumentedResult,
+    };
+  }
+  return instrumentedResult;
+}
+
+/**
+ * Wraps a promise-like object to preserve additional methods (like .withResponse()).
+ */
+function wrapPromiseWithMethods<R>(originalPromiseLike: Promise<R>, instrumentedPromise: Promise<R>): Promise<R> {
+  // If the original result is not thenable, return the instrumented promise.
+  // This should not happen with the OpenAI SDK methods we instrument, but guard just in case.
+  if (!isThenable(originalPromiseLike)) {
+    return instrumentedPromise;
+  }
+
+  // Create a proxy that forwards Promise methods to instrumentedPromise
+  // and preserves additional methods from the original result.
+  return new Proxy(originalPromiseLike, {
+    get(target: object, prop: string | symbol): unknown {
+      // For standard Promise methods (.then, .catch, .finally, Symbol.toStringTag),
+      // use instrumentedPromise to preserve Sentry instrumentation.
+      // For custom methods (like .withResponse()), use the original target.
+      const useInstrumentedPromise = prop in Promise.prototype || prop === Symbol.toStringTag;
+      const source = useInstrumentedPromise ? instrumentedPromise : target;
+
+      const value = Reflect.get(source, prop) as unknown;
+
+      // Special handling for .withResponse() to preserve instrumentation:
+      // .withResponse() returns { data: T, response: Response, request_id: string }.
+      if (prop === 'withResponse' && typeof value === 'function') {
+        return function wrappedWithResponse(this: unknown): unknown {
+          const originalWithResponse = (value as (...args: unknown[]) => Promise<unknown>).call(target);
+          return createWithResponseWrapper(originalWithResponse, instrumentedPromise);
+        };
+      }
+
+      return typeof value === 'function' ? value.bind(source) : value;
+    },
+  }) as Promise<R>;
+}
+
 /**
  * Instrument a method with Sentry spans
  * Following Sentry AI Agents Manual Instrumentation conventions
@@ -172,7 +253,7 @@ function instrumentMethod<T extends unknown[], R>(
   context: unknown,
   options: OpenAiOptions,
 ): (...args: T) => Promise<R> {
-  return async function instrumentedMethod(...args: T): Promise<R> {
+  return function instrumentedMethod(...args: T): Promise<R> {
     const requestAttributes = extractRequestAttributes(args, methodPath);
     const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown';
     const operationName = getOperationName(methodPath);
@@ -180,77 +261,79 @@
     const params = args[0] as Record<string, unknown> | undefined;
     const isStreamRequested = params && typeof params === 'object' && params.stream === true;
 
+    const spanConfig = {
+      name: `${operationName} ${model}${isStreamRequested ? ' stream-response' : ''}`,
+      op: getSpanOperation(methodPath),
+      attributes: requestAttributes as Record<string, SpanAttributeValue>,
+    };
+
     if (isStreamRequested) {
-      // For streaming responses, use manual span management to properly handle the async generator lifecycle
-      return startSpanManual(
-        {
-          name: `${operationName} ${model} stream-response`,
-          op: getSpanOperation(methodPath),
-          attributes: requestAttributes as Record<string, SpanAttributeValue>,
-        },
-        async (span: Span) => {
-          try {
-            if (options.recordInputs && params) {
-              addRequestAttributes(span, params, operationName);
-            }
+      let originalResult!: Promise<R>;
 
-            const result = await originalMethod.apply(context, args);
+      const instrumentedPromise = startSpanManual(spanConfig, (span: Span) => {
+        originalResult = originalMethod.apply(context, args);
+        if (options.recordInputs && params) {
+          addRequestAttributes(span, params, operationName);
+        }
+
+        // Return the async processing of the stream
+        return (async () => {
+          try {
+            const result = await originalResult;
             return instrumentStream(
               result as OpenAIStream,
               span,
               options.recordOutputs ?? false,
             ) as unknown as R;
           } catch (error) {
-            // For streaming requests that fail before stream creation, we still want to record
-            // them as streaming requests but end the span gracefully
            span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' });
            captureException(error, {
              mechanism: {
                handled: false,
                type: 'auto.ai.openai.stream',
-                data: {
-                  function: methodPath,
-                },
+                data: { function: methodPath },
              },
            });
            span.end();
            throw error;
          }
-        },
-      );
-    } else {
-      // Non-streaming responses
-      return startSpan(
-        {
-          name: `${operationName} ${model}`,
-          op: getSpanOperation(methodPath),
-          attributes: requestAttributes as Record<string, SpanAttributeValue>,
-        },
-        async (span: Span) => {
-          try {
-            if (options.recordInputs && params) {
-              addRequestAttributes(span, params, operationName);
-            }
+        })();
+      });
 
-            const result = await originalMethod.apply(context, args);
-            addResponseAttributes(span, result, options.recordOutputs);
-            return result;
-          } catch (error) {
-            captureException(error, {
-              mechanism: {
-                handled: false,
-                type: 'auto.ai.openai',
-                data: {
-                  function: methodPath,
-                },
-              },
-            });
-            throw error;
-          }
+      return wrapPromiseWithMethods(originalResult, instrumentedPromise);
+    }
+
+    // Non-streaming responses
+    let originalResult!: Promise<R>;
+
+    const instrumentedPromise = startSpan(spanConfig, (span: Span) => {
+      // Call synchronously to capture the promise
+      originalResult = originalMethod.apply(context, args);
+
+      if (options.recordInputs && params) {
+        addRequestAttributes(span, params, operationName);
+      }
+
+      return originalResult.then(
+        result => {
+          addResponseAttributes(span, result, options.recordOutputs);
+          return result;
+        },
+        error => {
+          captureException(error, {
+            mechanism: {
+              handled: false,
+              type: 'auto.ai.openai',
+              data: { function: methodPath },
+            },
+          });
+          throw error;
        },
      );
-    }
+    });
+
+    return wrapPromiseWithMethods(originalResult, instrumentedPromise);
  };
 }
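Aside: stripped of the Sentry-specific pieces, wrapPromiseWithMethods boils down to a Proxy that serves the Promise protocol from the instrumented promise and everything else from the original thenable. A standalone sketch, not the patch's exact code (preserveMethods is an illustrative name; the real implementation above also special-cases .withResponse() so its data field reflects the instrumented result):

function preserveMethods<T>(original: Promise<T>, instrumented: Promise<T>): Promise<T> {
  return new Proxy(original, {
    get(target, prop) {
      // .then/.catch/.finally resolve through the instrumented promise, so
      // `await proxy` runs the instrumented path; custom methods such as
      // .withResponse() still reach the original object.
      const fromPromise = prop in Promise.prototype || prop === Symbol.toStringTag;
      const source = fromPromise ? instrumented : target;
      const value = Reflect.get(source as object, prop);
      return typeof value === 'function' ? value.bind(source) : value;
    },
  }) as Promise<T>;
}

Awaiting the proxy therefore awaits the instrumented promise, while a call like proxy.withResponse() still lands on the original APIPromise.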