// the path to the "working directory"
// can be structured like
// [set 1](https://github.com/adrai/node-cqrs-saga/tree/master/test/integration/fixture)
sagaPath: '/path/to/my/files',
// optional, default is 800
// if using in scaled systems and not guaranteeing that each event for a saga "instance"
// dispatches to the same worker process, this module tries to catch the concurrency issues and
// retries to handle the event after a timeout between 0 and the defined value
retryOnConcurrencyTimeout: 1000,
// optional, default is in-memory
// currently supports: mongodb, redis, azuretable and inmemory
// hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
// mongodb:
sagaStore: {
type: 'mongodb',
host: 'localhost', // optional
port: 27017, // optional
dbName: 'domain', // optional
collectionName: 'sagas', // optional
timeout: 10000 // optional
// authSource: 'authedicationDatabase', // optional
// username: 'technicalDbUser', // optional
// password: 'secret' // optional
// url: 'mongodb://user:pass@host:port/db?opts // optional
},
// or redis:
sagaStore: {
type: 'redis',
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'domain_saga', // optional
timeout: 10000 // optional
// password: 'secret' // optional
},
// optional, default is in-memory
// the revisionguard only works if aggregateId and revision are defined in event definition
// currently supports: mongodb, redis, tingodb, azuretable and inmemory
// hint settings like: [eventstore](https://github.com/adrai/node-eventstore#provide-implementation-for-storage)
revisionGuard: {
queueTimeout: 1000, // optional, | timeout for non-handled events in the internal in-memory queue
queueTimeoutMaxLoops: 3, // optional, | maximal loop count for non-handled event in the internal in-| memory queue
startRevisionNumber: 1, // optional, if defined the denormaizer waits for an event with that revision to be used as first
type: 'redis',
host: 'localhost', // optional
port: 6379, // optional
db: 0, // optional
prefix: 'readmodel_revision', // optional
timeout: 10000 // optional
// password: 'secret' // optional
}
});
// sagaStore
pm.sagaStore.on('connect', function() {
console.log('sagaStore connected');
});
pm.sagaStore.on('disconnect', function() {
console.log('sagaStore disconnected');
});
// revisionGuardStore
pm.revisionGuardStore.on('connect', function() {
console.log('revisionGuardStore connected');
});
pm.revisionGuardStore.on('disconnect', function() {
console.log('revisionGuardStore disconnected');
});
// anything (sagaStore or revisionGuardStore)
pm.on('connect', function() {
console.log('something connected');
});
pm.on('disconnect', function() {
console.log('something disconnected');
});
For example in a cloud environment the disconnect event could be used to kill the process.
The values describes the path to that property in the event message.
pm.defineEvent({
// optional, default is 'name'
name: 'name',
// optional, only makes sense if contexts are defined in the 'domainPath' structure
context: 'context.name',
// optional, only makes sense if aggregates with names are defined in the 'domainPath' structure
aggregate: 'aggregate.name',
// optional, default is 'aggregate.id'
aggregateId: 'aggregate.id',
// optional, default is 'revision'
// will represent the aggregate revision, can be used in next command
revision: 'revision',
// optional
version: 'version',
// optional, if defined theses values will be copied to the command (can be used to transport information like userId, etc..)
meta: 'meta'
});
The values describes the path to that property in the command message.
pm.defineCommand({
// optional, default is 'id'
id: 'id',
// optional, if defined the values of the event will be copied to the command (can be used to transport information like userId, etc..)
meta: 'meta'
});
you can define a synchronous function
pm.idGenerator(function () {
var id = require('node-uuid').v4().toString();
return id;
});
or you can define an asynchronous function
pm.idGenerator(function (callback) {
setTimeout(function () {
var id = require('node-uuid').v4().toString();
callback(null, id);
}, 50);
});
you can define a synchronous function
// pass commands to bus
pm.onCommand(function (cmd) {
bus.emit('command', cmd);
});
or you can define an asynchronous function
// pass commands to bus
pm.onCommand(function (cmd, callback) {
bus.emit('command', cmd, function ack () {
callback();
});
});
pm.onEventMissing(function (info, evt) {
// grab the missing events, depending from info values...
// info.aggregateId
// info.aggregateRevision
// info.aggregate
// info.context
// info.guardRevision
// and call handle...
pm.handle(missingEvent, function (err) {
if (err) { console.log(err); }
});
});
you can get the last guarded event:
pm.getLastEvent(function (err, evt) {
if (event.occurredAt < Date.now()) {
// ...
}
});
pm.init(function (err, warnings) {
// this callback is called when all is ready...
// warnings: if no warnings warnings is null, else it's an array containing errors during require of files
});
// or
pm.init(); // callback is optional
pm.handle({
id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
name: 'orderCreated',
aggregate: {
id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
name: 'order'
},
context: {
name: 'sale'
},
payload: {
totalCosts: 520,
seats: ['4f', '8a']
},
revision: 0,
version: 1,
meta: {
userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
}
}); // callback is optional
or
pm.handle({
id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
name: 'orderCreated',
aggregate: {
id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
name: 'order'
},
context: {
name: 'sale'
},
payload: {
totalCosts: 520,
seats: ['4f', '8a']
},
revision: 0,
version: 1,
meta: {
userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
}
}, function (errs, cmds) {
// this callback is called when event is handled successfully or unsuccessfully
// errs can be of type:
// - null
// - Array of Errors
//
// cmds: same as passed in 'onCommand' function
});
more infos, can be useful if testing
pm.handle({
id: 'b80ade36-dd05-4340-8a8b-846eea6e286f',
name: 'orderCreated',
aggregate: {
id: '3b4d44b0-34fb-4ceb-b212-68fe7a7c2f70',
name: 'order'
},
context: {
name: 'sale'
},
payload: {
totalCosts: 520,
seats: ['4f', '8a']
},
meta: {
userId: 'ccd65819-4da4-4df9-9f24-5b10bf89ef89'
}
}, function (errs, cmds, sagaModels) {
// this callback is called when event is handled successfully or unsuccessfully
// errs: is the same as described before
|
// cmds: same as passed in 'onCommand' function
// cmds: in case of no error or in case of error here is the array of all commands that should be published
|
// sagaModels: represents the saga data after have handled the event
});
pm.init(function (err) {
pm.getInfo();
// ==>
// {
// "sagas": [
// {
// "name": "orderConfirmed",
// "aggregate": "order",
// "context": "sale",
// "version": 0
// },
// {
// "name": "orderCreated",
// "aggregate": "order",
// "context": "sale",
// "version": 0
// },
// {
// "name": "paymentAccepted",
// "aggregate": "payment",
// "context": "sale",
// "version": 2
// },
// {
// "name": "seatsReserved",
// "aggregate": "reservation",
// "context": "sale",
// "version": 0
// }
// ]
// }
});
module.exports = require('cqrs-saga').defineSaga({
// optional, default is file name without extension
name: 'orderCreated',
// optional
aggregate: 'order',
// optional
context: 'sale',
// optional, default 0
version: 1,
// optional, default false
// if true it will check if there is already a saga in the db and only if there is something it will continue...
existing: false,
// optional, will catch the event only if it contains the defined properties
containingProperties: ['aggregate.id', 'payload.totalCosts', 'payload.seats'],
// optional, if not defined it will pass the whole event...
payload: 'payload',
// optional, if not defined it will generate a new id
// it will try to load the saga from the db by this id
id: 'aggregate.id',
// optional, default Infinity, all sagas will be sorted by this value
priority: 1
}, function (evt, saga, callback) {
saga.set('orderId', evt.aggregate.id);
saga.set('totalCosts', evt.payload.totalCosts);
// or
// saga.set({ orderId: evt.aggregate.id, totalCosts: evt.payload.totalCosts });
var cmd = {
// if you don't pass an id it will generate a new one
id: 'my own command id',
name: 'makeReservation',
aggregate: {
name: 'reservation'
},
context: {
name: 'sale'
},
payload: {
transactionId: saga.id,
seats: saga.has('seats') ? saga.get('seats') : evt.payload.seats
},
// to transport meta infos (like userId)...
// if not defined, it will use the meta value of the event
meta: evt.meta
};
|
saga.addCommandToSend(cmd);
|
// optionally define a timeout
// this can be useful if you have an other process that will fetch timeouted sagas
var tomorrow = new Date();
tomorrow.setDate((new Date()).getDate() + 1);
var timeoutCmd = {
// if you don't pass an id it will generate a new one
id: 'my own command id',
name: 'cancelOrder',
aggregate: {
name: 'order',
id: evt.aggregate.id
},
context: {
name: 'sale'
},
payload: {
transactionId: saga.id
},
// to transport meta infos (like userId)...
// if not defined, it will use the meta value of the event
meta: evt.meta
};
saga.defineTimeout(tomorrow, [timeoutCmd]);
// or
// saga.defineTimeout(tomorrow, timeoutCmd);
// or
// saga.defineTimeout(tomorrow);
saga.commit(callback);
});
// optional define a function to that returns an id that will be used as saga id
//.useAsId(function (evt) {
// return 'newId';
//});
// or
//.useAsId(function (evt, callback) {
// callback(null, 'newId');
//});
//
// optional define a function that checks if an event should be handled
//.defineShouldHandle(function (evt, saga) {
// return true;
//});
// or
//.defineShouldHandle(function (evt, saga, callback) {
// callback(null, true');
//});
Use this function to get all timeouted sagas.
pm.getTimeoutedSagas(function (err, sagas) {
if (err) { return console.log('ohh!'); }
sagas.forEach(function (saga) {
// saga.id...
// saga.getTimeoutAt();
var cmds = saga.getTimeoutCommands();
cmds.forEach(function (cmd) {
saga.addCommandToSend(cmd);
});
saga.commit(function (err) {
// if you have registered the pm.onCommand handler it will be automatically executed,
// if you have not registered the pm.onCommand handler you need to publish and set the command to dispatched on your own!
cmds.forEach(function (cmd) {
// publish cmd...
// msgBus.send(cmd);
// ... and set to dispatched...
pm.setCommandToDispatched(cmd.id, saga.id, function (err) {});
});
});
// or if saga does not clean itself after timouted and/or no commands are defined, then:
// pm.removeSaga(saga || saga.id, function (err) {});
// or
// saga.destroy();
// saga.commit(function (err) {});
});
});
Use this function to get all sagas that are older then the passed date.
pm.getOlderSagas(new Date(2010, 2, 4), function (err, sagas) {
if (err) { return console.log('ohh!'); }
sagas.forEach(function (saga) {
// saga.id...
// saga.getTimeoutAt();
// saga.getTimeoutCommands();
// if saga does not clean itself after timouted and/or no commands are defined, then:
pm.removeSaga(saga || saga.id, function (err) {});
// or
// saga.destroy();
// saga.commit(function (err) {});
});
});
Use getUndispatchedCommands to get all undispatched commands.
Use setCommandToDispatched to mark a command as dispatched. (will remove it from the db)
pm.getUndispatchedCommands(function (err, cmds) {
if (err) { return console.log('ohh!'); }
cmds.forEach(function (cmd) {
// cmd is: { sagaId: 'the id of the saga', commandId: 'the id of the command', commitStamp: 'a date', command: { /* the command */ } }
pm.setCommandToDispatched(cmd.commandId, cmd.sagaId, function (err) {});
});
});