diff --git a/packages/node_modules/pouchdb-checkpointer/src/index.js b/packages/node_modules/pouchdb-checkpointer/src/index.js index 54e3e014ba..68e7fa9eb3 100644 --- a/packages/node_modules/pouchdb-checkpointer/src/index.js +++ b/packages/node_modules/pouchdb-checkpointer/src/index.js @@ -1,8 +1,8 @@ import { explainError } from 'pouchdb-utils'; import { collate } from 'pouchdb-collate'; -var CHECKPOINT_VERSION = 1; -var REPLICATOR = "pouchdb"; +const CHECKPOINT_VERSION = 1; +const REPLICATOR = "pouchdb"; // This is an arbitrary number to limit the // amount of replication history we save in the checkpoint. // If we save too much, the checkpoint docs will become very big, @@ -10,67 +10,8 @@ var REPLICATOR = "pouchdb"; // read all the changes from 0 when checkpoint PUTs fail // CouchDB 2.0 has a more involved history pruning, // but let's go for the simple version for now. -var CHECKPOINT_HISTORY_SIZE = 5; -var LOWEST_SEQ = 0; - -function updateCheckpoint(db, id, checkpoint, session, returnValue) { - return db.get(id).catch(function (err) { - if (err.status === 404) { - if (db.adapter === 'http' || db.adapter === 'https') { - explainError( - 404, 'PouchDB is just checking if a remote checkpoint exists.' - ); - } - return { - session_id: session, - _id: id, - history: [], - replicator: REPLICATOR, - version: CHECKPOINT_VERSION - }; - } - throw err; - }).then(function (doc) { - if (returnValue.cancelled) { - return; - } - - // if the checkpoint has not changed, do not update - if (doc.last_seq === checkpoint) { - return; - } - - // Filter out current entry for this replication - doc.history = (doc.history || []).filter(function (item) { - return item.session_id !== session; - }); - - // Add the latest checkpoint to history - doc.history.unshift({ - last_seq: checkpoint, - session_id: session - }); - - // Just take the last pieces in history, to - // avoid really big checkpoint docs. - // see comment on history size above - doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE); - - doc.version = CHECKPOINT_VERSION; - doc.replicator = REPLICATOR; - - doc.session_id = session; - doc.last_seq = checkpoint; - - return db.put(doc).catch(function (err) { - if (err.status === 409) { - // retry; someone is trying to write a checkpoint simultaneously - return updateCheckpoint(db, id, checkpoint, session, returnValue); - } - throw err; - }); - }); -} +const CHECKPOINT_HISTORY_SIZE = 5; +const LOWEST_SEQ = 0; class CheckpointerInternal { constructor(src, target, id, returnValue, opts = { @@ -92,120 +33,182 @@ class CheckpointerInternal { } } - writeCheckpoint(checkpoint, session) { - var self = this; - return this.updateTarget(checkpoint, session).then(function () { - return self.updateSource(checkpoint, session); - }); + async writeCheckpoint(checkpoint, session) { + // update target before source every time + // because otherwise, compareHistory will pick a too new seq from source + // after an error writing a checkpoint to the target + await this._updateTarget(checkpoint, session); + return this._updateSource(checkpoint, session); } - updateTarget(checkpoint, session) { + async _updateTarget(checkpoint, session) { if (this.opts.writeTargetCheckpoint) { - return updateCheckpoint(this.target, this.id, checkpoint, + return await this._updateCheckpoint(this.target, this.id, checkpoint, session, this.returnValue); - } else { - return Promise.resolve(true); - } + } return true; } - updateSource(checkpoint, session) { + async _updateSource(checkpoint, session) { if (this.opts.writeSourceCheckpoint) { - var self = this; - return updateCheckpoint(this.src, this.id, checkpoint, - session, this.returnValue) - .catch(function (err) { - if (isForbiddenError(err)) { - self.opts.writeSourceCheckpoint = false; - return true; - } - throw err; - }); - } else { - return Promise.resolve(true); + try { + return await this._updateCheckpoint(this.src, this.id, checkpoint, + session, this.returnValue); + } catch (err) { + if (isForbiddenError(err)) { + this.opts.writeSourceCheckpoint = false; + return true; + } + throw err; + } } } - getCheckpoint() { - var self = this; + async _updateCheckpoint(db, id, checkpoint, session, returnValue) { + //retrieve checkpoint doc from db or create a new one + const doc = await this._initCheckpointDoc(db, id, session); + + if (returnValue.cancelled) { + return; + } + // if the checkpoint has not changed, do not update + if (doc.last_seq === checkpoint) { + return; + } + + // Filter out current entry for this replication + doc.history = (doc.history || []).filter(item => item.session_id !== session); + + // Add the latest checkpoint to history + doc.history.unshift({ + last_seq: checkpoint, + session_id: session + }); + + // Just take the last pieces in history, to + // avoid really big checkpoint docs. + // see comment on history size above + doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE); + + doc.version = CHECKPOINT_VERSION; + doc.replicator = REPLICATOR; - if (!self.opts.writeSourceCheckpoint && !self.opts.writeTargetCheckpoint) { - return Promise.resolve(LOWEST_SEQ); + doc.session_id = session; + doc.last_seq = checkpoint; + + try { + return await db.put(doc); + } catch (err) { + if (err.status === 409) { + // retry; someone is trying to write a checkpoint simultaneously + return this._updateCheckpoint(db, id, checkpoint, session, returnValue); + } + throw err; + } + } + + async _initCheckpointDoc(db, id, session) { + try { + return await db.get(id); + } catch (err) { + if (err.status === 404) { + if (db.adapter === 'http' || db.adapter === 'https') { + explainError( + 404, 'PouchDB is just checking if a remote checkpoint exists.' + ); + } + return { + session_id: session, + _id: id, + history: [], + replicator: REPLICATOR, + version: CHECKPOINT_VERSION + }; + } else { + throw err; + } } + } - if (self.opts && self.opts.writeSourceCheckpoint && !self.opts.writeTargetCheckpoint) { - return self.src.get(self.id).then(function (sourceDoc) { + async getCheckpoint() { + if (!this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) { + return LOWEST_SEQ; + } + + if (this.opts && this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) { + try { + const sourceDoc = await this.src.get(this.id); return sourceDoc.last_seq || LOWEST_SEQ; - }).catch(function (err) { + } catch (err) { /* istanbul ignore if */ if (err.status !== 404) { throw err; } return LOWEST_SEQ; - }); - } - - return self.target.get(self.id).then(function (targetDoc) { - if (self.opts && self.opts.writeTargetCheckpoint && !self.opts.writeSourceCheckpoint) { - return targetDoc.last_seq || LOWEST_SEQ; } + } - return self.src.get(self.id).then(function (sourceDoc) { - // Since we can't migrate an old version doc to a new one - // (no session id), we just go with the lowest seq in this case - /* istanbul ignore if */ - if (targetDoc.version !== sourceDoc.version) { + let targetDoc; + try { + targetDoc = await this.target.get(this.id); + } catch (err) { + if (err.status !== 404) { + throw err; + } return LOWEST_SEQ; - } + } - var version; - if (targetDoc.version) { - version = targetDoc.version.toString(); - } else { - version = "undefined"; - } + if (this.opts && this.opts.writeTargetCheckpoint && !this.opts.writeSourceCheckpoint) { + return targetDoc.last_seq || LOWEST_SEQ; + } - if (version in comparisons) { - return comparisons[version](targetDoc, sourceDoc); - } - /* istanbul ignore next */ + try { + const sourceDoc = await this.src.get(this.id); + // Since we can't migrate an old version doc to a new one + // (no session id), we just go with the lowest seq in this case + /* istanbul ignore if */ + if (targetDoc.version !== sourceDoc.version) { return LOWEST_SEQ; - }, function (err) { - if (err.status === 404 && targetDoc.last_seq) { - return self.src.put({ - _id: self.id, + } + const version = targetDoc.version ? targetDoc.version.toString() : "undefined"; + + if (version in comparisons) { + return comparisons[version](targetDoc, sourceDoc); + } + /* istanbul ignore next */ + return LOWEST_SEQ; + } catch (err) { + if (err.status === 404 && targetDoc.last_seq) { + try { + await this.src.put({ + _id: this.id, last_seq: LOWEST_SEQ - }).then(function () { - return LOWEST_SEQ; - }, function (err) { - if (isForbiddenError(err)) { - self.opts.writeSourceCheckpoint = false; - return targetDoc.last_seq; - } - /* istanbul ignore next */ - return LOWEST_SEQ; }); + return LOWEST_SEQ; + } catch (err) { + if (isForbiddenError(err)) { + this.opts.writeSourceCheckpoint = false; + return targetDoc.last_seq; + } + /* istanbul ignore next */ + return LOWEST_SEQ; } - throw err; - }); - }).catch(function (err) { - if (err.status !== 404) { - throw err; } - return LOWEST_SEQ; - }); + //missing sourceDoc on initial replication returns LOWEST_SEQ + if (err.status === 404) { + return LOWEST_SEQ; + } + throw err; + } } } -var comparisons = { - "undefined": function (targetDoc, sourceDoc) { +const comparisons = { + "undefined": (targetDoc, sourceDoc) => { // This is the previous comparison function - if (collate(targetDoc.last_seq, sourceDoc.last_seq) === 0) { - return sourceDoc.last_seq; - } /* istanbul ignore next */ - return 0; + return collate(targetDoc.last_seq, sourceDoc.last_seq) === 0 ? sourceDoc.last_seq : 0; }, - "1": function (targetDoc, sourceDoc) { + "1": (targetDoc, sourceDoc) => { // This is the comparison function ported from CouchDB return compareReplicationLogs(sourceDoc, targetDoc).last_seq; } @@ -229,10 +232,8 @@ function compareReplicationLogs(srcDoc, tgtDoc) { function compareReplicationHistory(sourceHistory, targetHistory) { // the erlang loop via function arguments is not so easy to repeat in JS // therefore, doing this as recursion - var S = sourceHistory[0]; - var sourceRest = sourceHistory.slice(1); - var T = targetHistory[0]; - var targetRest = targetHistory.slice(1); + const [S, ...sourceRest] = sourceHistory; + const [T, ...targetRest] = targetHistory; if (!S || targetHistory.length === 0) { return { @@ -241,7 +242,7 @@ function compareReplicationHistory(sourceHistory, targetHistory) { }; } - var sourceId = S.session_id; + const sourceId = S.session_id; /* istanbul ignore if */ if (hasSessionId(sourceId, targetHistory)) { return { @@ -250,7 +251,7 @@ function compareReplicationHistory(sourceHistory, targetHistory) { }; } - var targetId = T.session_id; + const targetId = T.session_id; if (hasSessionId(targetId, sourceRest)) { return { last_seq: T.last_seq, @@ -262,18 +263,10 @@ function compareReplicationHistory(sourceHistory, targetHistory) { } function hasSessionId(sessionId, history) { - var props = history[0]; - var rest = history.slice(1); - if (!sessionId || history.length === 0) { return false; } - - if (sessionId === props.session_id) { - return true; - } - - return hasSessionId(sessionId, rest); + return history.some(entry => entry.session_id === sessionId); } function isForbiddenError(err) {