Skip to content
307 changes: 150 additions & 157 deletions packages/node_modules/pouchdb-checkpointer/src/index.js
Original file line number Diff line number Diff line change
@@ -1,76 +1,17 @@
import { explainError } from 'pouchdb-utils';
import { collate } from 'pouchdb-collate';

var CHECKPOINT_VERSION = 1;
var REPLICATOR = "pouchdb";
const CHECKPOINT_VERSION = 1;
const REPLICATOR = "pouchdb";
// This is an arbitrary number to limit the
// amount of replication history we save in the checkpoint.
// If we save too much, the checkpoint docs will become very big,
// if we save fewer, we'll run a greater risk of having to
// read all the changes from 0 when checkpoint PUTs fail
// CouchDB 2.0 has a more involved history pruning,
// but let's go for the simple version for now.
var CHECKPOINT_HISTORY_SIZE = 5;
var LOWEST_SEQ = 0;

function updateCheckpoint(db, id, checkpoint, session, returnValue) {
return db.get(id).catch(function (err) {
if (err.status === 404) {
if (db.adapter === 'http' || db.adapter === 'https') {
explainError(
404, 'PouchDB is just checking if a remote checkpoint exists.'
);
}
return {
session_id: session,
_id: id,
history: [],
replicator: REPLICATOR,
version: CHECKPOINT_VERSION
};
}
throw err;
}).then(function (doc) {
if (returnValue.cancelled) {
return;
}

// if the checkpoint has not changed, do not update
if (doc.last_seq === checkpoint) {
return;
}

// Filter out current entry for this replication
doc.history = (doc.history || []).filter(function (item) {
return item.session_id !== session;
});

// Add the latest checkpoint to history
doc.history.unshift({
last_seq: checkpoint,
session_id: session
});

// Just take the last pieces in history, to
// avoid really big checkpoint docs.
// see comment on history size above
doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE);

doc.version = CHECKPOINT_VERSION;
doc.replicator = REPLICATOR;

doc.session_id = session;
doc.last_seq = checkpoint;

return db.put(doc).catch(function (err) {
if (err.status === 409) {
// retry; someone is trying to write a checkpoint simultaneously
return updateCheckpoint(db, id, checkpoint, session, returnValue);
}
throw err;
});
});
}
const CHECKPOINT_HISTORY_SIZE = 5;
const LOWEST_SEQ = 0;

class CheckpointerInternal {
constructor(src, target, id, returnValue, opts = {
Expand All @@ -92,120 +33,182 @@ class CheckpointerInternal {
}
}

writeCheckpoint(checkpoint, session) {
var self = this;
return this.updateTarget(checkpoint, session).then(function () {
return self.updateSource(checkpoint, session);
});
async writeCheckpoint(checkpoint, session) {
// update target before source every time
// because otherwise, compareHistory will pick a too new seq from source
// after an error writing a checkpoint to the target
await this._updateTarget(checkpoint, session);
return this._updateSource(checkpoint, session);
}

updateTarget(checkpoint, session) {
async _updateTarget(checkpoint, session) {
if (this.opts.writeTargetCheckpoint) {
return updateCheckpoint(this.target, this.id, checkpoint,
return await this._updateCheckpoint(this.target, this.id, checkpoint,
session, this.returnValue);
} else {
return Promise.resolve(true);
}
} return true;
}

updateSource(checkpoint, session) {
async _updateSource(checkpoint, session) {
if (this.opts.writeSourceCheckpoint) {
var self = this;
return updateCheckpoint(this.src, this.id, checkpoint,
session, this.returnValue)
.catch(function (err) {
if (isForbiddenError(err)) {
self.opts.writeSourceCheckpoint = false;
return true;
}
throw err;
});
} else {
return Promise.resolve(true);
try {
return await this._updateCheckpoint(this.src, this.id, checkpoint,
session, this.returnValue);
} catch (err) {
if (isForbiddenError(err)) {
this.opts.writeSourceCheckpoint = false;
return true;
}
throw err;
}
}
}

getCheckpoint() {
var self = this;
async _updateCheckpoint(db, id, checkpoint, session, returnValue) {
//retrieve checkpoint doc from db or create a new one
const doc = await this._initCheckpointDoc(db, id, session);

if (returnValue.cancelled) {
return;
}
// if the checkpoint has not changed, do not update
if (doc.last_seq === checkpoint) {
return;
}

// Filter out current entry for this replication
doc.history = (doc.history || []).filter(item => item.session_id !== session);

// Add the latest checkpoint to history
doc.history.unshift({
last_seq: checkpoint,
session_id: session
});

// Just take the last pieces in history, to
// avoid really big checkpoint docs.
// see comment on history size above
doc.history = doc.history.slice(0, CHECKPOINT_HISTORY_SIZE);

doc.version = CHECKPOINT_VERSION;
doc.replicator = REPLICATOR;

if (!self.opts.writeSourceCheckpoint && !self.opts.writeTargetCheckpoint) {
return Promise.resolve(LOWEST_SEQ);
doc.session_id = session;
doc.last_seq = checkpoint;

try {
return await db.put(doc);
} catch (err) {
if (err.status === 409) {
// retry; someone is trying to write a checkpoint simultaneously
return this._updateCheckpoint(db, id, checkpoint, session, returnValue);
}
throw err;
}
}

async _initCheckpointDoc(db, id, session) {
try {
return await db.get(id);
} catch (err) {
if (err.status === 404) {
if (db.adapter === 'http' || db.adapter === 'https') {
explainError(
404, 'PouchDB is just checking if a remote checkpoint exists.'
);
}
return {
session_id: session,
_id: id,
history: [],
replicator: REPLICATOR,
version: CHECKPOINT_VERSION
};
} else {
throw err;
}
}
}

if (self.opts && self.opts.writeSourceCheckpoint && !self.opts.writeTargetCheckpoint) {
return self.src.get(self.id).then(function (sourceDoc) {
async getCheckpoint() {
if (!this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
return LOWEST_SEQ;
}

if (this.opts && this.opts.writeSourceCheckpoint && !this.opts.writeTargetCheckpoint) {
try {
const sourceDoc = await this.src.get(this.id);
return sourceDoc.last_seq || LOWEST_SEQ;
}).catch(function (err) {
} catch (err) {
/* istanbul ignore if */
if (err.status !== 404) {
throw err;
}
return LOWEST_SEQ;
});
}

return self.target.get(self.id).then(function (targetDoc) {
if (self.opts && self.opts.writeTargetCheckpoint && !self.opts.writeSourceCheckpoint) {
return targetDoc.last_seq || LOWEST_SEQ;
}
}

return self.src.get(self.id).then(function (sourceDoc) {
// Since we can't migrate an old version doc to a new one
// (no session id), we just go with the lowest seq in this case
/* istanbul ignore if */
if (targetDoc.version !== sourceDoc.version) {
let targetDoc;
try {
targetDoc = await this.target.get(this.id);
} catch (err) {
if (err.status !== 404) {
throw err;
}
return LOWEST_SEQ;
}
}

var version;
if (targetDoc.version) {
version = targetDoc.version.toString();
} else {
version = "undefined";
}
if (this.opts && this.opts.writeTargetCheckpoint && !this.opts.writeSourceCheckpoint) {
return targetDoc.last_seq || LOWEST_SEQ;
}

if (version in comparisons) {
return comparisons[version](targetDoc, sourceDoc);
}
/* istanbul ignore next */
try {
const sourceDoc = await this.src.get(this.id);
// Since we can't migrate an old version doc to a new one
// (no session id), we just go with the lowest seq in this case
/* istanbul ignore if */
if (targetDoc.version !== sourceDoc.version) {
return LOWEST_SEQ;
}, function (err) {
if (err.status === 404 && targetDoc.last_seq) {
return self.src.put({
_id: self.id,
}
const version = targetDoc.version ? targetDoc.version.toString() : "undefined";

if (version in comparisons) {
return comparisons[version](targetDoc, sourceDoc);
}
/* istanbul ignore next */
return LOWEST_SEQ;
} catch (err) {
if (err.status === 404 && targetDoc.last_seq) {
try {
await this.src.put({
_id: this.id,
last_seq: LOWEST_SEQ
}).then(function () {
return LOWEST_SEQ;
}, function (err) {
if (isForbiddenError(err)) {
self.opts.writeSourceCheckpoint = false;
return targetDoc.last_seq;
}
/* istanbul ignore next */
return LOWEST_SEQ;
});
return LOWEST_SEQ;
} catch (err) {
if (isForbiddenError(err)) {
this.opts.writeSourceCheckpoint = false;
return targetDoc.last_seq;
}
/* istanbul ignore next */
return LOWEST_SEQ;
}
throw err;
});
}).catch(function (err) {
if (err.status !== 404) {
throw err;
}
return LOWEST_SEQ;
});
//missing sourceDoc on initial replication returns LOWEST_SEQ
if (err.status === 404) {
return LOWEST_SEQ;
}
throw err;
}
}
}

var comparisons = {
"undefined": function (targetDoc, sourceDoc) {
const comparisons = {
"undefined": (targetDoc, sourceDoc) => {
// This is the previous comparison function
if (collate(targetDoc.last_seq, sourceDoc.last_seq) === 0) {
return sourceDoc.last_seq;
}
/* istanbul ignore next */
return 0;
return collate(targetDoc.last_seq, sourceDoc.last_seq) === 0 ? sourceDoc.last_seq : 0;
},
"1": function (targetDoc, sourceDoc) {
"1": (targetDoc, sourceDoc) => {
// This is the comparison function ported from CouchDB
return compareReplicationLogs(sourceDoc, targetDoc).last_seq;
}
Expand All @@ -229,10 +232,8 @@ function compareReplicationLogs(srcDoc, tgtDoc) {
function compareReplicationHistory(sourceHistory, targetHistory) {
// the erlang loop via function arguments is not so easy to repeat in JS
// therefore, doing this as recursion
var S = sourceHistory[0];
var sourceRest = sourceHistory.slice(1);
var T = targetHistory[0];
var targetRest = targetHistory.slice(1);
const [S, ...sourceRest] = sourceHistory;
const [T, ...targetRest] = targetHistory;

if (!S || targetHistory.length === 0) {
return {
Expand All @@ -241,7 +242,7 @@ function compareReplicationHistory(sourceHistory, targetHistory) {
};
}

var sourceId = S.session_id;
const sourceId = S.session_id;
/* istanbul ignore if */
if (hasSessionId(sourceId, targetHistory)) {
return {
Expand All @@ -250,7 +251,7 @@ function compareReplicationHistory(sourceHistory, targetHistory) {
};
}

var targetId = T.session_id;
const targetId = T.session_id;
if (hasSessionId(targetId, sourceRest)) {
return {
last_seq: T.last_seq,
Expand All @@ -262,18 +263,10 @@ function compareReplicationHistory(sourceHistory, targetHistory) {
}

function hasSessionId(sessionId, history) {
var props = history[0];
var rest = history.slice(1);

if (!sessionId || history.length === 0) {
return false;
}

if (sessionId === props.session_id) {
return true;
}

return hasSessionId(sessionId, rest);
return history.some(entry => entry.session_id === sessionId);
}

function isForbiddenError(err) {
Expand Down