@@ -103,36 +103,51 @@ export function useDeltaStreams<
103103 | undefined ;
104104
105105 const newDeltas = cursorQuery ?. streams . deltas ;
106- if ( newDeltas ?. length && streamMessages ) {
106+ if ( streamMessages ) {
107107 const newDeltasByStreamId = new Map < string , StreamDelta [ ] > ( ) ;
108- for ( const delta of newDeltas ) {
109- const oldCursor = cursors [ delta . streamId ] ;
110- if ( oldCursor && delta . start < oldCursor ) continue ;
111- const existing = newDeltasByStreamId . get ( delta . streamId ) ;
112- if ( existing ) {
113- const previousEnd = existing . at ( - 1 ) ! . end ;
114- assert (
115- previousEnd === delta . start ,
116- `Gap found in deltas for ${ delta . streamId } jumping to ${ delta . start } from ${ previousEnd } ` ,
117- ) ;
118- existing . push ( delta ) ;
119- } else {
120- assert (
121- ! oldCursor || oldCursor === delta . start ,
122- `Gap found - first delta after ${ oldCursor } is ${ delta . start } for stream ${ delta . streamId } ` ,
123- ) ;
124- newDeltasByStreamId . set ( delta . streamId , [ delta ] ) ;
108+ if ( newDeltas ?. length ) {
109+ for ( const delta of newDeltas ) {
110+ const oldCursor = cursors [ delta . streamId ] ;
111+ if ( oldCursor && delta . start < oldCursor ) continue ;
112+ const existing = newDeltasByStreamId . get ( delta . streamId ) ;
113+ if ( existing ) {
114+ const previousEnd = existing . at ( - 1 ) ! . end ;
115+ assert (
116+ previousEnd === delta . start ,
117+ `Gap found in deltas for ${ delta . streamId } jumping to ${ delta . start } from ${ previousEnd } ` ,
118+ ) ;
119+ existing . push ( delta ) ;
120+ } else {
121+ assert (
122+ ! oldCursor || oldCursor === delta . start ,
123+ `Gap found - first delta after ${ oldCursor } is ${ delta . start } for stream ${ delta . streamId } ` ,
124+ ) ;
125+ newDeltasByStreamId . set ( delta . streamId , [ delta ] ) ;
126+ }
125127 }
126128 }
127129 const newCursors : Record < string , number > = { } ;
130+ let cursorsChanged = false ;
128131 for ( const { streamId } of streamMessages ) {
129132 const cursor =
130133 newDeltasByStreamId . get ( streamId ) ?. at ( - 1 ) ?. end ?? cursors [ streamId ] ;
131134 if ( cursor !== undefined ) {
132135 newCursors [ streamId ] = cursor ;
136+ if ( cursors [ streamId ] !== cursor ) {
137+ cursorsChanged = true ;
138+ }
133139 }
134140 }
135- setCursors ( newCursors ) ;
141+ // Also check if any streams were removed from cursors
142+ for ( const streamId in cursors ) {
143+ if ( ! ( streamId in newCursors ) ) {
144+ cursorsChanged = true ;
145+ break ;
146+ }
147+ }
148+ if ( cursorsChanged ) {
149+ setCursors ( newCursors ) ;
150+ }
136151
137152 // we defensively create a new object so object identity matches contents
138153 state . deltaStreams = streamMessages . map ( ( streamMessage ) => {
0 commit comments