Skip to content

Unify streamAdapter/betaStreamAdapter retry logic into generic retryableStream#2018

Open
dgageot wants to merge 1 commit intodocker:mainfrom
dgageot:board/unify-retry-logic-d5b3914d
Open

Unify streamAdapter/betaStreamAdapter retry logic into generic retryableStream#2018
dgageot wants to merge 1 commit intodocker:mainfrom
dgageot:board/unify-retry-logic-d5b3914d

Conversation

@dgageot
Copy link
Member

@dgageot dgageot commented Mar 9, 2026

Extract the duplicated single-retry-on-context-length-error logic from both
streamAdapter and betaStreamAdapter into a shared generic retryableStream[T]
struct. Both adapters now embed retryableStream and use its next() method
instead of duplicating the retry code inline.

The retryFn closures in client.go and beta_client.go now return the raw
*ssestream.Stream[T] directly instead of wrapping it in a full adapter,
since only the stream was ever used from the returned value.

Changes

  • New file: retry.go — Generic retryableStream[T] with the shared retry-on-context-length-error logic.
  • adapter.gostreamAdapter embeds retryableStream; removed duplicated retry code.
  • beta_adapter.gobetaStreamAdapter embeds retryableStream; removed duplicated retry code.
  • client.go / beta_client.goretryFn closures return raw *ssestream.Stream[T] instead of full adapters.

…bleStream

Extract the duplicated single-retry-on-context-length-error logic from both
streamAdapter and betaStreamAdapter into a shared generic retryableStream[T]
struct. Both adapters now embed retryableStream and use its next() method
instead of duplicating the retry code inline.

The retryFn closures in client.go and beta_client.go now return the raw
*ssestream.Stream[T] directly instead of wrapping it in a full adapter,
since only the stream was ever used from the returned value.

Assisted-By: docker-agent
@dgageot dgageot requested a review from a team as a code owner March 9, 2026 19:46
Copy link

@docker-agent docker-agent bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Summary

Assessment: 🟡 NEEDS ATTENTION

This PR successfully consolidates duplicated retry logic into a generic retryableStream[T] structure, reducing code duplication. However, there is one confirmed resource leak issue in the retry path that should be addressed.

Confirmed Issue

MEDIUM: Stream leak in retry failure path (retry.go:35)

When the retry succeeds and r.stream is replaced with newStream, if the subsequent recursive r.next() call fails, the function returns the error without closing the newly assigned stream. This creates a resource leak.

Review Notes

The refactoring is well-structured:

  • Generic retryableStream[T] properly encapsulates retry logic
  • Adapter state (callbacks, flags) correctly persists across retry
  • The getResponseTrailer callback automatically accesses the latest HTTP response
  • Stream initialization guarantees make the removed nil checks safe

if newStream := r.retryFn(); newStream != nil {
r.stream.Close()
r.stream = newStream
return r.next()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEDIUM: Stream leak when retry succeeds but subsequent next() fails

When retry succeeds, the old stream is closed (line 33) and r.stream is replaced with newStream (line 34). However, if the recursive r.next() call on line 35 returns an error, the function returns that error immediately without closing the newStream that was just assigned to r.stream.

This creates a resource leak on the retry failure path.

Suggested fix:

if newStream := r.retryFn(); newStream != nil {
	r.stream.Close()
	r.stream = newStream
	ok, err := r.next()
	if !ok && err != nil {
		r.stream.Close() // Clean up on retry failure
	}
	return ok, err
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant