Skip to content

Commit 2cb8acb

Browse files
committed
strip out old streams when they are no longer present
1 parent b7a7340 commit 2cb8acb

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

src/react/useDeltaStreams.ts

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -103,36 +103,51 @@ export function useDeltaStreams<
103103
| undefined;
104104

105105
const newDeltas = cursorQuery?.streams.deltas;
106-
if (newDeltas?.length && streamMessages) {
106+
if (streamMessages) {
107107
const newDeltasByStreamId = new Map<string, StreamDelta[]>();
108-
for (const delta of newDeltas) {
109-
const oldCursor = cursors[delta.streamId];
110-
if (oldCursor && delta.start < oldCursor) continue;
111-
const existing = newDeltasByStreamId.get(delta.streamId);
112-
if (existing) {
113-
const previousEnd = existing.at(-1)!.end;
114-
assert(
115-
previousEnd === delta.start,
116-
`Gap found in deltas for ${delta.streamId} jumping to ${delta.start} from ${previousEnd}`,
117-
);
118-
existing.push(delta);
119-
} else {
120-
assert(
121-
!oldCursor || oldCursor === delta.start,
122-
`Gap found - first delta after ${oldCursor} is ${delta.start} for stream ${delta.streamId}`,
123-
);
124-
newDeltasByStreamId.set(delta.streamId, [delta]);
108+
if (newDeltas?.length) {
109+
for (const delta of newDeltas) {
110+
const oldCursor = cursors[delta.streamId];
111+
if (oldCursor && delta.start < oldCursor) continue;
112+
const existing = newDeltasByStreamId.get(delta.streamId);
113+
if (existing) {
114+
const previousEnd = existing.at(-1)!.end;
115+
assert(
116+
previousEnd === delta.start,
117+
`Gap found in deltas for ${delta.streamId} jumping to ${delta.start} from ${previousEnd}`,
118+
);
119+
existing.push(delta);
120+
} else {
121+
assert(
122+
!oldCursor || oldCursor === delta.start,
123+
`Gap found - first delta after ${oldCursor} is ${delta.start} for stream ${delta.streamId}`,
124+
);
125+
newDeltasByStreamId.set(delta.streamId, [delta]);
126+
}
125127
}
126128
}
127129
const newCursors: Record<string, number> = {};
130+
let cursorsChanged = false;
128131
for (const { streamId } of streamMessages) {
129132
const cursor =
130133
newDeltasByStreamId.get(streamId)?.at(-1)?.end ?? cursors[streamId];
131134
if (cursor !== undefined) {
132135
newCursors[streamId] = cursor;
136+
if (cursors[streamId] !== cursor) {
137+
cursorsChanged = true;
138+
}
133139
}
134140
}
135-
setCursors(newCursors);
141+
// Also check if any streams were removed from cursors
142+
for (const streamId in cursors) {
143+
if (!(streamId in newCursors)) {
144+
cursorsChanged = true;
145+
break;
146+
}
147+
}
148+
if (cursorsChanged) {
149+
setCursors(newCursors);
150+
}
136151

137152
// we defensively create a new object so object identity matches contents
138153
state.deltaStreams = streamMessages.map((streamMessage) => {

0 commit comments

Comments
 (0)