Unify streamAdapter/betaStreamAdapter retry logic into generic retryableStream#2018
Unify streamAdapter/betaStreamAdapter retry logic into generic retryableStream#2018dgageot wants to merge 1 commit intodocker:mainfrom
Conversation
…bleStream Extract the duplicated single-retry-on-context-length-error logic from both streamAdapter and betaStreamAdapter into a shared generic retryableStream[T] struct. Both adapters now embed retryableStream and use its next() method instead of duplicating the retry code inline. The retryFn closures in client.go and beta_client.go now return the raw *ssestream.Stream[T] directly instead of wrapping it in a full adapter, since only the stream was ever used from the returned value. Assisted-By: docker-agent
There was a problem hiding this comment.
Review Summary
Assessment: 🟡 NEEDS ATTENTION
This PR successfully consolidates duplicated retry logic into a generic retryableStream[T] structure, reducing code duplication. However, there is one confirmed resource leak issue in the retry path that should be addressed.
Confirmed Issue
MEDIUM: Stream leak in retry failure path (retry.go:35)
When the retry succeeds and r.stream is replaced with newStream, if the subsequent recursive r.next() call fails, the function returns the error without closing the newly assigned stream. This creates a resource leak.
Review Notes
The refactoring is well-structured:
- Generic
retryableStream[T]properly encapsulates retry logic - Adapter state (callbacks, flags) correctly persists across retry
- The
getResponseTrailercallback automatically accesses the latest HTTP response - Stream initialization guarantees make the removed nil checks safe
| if newStream := r.retryFn(); newStream != nil { | ||
| r.stream.Close() | ||
| r.stream = newStream | ||
| return r.next() |
There was a problem hiding this comment.
MEDIUM: Stream leak when retry succeeds but subsequent next() fails
When retry succeeds, the old stream is closed (line 33) and r.stream is replaced with newStream (line 34). However, if the recursive r.next() call on line 35 returns an error, the function returns that error immediately without closing the newStream that was just assigned to r.stream.
This creates a resource leak on the retry failure path.
Suggested fix:
if newStream := r.retryFn(); newStream != nil {
r.stream.Close()
r.stream = newStream
ok, err := r.next()
if !ok && err != nil {
r.stream.Close() // Clean up on retry failure
}
return ok, err
}
Extract the duplicated single-retry-on-context-length-error logic from both
streamAdapterandbetaStreamAdapterinto a shared genericretryableStream[T]struct. Both adapters now embed
retryableStreamand use itsnext()methodinstead of duplicating the retry code inline.
The
retryFnclosures inclient.goandbeta_client.gonow return the raw*ssestream.Stream[T]directly instead of wrapping it in a full adapter,since only the stream was ever used from the returned value.
Changes
retry.go— GenericretryableStream[T]with the shared retry-on-context-length-error logic.adapter.go—streamAdapterembedsretryableStream; removed duplicated retry code.beta_adapter.go—betaStreamAdapterembedsretryableStream; removed duplicated retry code.client.go/beta_client.go—retryFnclosures return raw*ssestream.Stream[T]instead of full adapters.