- ×
Realtime processing framework for Node.js
Filed under application tools › frameworksShow AllStraw
Realtime processing framework for Node.js
Version 0.4.1
Straw lets you run a Topology of worker Nodes that consume, process, generate and emit messages.
Use Cases
Use it anywhere you need data processed in real-time.
Straw is ideal for building flux style reactive webapps.
You create processing nodes that pass messages to each other or the outside world.
Straw's approach makes it easy to break your problem down in to small steps and develop iteratively.
Each step in the flow is a separate unix process that Straw manages for you, automatically making use of multiple cores, and simplifying spreading the load across multiple machines.
ASX Energy uses Straw to consume live market data via a FIX feed from the exchange, deal with each different type of message from the market, route them to historical storage, implement a delayed feed from the live one, and stream messages to web clients in real-time over socket.io.
Haystack provides an example of how you might do something similar.
Resources
Mailing List
Introduction
Using Straw you connect together a set of worker Nodes.
Each Node is run in it's own process. Messages are passed in and out of Nodes as JSON.
A simple Topology might look like this
ping --> count --> print
Nodes can have multiple inputs and outputs. Messages can be passed out default output or any number of arbitrarily named outputs.
Messages are queued between Nodes, with each Node processing one message at a time.
Redis is used for message passing but Nodes are shielded from implementation. All you need to write is the processing code, extend a handler for receiving messages and call a method to output a message.
There is nothing preventing a node receiving or sending outside the Topology, e.g. write to a database, fetch or listen for network data.
A library method is provided to inject or receive messages from outside the Topology so you can play nicely with existing infrastructure, for example having data pipe in to an Express server for publishing out via socket.io.
Installing
To use Straw in your Node.JS app:
$ npm install straw
Hacking
To play with or work on Straw:
$ git clone git@github.com:simonswain/straw.git $ cd straw $ npm install
Run the tests (
npm install -g grunt-cli
first):$ npm test
Run some examples:
$ node examples/ping-count-print.js
Usage
By convention you create your Nodes in a folder called
nodes
, make a new empty Topology, add nodes to it, then tell it to start processing.This example has a Node that generates timestamps once a second, with it's output going to another that counts the number of pings.
The inputs and outputs connect the nodes together.
var straw = require('straw'); var topo = straw.create(); topo.add([{ id: 'ping', node: 'ping', output:'ping-out' }, { id: 'count', node: 'count', input: 'ping-out', output:'count-out' },{ id: 'print', node: 'print', input: 'count-out' }], function(){ topo.start(); });
To create a Node, you export a module that overrides Straw's stub methods as required.
All nodes have
initialize
,start
,stop
andprocess
methods, which must execute a callback when done. Replace any of these, and add your own private methods if required.Ping is an example of a node that only generates output. Nodes can either consume input, produce output, or both.
var straw = require('straw') module.exports = straw.node({ timer: null, opts: {interval: 1000}, initialize: function(opts, done){ this.opts.interval = opts && opts.interval || 1000; done(); }, start: function(done) { this.timer = setInterval(this.ping.bind(this), this.opts.interval); done(false); }, stop: function(done) { clearInterval(this.timer); done(false); }, ping: function() { this.output({'ping': new Date().getTime()}); } });
#process()
is called every time a message received at the Node's input. It's your handler for inbound messages. For any interesting work you will most probably have to do something here.Your code needs to call
#output()
whenever you have a message to send out from the node, and must excute thedone
callback when finished.Nodes process messages sequentially.
done
lets Straw know you're ready for the next message.Here, the done callback is being passed to
#output
and will be executed once the message has been successfully sent out of the node.var straw = require('straw'); module.exports = straw.node({ total: 0, process: function(msg, done) { this.total ++; this.output({count: this.total}, done); } });
Calling
console.log
from within a node will output timestamped messages to Straw's loggeer showing you which Node they came from.Run the topology like this:
$ node examples/ping-count-print.js info: 2014-05-14 16:25:38 ADD ping info: 2014-05-14 16:25:39 INIT ping 15435 info: 2014-05-14 16:25:39 ADD count info: 2014-05-14 16:25:39 MESSAGE ping INITIALIZED info: 2014-05-14 16:25:39 INIT count 15437 info: 2014-05-14 16:25:39 ADD print info: 2014-05-14 16:25:39 MESSAGE count INITIALIZED info: 2014-05-14 16:25:39 INIT print 15439 info: 2014-05-14 16:25:39 MESSAGE print INITIALIZED info: 2014-05-14 16:25:39 PIPE ping-out 0 info: 2014-05-14 16:25:39 PIPE count-out 0 info: 2014-05-14 16:25:39 TOPOLOGY PURGED info: 2014-05-14 16:25:39 MESSAGE ping STARTING info: 2014-05-14 16:25:39 STARTED ping info: 2014-05-14 16:25:39 MESSAGE ping STARTED info: 2014-05-14 16:25:39 MESSAGE print STARTING info: 2014-05-14 16:25:39 STARTED print info: 2014-05-14 16:25:39 MESSAGE print STARTED info: 2014-05-14 16:25:39 MESSAGE count STARTING info: 2014-05-14 16:25:39 STARTED count info: 2014-05-14 16:25:39 TOPOLOGY STARTED info: 2014-05-14 16:25:39 MESSAGE count STARTED info: 2014-05-14 16:25:40 STDOUT print {"count":1} info: 2014-05-14 16:25:41 STDOUT print {"count":2} info: 2014-05-14 16:25:42 STDOUT print {"count":3}
Press
^C
to stop.Live reload
If you make any changes to a node file it's process will be terminated, re-initialized, and if it was running, restarted. This is really handy in development. Try running the ping-count-print example, edit
examples/nodes/print/index.js
(just add a space somewhere) then save it. You will see output in the log letting you know it's been stopped and restarted.Topology
A Topology is a collection of Nodes. In Straw, you create a Topology, add nodes to it, indicating named pipes that connect the nodes together.
Once you've made a Topology you can start or stop it processing, get runtime stats from it and destroy it when done.
var straw = require('straw'); var topo = straw.create();
You can pass options in to your topology if you need to tell it where Redis is, or to define a
nodes_dir
.Logging can be silenced via the
logging
option. By default it's enabled.var opts = { nodes_dir: __dirname + '/nodes', redis: { host: '127.0.0.1', port: 6379, prefix: 'straw-example' }, logging: {silent: true} }; var topo = straw.create(opts);
redis.prefix
will be prepended to all Redis keys used by that Topology. This is useful for partitioning Topologies on the same server. It is also passed in to the nodes so they can use it.Topology Methods
#add(node)
adds a node.#start()
will start your topology processing. Passive nodes (that just receive inputs) will start checking their inbound pipes for messages. The#start
method on each node will be called to initiate any active processing.#stop()
will call the#stop()
method on all nodes, and stop messages being consumed once the current message on each node is finished.#purge
clears all queued messages from the topology's pipes.#stats(callback(err,data){})
will provide real-time stats on the nodes and pipes in the topology. For nodes, the in and out message counts are given. For pipes, the number of messages currently queued. Seeexamples/stats
.#inspect()
returns the current structure of the topology.#destroy(callback)
stops the topology and destroys all it's nodes.Nodes
You add Nodes to your Topology like so:
topo.add({ 'id: '<unique-name-for-node>', 'node': '<file node is in>', 'input':'in-pipe-name', 'output':'out-pipe-name', 'outputs': { 'name': 'pipe', 'another: ['another-pipe-1', 'another-pipe-2'] } }, callback);
You can add multiple nodes by providing an array of objects instead of a single object.
To specify the location of a node relative to your topology code, use
__dirname + '/where/is/my/node.js'
.Normally you will want to store your node files in a folder called '
nodes'
in the same location as the code that is using themIt's fine making a folder called nodes and referencing each node's file directly. Be sure to use an absolute path, e.g.
__dirname + './path/to/nodes/some-node.js'
in your Topology definition, as nodes are run in their own process and will have no concept of the directory your topology exists in.As a convenience, if you pass in options to your Topolology containing
nodes_dir
: __dirname + '/path/to/nodes'` you can identify your Nodes by their filename (without an extension) and Straw will take care of finding the files for you. The demos in the examples folder do it this way.input
andoutput
can either be the key of a single pipe, or an array of pipe keys. This lets you aggregate input and branch output. If the output field is an array, the same message will be sent to each of them.// single input: 'some-pipe' output: 'that-pipe' // multiple input: ['this-pipe','that-pipe'] output: ['this-pipe','that-pipe']
You can provide multiple named outputs from a node. This lets you call
#output(<name>, message, callback)
to send a message to a specific output. Use this when you need to do routing based on the message content.Named outputs are specified as key-value pairs. The key is the name of the output. The value can be a string (single pipe) or array (multiple destinations for the same output).
// named outputs outputs: { 'futures':'futures', 'options':['options-1','options-2'] }
input
,output
andoutputs
are optional. If your node doesn't need input or output then you don't need them.STDOUT
from the node (e.g.console.log
) will be captured to the Topology and logged.Any other fields you add will be passed in to the Node as options for it to use as it sees fit.
The callback will be executed once all the nodes you provided have been added and initialized. From there, the topology is ready to start.
topology#start
will tell all the nodes to start processing - they will begin to pull methods off their pipes and process them. Thestart
method on any Nodes that have one will be called.topology#stop
will let nodes finish processing their current message, and call thestop
method on any Nodes that have one.Use the
start
andstop
methods to create nodes that do processing without relying on incoming messages (e.g connecting to a remote service and getting data)topology#destroy
will stop all the nodes and quit the Topology.topology#purge
will flush any unprocessed messages in the pipes.Nodes
These methods can/must be overridden depending on the required functionality of your node;
The
done
callbacks are required.In the
#initialize
method, you must calldone()
when you're finished.var straw = require('straw'); var Node = straw.node({ initialize: function(opts, done) { // process incoming options from the topology definition, set up // anything you need (e.g. database connection, initial data) and // when all finished run the done callback. done(); }, process: function(msg, done) { // process an incoming message. msg will be JSON. // this example just passes the message through. normally you // would do some work on it here. // ... // and send it via the default output this.output(msg, done); }, start: function(done) { // start some background processing here e.g. fetch or // generate data, set an interval timer. done(); }, stop: function(done) { // stop background processing. will be called when // pausing processing or terminating node. done(); } });
Pipes
Pipes are implemented using Redis lists -
lpush
andbrpop
.When more than one Node is connected to a given output, only one will receive each message. This lets you easily load-balance output from a node by connecting more than one downstream node to it's output.
When a node finishes processing a message it must call the
done
callback. This signals it's ready for the next message.If you want a message to go to several nodes, create multiple outputs and connect one node to each.
examples/busy-worker.js
andexamples/busy-workers.js
show this in operation.If no purge flag is set or if set to true, pipes are cleared when the Topology is started so un-processed messages from previous runs are not consumed. To retain them across restarts set purge to false.
Tap In/Out
You can connect to a Topology from existing code. These Tap methods behave the same as those you would write inside your nodes.
var tap = new straw.tap({ input: 'from-topology-pipe, output: 'to-topology-pipe, redis: { ... optional redis config ... } }); tap.output(msg); tap..on('message', function() { // ... });
Installing as a service
Once you have your Topology tested and working you'll probably want to install it as a service.
Place this somewhere like
/etc/init/myapp.conf
. The path to your node binary may be different, particularly if you are using nvm.#!upstart description "My Topology" author "simon" start on startup stop on shutdown script export HOME="/path/to/app" echo $$ > /var/run/my-app.pid exec sudo -u sysop /usr/bin/node /path/to/app/myapp.js >> /var/log/app/myapp.log 2>&1 end script
$ sudo service myapp start
Thanks
Straw takes some inspiration from Storm and Max/MSP. It uses some code and concepts from Backbone for the node definitions and event handling.
Release History
- 14/11/2012 0.1.0 Initial release
- 15/11/2012 0.1.1 StatsD support
- 22/11/2012 0.1.2 Round-robin pipes
- 23/01/2013 0.1.3 Taps
- 31/01/2013 0.1.5 Cleaning up callback usage
- 08/04/2013 0.1.6 Added pidsfile support
- 19/07/2013 0.2.0 Removed Pubsub. Enforced callbacks.
- 28/10/2013 0.2.2 Bugfixes
- 23/06/2014 0.2.5 Redis remote host fix
- 25/06/2014 0.3.0 Version 3
- 14/11/2014 0.3.1 Remove Hiredis, added bind on stop (thanks @a-s-o)
- 10/03/2015 0.3.2 Update packages
- 24/07/2016 0.4.1 Update node and deps
License
Copyright (c) 2012-2016 Simon Swain
Licensed under the MIT license.