diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index f63f693fe88..29a05ddf8ca 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -43,6 +43,7 @@ export class Message { public connectedStatus = false; public connectedStatus$ = new Subject(); private ws: WebSocketSubject> | null = null; + private wsSubscription: Subscription | null = null; private open$ = new Subject(); private close$ = new Subject(); private sent$ = new Subject>(); @@ -99,13 +100,26 @@ export class Message { if (!this.wsUrl) { throw new Error('WebSocket URL is not set. Please call setWsUrl() before connect()'); } + + // Unsubscribe from existing subscription first + if (this.wsSubscription) { + this.wsSubscription.unsubscribe(); + this.wsSubscription = null; + } + + // Then close existing WebSocket + if (this.ws) { + this.ws.complete(); + this.ws = null; + } + this.ws = webSocket>({ url: this.wsUrl, openObserver: this.open$, closeObserver: this.close$ }); - this.ws + this.wsSubscription = this.ws .pipe( // reconnect retryWhen(errors => errors.pipe(mergeMap(() => this.close$.pipe(take(1), delay(4000))))) @@ -190,6 +204,10 @@ export class Message { } destroy(): void { + if (this.wsSubscription) { + this.wsSubscription.unsubscribe(); + this.wsSubscription = null; + } if (this.ws) { this.ws.complete(); this.ws = null; diff --git a/zeppelin-web-angular/src/app/pages/workspace/notebook/notebook.component.ts b/zeppelin-web-angular/src/app/pages/workspace/notebook/notebook.component.ts index b22cd012d36..bf51ec2ad02 100644 --- a/zeppelin-web-angular/src/app/pages/workspace/notebook/notebook.component.ts +++ b/zeppelin-web-angular/src/app/pages/workspace/notebook/notebook.component.ts @@ -435,18 +435,31 @@ export class NotebookComponent extends MessageListenersManager implements OnInit this.noteVarShareService.clear(); }); this.activatedRoute.params.pipe(takeUntil(this.destroy$)).subscribe(param => { - const { noteId, revisionId } = param; - if (revisionId) { - this.messageService.noteRevision(noteId, revisionId); - } else { - this.messageService.getNote(noteId); - } - this.revisionView = !!revisionId; + this.revisionView = !!param.revisionId; this.cdr.markForCheck(); - this.messageService.listRevisionHistory(noteId); - // TODO(hsuanxyz) scroll to current paragraph }); this.revisionView = !!this.activatedRoute.snapshot.params.revisionId; + + // Fetch note when WebSocket connects or reconnects + this.messageService.connectedStatus$ + .pipe(startWith(this.messageService.connectedStatus), takeUntil(this.destroy$)) + .subscribe(connected => { + console.log('connectedStatus$ changed to ', connected ? 'connected' : 'disconnected'); + if (connected) { + const { noteId, revisionId } = this.activatedRoute.snapshot.params; + if (!noteId) { + throw new Error('Query parameter `noteId` is required.'); + } + if (revisionId) { + this.messageService.noteRevision(noteId, revisionId); + } else { + this.messageService.getNote(noteId); + } + this.cdr.markForCheck(); + this.messageService.listRevisionHistory(noteId); + // TODO(hsuanxyz) scroll to current paragraph + } + }); } removeParagraphFromNgZ(): void {