Skip to content

Commit

Permalink
Merge pull request #2959 from kobaska/add-multi-tenancy
Browse files Browse the repository at this point in the history
Support multi-tenancy in change replication - allow apps to extend Change model with extra properties
  • Loading branch information
bajtos authored Mar 9, 2017
2 parents 011dc1f + 3f4b5ec commit 0448184
Show file tree
Hide file tree
Showing 4 changed files with 293 additions and 16 deletions.
25 changes: 20 additions & 5 deletions common/models/change.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,16 +184,32 @@ module.exports = function(Change) {

cb = cb || utils.createPromiseCallback();

change.currentRevision(function(err, rev) {
const model = this.getModelCtor();
const id = this.getModelId();

model.findById(id, function(err, inst) {
if (err) return cb(err);

if (inst) {
inst.fillCustomChangeProperties(change, function() {
const rev = Change.revisionForInst(inst);
prepareAndDoRectify(rev);
});
} else {
prepareAndDoRectify(null);
}
});

return cb.promise;

function prepareAndDoRectify(rev) {
// avoid setting rev and prev to the same value
if (currentRev === rev) {
change.debug('rev and prev are equal (not updating anything)');
return cb(null, change);
}

// FIXME(@bajtos) Allo callers to pass in the checkpoint value
// FIXME(@bajtos) Allow callers to pass in the checkpoint value
// (or even better - a memoized async function to get the cp value)
// That will enable `rectifyAll` to cache the checkpoint value
change.constructor.getCheckpointModel().current(
Expand All @@ -202,8 +218,7 @@ module.exports = function(Change) {
doRectify(checkpoint, rev);
}
);
});
return cb.promise;
}

function doRectify(checkpoint, rev) {
if (rev) {
Expand All @@ -228,7 +243,7 @@ module.exports = function(Change) {
if (currentRev) {
change.prev = currentRev;
} else if (!change.prev) {
change.debug('ERROR - could not determing prev');
change.debug('ERROR - could not determine prev');
change.prev = Change.UNKNOWN;
}
change.debug('updated prev');
Expand Down
86 changes: 78 additions & 8 deletions lib/persisted-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -1052,17 +1052,15 @@ module.exports = function(registry) {
var idName = this.dataSource.idName(this.modelName);
var Change = this.getChangeModel();
var model = this;
const changeFilter = this.createChangeFilter(since, filter);

filter = filter || {};
filter.fields = {};
filter.where = filter.where || {};
filter.fields[idName] = true;

// TODO(ritch) this whole thing could be optimized a bit more
Change.find({where: {
checkpoint: {gte: since},
modelName: this.modelName,
}}, function(err, changes) {
Change.find(changeFilter, function(err, changes) {
if (err) return callback(err);
if (!Array.isArray(changes) || changes.length === 0) return callback(null, []);
var ids = changes.map(function(change) {
Expand Down Expand Up @@ -1759,11 +1757,12 @@ module.exports = function(registry) {
assert(BaseChangeModel,
'Change model must be defined before enabling change replication');

const additionalChangeModelProperties =
this.settings.additionalChangeModelProperties || {};

this.Change = BaseChangeModel.extend(this.modelName + '-change',
{},
{
trackModel: this,
}
additionalChangeModelProperties,
{trackModel: this}
);

if (this.dataSource) {
Expand Down Expand Up @@ -1928,6 +1927,77 @@ module.exports = function(registry) {
}
};

/**
* Get the filter for searching related changes.
*
* Models should override this function to copy properties
* from the model instance filter into the change search filter.
*
* ```js
* module.exports = (TargetModel, config) => {
* TargetModel.createChangeFilter = function(since, modelFilter) {
* const filter = this.base.createChangeFilter.apply(this, arguments);
* if (modelFilter && modelFilter.where && modelFilter.where.tenantId) {
* filter.where.tenantId = modelFilter.where.tenantId;
* }
* return filter;
* };
* };
* ```
*
* @param {Number} since Return only changes since this checkpoint.
* @param {Object} modelFilter Filter describing which model instances to
* include in the list of changes.
* @returns {Object} The filter object to pass to `Change.find()`. Default:
* ```
* {where: {checkpoint: {gte: since}, modelName: this.modelName}}
* ```
*/
PersistedModel.createChangeFilter = function(since, modelFilter) {
return {
where: {
checkpoint: {gte: since},
modelName: this.modelName,
},
};
};

/**
* Add custom data to the Change instance.
*
* Models should override this function to duplicate model instance properties
* to the Change instance properties, typically to allow the changes() method
* to filter the changes using these duplicated properties directly while
* querying the Change model.
*
* ```js
* module.exports = (TargetModel, config) => {
* TargetModel.prototype.fillCustomChangeProperties = function(change, cb) {
* var inst = this;
* const base = this.constructor.base;
* base.prototype.fillCustomChangeProperties.call(this, change, err => {
* if (err) return cb(err);
*
* if (inst && inst.tenantId) {
* change.tenantId = inst.tenantId;
* } else {
* change.tenantId = null;
* }
*
* cb();
* });
* };
* };
* ```
*
* @callback {Function} callback
* @param {Error} err Error object; see [Error object](http://loopback.io/doc/en/lb3/Error-object.html).
*/
PersistedModel.prototype.fillCustomChangeProperties = function(change, cb) {
// no-op by default
cb();
};

PersistedModel.setup();

return PersistedModel;
Expand Down
70 changes: 67 additions & 3 deletions test/change.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ var async = require('async');
var expect = require('./helpers/expect');
var loopback = require('../');

var Change, TestModel;

describe('Change', function() {
let Change, TestModel;

beforeEach(function() {
var memory = loopback.createDataSource({
connector: loopback.Memory,
Expand Down Expand Up @@ -321,7 +321,6 @@ describe('Change', function() {
change.rectify()
.then(function(ch) {
assert.equal(ch.rev, test.revisionForModel);

done();
})
.catch(done);
Expand Down Expand Up @@ -609,3 +608,68 @@ describe('Change', function() {
});
});
});

describe('Change with with custom properties', function() {
let Change, TestModel;

beforeEach(function() {
let memory = loopback.createDataSource({
connector: loopback.Memory,
});

TestModel = loopback.PersistedModel.extend('ChangeTestModelWithTenant',
{
id: {id: true, type: 'string', defaultFn: 'guid'},
tenantId: 'string',
},
{
trackChanges: true,
additionalChangeModelProperties: {tenantId: 'string'},
});
this.modelName = TestModel.modelName;

TestModel.prototype.fillCustomChangeProperties = function(change, cb) {
var inst = this;

if (inst && inst.tenantId) {
change.tenantId = inst.tenantId;
} else {
change.tenantId = null;
}

cb();
};

TestModel.attachTo(memory);
TestModel._defineChangeModel();
Change = TestModel.getChangeModel();
});

describe('change.rectify', function() {
const TENANT_ID = '123';
let change;

beforeEach(givenChangeInstance);

it('stores the custom property in the Change instance', function() {
return change.rectify().then(function(ch) {
expect(ch.toObject()).to.have.property('tenantId', TENANT_ID);
});
});

function givenChangeInstance() {
const data = {
foo: 'bar',
tenantId: TENANT_ID,
};

return TestModel.create(data)
.then(function(model) {
const modelName = TestModel.modelName;
return Change.findOrCreateChange(modelName, model.id);
}).then(function(ch) {
change = ch;
});
}
});
});
128 changes: 128 additions & 0 deletions test/replication.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1905,3 +1905,131 @@ describe('Replication / Change APIs', function() {
});
}
});

describe('Replication / Change APIs with custom change properties', function() {
this.timeout(10000);
var dataSource, useSinceFilter, SourceModel, TargetModel, startingCheckpoint;
var tid = 0; // per-test unique id used e.g. to build unique model names

beforeEach(function() {
tid++;
useSinceFilter = false;
var test = this;

dataSource = this.dataSource = loopback.createDataSource({
connector: loopback.Memory,
});
SourceModel = this.SourceModel = PersistedModel.extend(
'SourceModelWithCustomChangeProperties-' + tid,
{
id: {id: true, type: String, defaultFn: 'guid'},
customProperty: {type: 'string'},
},
{
trackChanges: true,
additionalChangeModelProperties: {customProperty: {type: 'string'}},
});

SourceModel.createChangeFilter = function(since, modelFilter) {
const filter = this.base.createChangeFilter.apply(this, arguments);
if (modelFilter && modelFilter.where && modelFilter.where.customProperty)
filter.where.customProperty = modelFilter.where.customProperty;
return filter;
};

SourceModel.prototype.fillCustomChangeProperties = function(change, cb) {
const customProperty = this.customProperty;
const base = this.constructor.base;
base.prototype.fillCustomChangeProperties.call(this, change, err => {
if (err) return cb(err);
change.customProperty = customProperty;
cb();
});
};

SourceModel.attachTo(dataSource);

TargetModel = this.TargetModel = PersistedModel.extend(
'TargetModelWithCustomChangeProperties-' + tid,
{
id: {id: true, type: String, defaultFn: 'guid'},
customProperty: {type: 'string'},
},
{
trackChanges: true,
additionalChangeModelProperties: {customProperty: {type: 'string'}},
});

var ChangeModelForTarget = TargetModel.Change;
ChangeModelForTarget.Checkpoint = loopback.Checkpoint.extend('TargetCheckpoint');
ChangeModelForTarget.Checkpoint.attachTo(dataSource);

TargetModel.attachTo(dataSource);

startingCheckpoint = -1;
});

describe('Model._defineChangeModel()', function() {
it('defines change model with custom properties', function() {
var changeModel = SourceModel.getChangeModel();
var changeModelProperties = changeModel.definition.properties;

expect(changeModelProperties).to.have.property('customProperty');
});
});

describe('Model.changes(since, filter, callback)', function() {
beforeEach(givenSomeSourceModelInstances);

it('queries changes using customized filter', function(done) {
var filterUsed = mockChangeFind(this.SourceModel);

SourceModel.changes(
startingCheckpoint,
{where: {customProperty: '123'}},
function(err, changes) {
if (err) return done(err);
expect(filterUsed[0]).to.eql({
where: {
checkpoint: {gte: -1},
modelName: SourceModel.modelName,
customProperty: '123',
},
});
done();
});
});

it('query returns the matching changes', function(done) {
SourceModel.changes(
startingCheckpoint,
{where: {customProperty: '123'}},
function(err, changes) {
expect(changes).to.have.length(1);
expect(changes[0]).to.have.property('customProperty', '123');
done();
});
});

function givenSomeSourceModelInstances(done) {
const data = [
{name: 'foo', customProperty: '123'},
{name: 'foo', customPropertyValue: '456'},
];
this.SourceModel.create(data, done);
}
});

function mockChangeFind(Model) {
var filterUsed = [];

Model.getChangeModel().find = function(filter, cb) {
filterUsed.push(filter);
if (cb) {
process.nextTick(cb);
}
};

return filterUsed;
}
});

0 comments on commit 0448184

Please sign in to comment.