ChangeMachine¶

ChangeMachine was written to handle the CouchDB change processing required in Steelmesh. It is responsible for monitoring and responding to changes in a number of couchdb instances and taking appropriate actions in response to those changes.
The implementation of ChangeMachine is reasonably simple thanks to the flatiron neuron queueing library and through leveraging changemate notifiers.
Contents¶
Getting Started¶
ChangeMachine is designed to work in similar fashion to manufacturing plant, whereby a plant is made up of many machines that perform a single function, and items (or materials) enter the machine, get processed and leave the machine.
In most cases, a machine processes an item successfully:
machine.on('process', function(item) {
// do something with the item
// mark the item as done
item.done();
});
But sometimes things can go wrong:
machine.on('process', function(item) {
try {
// do something with the item
// mark the item as done
item.done();
}
catch (e) {
// mark the item as failed
item.fail({ error: e });
}
});
Additionally, sometimes you might receive an error when something is done but not want to have to call a separate fail function. You can do this by passing an error in the options for any of the done, fail or skip methods and the status will be remapped to fail:
machine.on('process', function(item) {
// write the item data to a file
writeItemData(item, function(err) {
item.done({ error: err });
});
});
Regardless of how you flag that an item has failed, you probably want to know about it:
machine.on('fail', function(item, err) {
console.log('got a failed item: ' + item.id);
});
Use Cases¶
To be completed
ChangeMachine Examples¶
This section of the documentation comments on the examples that are included with the ChangeMachine source.
Simple Example using Changemate Notifier¶
This simple example demonstrates responding to changes from an external couchdb:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | var cm = require('../'),
machine = new cm.Machine('<:couch:> http://sidelab.iriscouch.com/seattle_neighbourhood', {
concurrency: 25 // override neurons default concurrency of 50
});
machine.on('enter', function(item) {
console.log(' entered: ' + item.id, machine.stats());
});
// perform actions for each of the
machine.on('process', function(item) {
console.log('processing: ' + item.id, machine.stats());
setTimeout(function() {
item.done();
console.log(' done: ' + item.id, machine.stats());
}, Math.random() * 5000);
});
|
If it is all working nicely, you should see output similar to simple.output.txt.
Simple Example demonstrating ready queue¶
In the example above, items existed in either the waiting
or processing
queues. This because the machine had a process
event that could be used to process the items as they enter the machine. In a case where a process
event handler is not defined, however, the items ready for processing (according to neuron’s concurrency setting) will be placed in the ready
queue.
Let’s modify the previous example to wire up the process event after 5 seconds:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | var cm = require('../'),
machine = new cm.Machine('<:couch:> http://sidelab.iriscouch.com/seattle_neighbourhood', {
concurrency: 25 // override neurons default concurrency of 50
});
machine.on('enter', function(item) {
console.log(' entered: ' + item.id, machine.stats());
});
setTimeout(function() {
// perform actions for each of the
machine.on('process', function(item) {
console.log('processing: ' + item.id, machine.stats());
setTimeout(function() {
item.done();
console.log(' done: ' + item.id, machine.stats());
}, Math.random() * 5000);
});
}, 5000);
|
In the output for this example, you should see that before processing starts a number of items are reported in the ready
queue. Once the process
event is connected however, the items move directly from a waiting
status to processing
.
Checkpointing¶
At this stage ChangeMachine implements very simple checkpointing storage but it works nicely and event attempts to synchronously persist data when the process exit event is detected.
Below is an example that demonstrates how a state store is configured:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | const cm = require('../');
const path = require('path');
const sourceUrl = '<:couch:> http://fluxant.cloudant.com/seattle_neighbourhood'
// create a new changemachine instance that will read updates from a remote db
const machine = new cm.Machine(sourceUrl, {
// as updates are processed, we will keep a checkpoint of whether we are up
// to using the following changemachine storage
storage: new cm.JsonStore({ filename: path.resolve(__dirname, 'checkpoint.json') })
});
let counter = 0;
// perform actions for each of the
machine.on('process', function(item) {
console.log('processing item sequence: ' + item.seq);
counter++;
item.done();
// if we have processed 10 items, then stop
if (counter >= 10) {
machine.notifier.close();
}
});
|
Serializing Data from CouchDB¶
If you wanted to extract all the JSON data from documents in a couch database (not the attachments though - although it could be combined with attachmate to achieve that) the following example is probably of interest:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | var cm = require('changemachine'),
fs = require('fs'),
path = require('path'),
machine = new cm.Machine('<:couch:> http://sidelab.iriscouch.com/seattle_neighbourhood', {
include_docs: true,
concurrency: 10 // override neurons default concurrency of 50
}),
dataPath = path.resolve(__dirname, 'data');
// make the data directory
fs.mkdir(dataPath);
// perform actions for each of the
console.log('waiting for change information');
machine.on('process', function(item) {
try {
var text = JSON.stringify(item.doc);
fs.writeFile(path.join(dataPath, item.id + '.json'), text, 'utf8', function(err) {
if (err) {
item.fail(err);
}
else {
item.done();
}
console.log('wrote ' + item.id + '.json', machine.stats());
});
}
catch (e) {
console.log('failed writing: ' + item.id, e);
item.fail(e);
}
});
|
Non Notifier Change Machines¶
While ChangeMachine is designed to work in conjuction with changemate, it is possible to create items and process them manually also.
Example to be completed