Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class Message {
public connectedStatus = false;
public connectedStatus$ = new Subject<boolean>();
private ws: WebSocketSubject<WebSocketMessage<MessageDataTypeMap>> | null = null;
private wsSubscription: Subscription | null = null;
private open$ = new Subject<Event>();
private close$ = new Subject<CloseEvent>();
private sent$ = new Subject<WebSocketMessage<MessageSendDataTypeMap>>();
Expand Down Expand Up @@ -99,13 +100,26 @@ export class Message {
if (!this.wsUrl) {
throw new Error('WebSocket URL is not set. Please call setWsUrl() before connect()');
}

// Unsubscribe from existing subscription first
if (this.wsSubscription) {
this.wsSubscription.unsubscribe();
this.wsSubscription = null;
}

// Then close existing WebSocket
if (this.ws) {
this.ws.complete();
this.ws = null;
}

this.ws = webSocket<WebSocketMessage<MessageDataTypeMap>>({
url: this.wsUrl,
openObserver: this.open$,
closeObserver: this.close$
});

this.ws
this.wsSubscription = this.ws
.pipe(
// reconnect
retryWhen(errors => errors.pipe(mergeMap(() => this.close$.pipe(take(1), delay(4000)))))
Expand Down Expand Up @@ -190,6 +204,10 @@ export class Message {
}

destroy(): void {
if (this.wsSubscription) {
this.wsSubscription.unsubscribe();
this.wsSubscription = null;
}
if (this.ws) {
this.ws.complete();
this.ws = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,18 +435,31 @@ export class NotebookComponent extends MessageListenersManager implements OnInit
this.noteVarShareService.clear();
});
this.activatedRoute.params.pipe(takeUntil(this.destroy$)).subscribe(param => {
const { noteId, revisionId } = param;
if (revisionId) {
this.messageService.noteRevision(noteId, revisionId);
} else {
this.messageService.getNote(noteId);
}
this.revisionView = !!revisionId;
this.revisionView = !!param.revisionId;
this.cdr.markForCheck();
this.messageService.listRevisionHistory(noteId);
// TODO(hsuanxyz) scroll to current paragraph
});
this.revisionView = !!this.activatedRoute.snapshot.params.revisionId;

// Fetch note when WebSocket connects or reconnects
this.messageService.connectedStatus$
.pipe(startWith(this.messageService.connectedStatus), takeUntil(this.destroy$))
.subscribe(connected => {
console.log('connectedStatus$ changed to ', connected ? 'connected' : 'disconnected');
if (connected) {
const { noteId, revisionId } = this.activatedRoute.snapshot.params;
if (!noteId) {
throw new Error('Query parameter `noteId` is required.');
}
if (revisionId) {
this.messageService.noteRevision(noteId, revisionId);
} else {
this.messageService.getNote(noteId);
}
this.cdr.markForCheck();
this.messageService.listRevisionHistory(noteId);
// TODO(hsuanxyz) scroll to current paragraph
}
});
}

removeParagraphFromNgZ(): void {
Expand Down
Loading