From 5d35ef18c245dc405fa7662b463e64b7fdf75846 Mon Sep 17 00:00:00 2001 From: David Luecke Date: Sun, 6 Apr 2014 10:54:44 -0600 Subject: [PATCH] Implementing event filtering and default params for Primus. Extracted common Socket functionality. --- lib/providers/index.js | 4 +- lib/providers/socket/commons.js | 50 ++++++++++ lib/providers/{ => socket}/primus.js | 47 ++++----- lib/providers/socket/socketio.js | 63 ++++++++++++ lib/providers/socketio.js | 100 ------------------- readme.md | 17 ++++ test/providers/primus.test.js | 143 +++++++++++++++++++++++++-- 7 files changed, 292 insertions(+), 132 deletions(-) create mode 100644 lib/providers/socket/commons.js rename lib/providers/{ => socket}/primus.js (55%) create mode 100644 lib/providers/socket/socketio.js delete mode 100644 lib/providers/socketio.js diff --git a/lib/providers/index.js b/lib/providers/index.js index 8b8e267ac7..e46f18ef89 100644 --- a/lib/providers/index.js +++ b/lib/providers/index.js @@ -2,6 +2,6 @@ module.exports = { rest: require('./rest'), - socketio: require('./socketio'), - primus: require('./primus') + socketio: require('./socket/socketio'), + primus: require('./socket/primus') }; diff --git a/lib/providers/socket/commons.js b/lib/providers/socket/commons.js new file mode 100644 index 0000000000..40711e804d --- /dev/null +++ b/lib/providers/socket/commons.js @@ -0,0 +1,50 @@ +var _ = require('lodash'); + +// The position of the params parameters for a service method so that we can extend them +// default is 1 +var paramsPositions = { + find: 0, + update: 2, + patch: 2 +}; + +// Set up the service method handlers for a service and socket. +exports.setupMethodHandler = function setupMethodHandler (emitter, params, service, path, method) { + var name = path + '::' + method; + var position = typeof paramsPositions[method] !== 'undefined' ? paramsPositions[method] : 1; + + if (typeof service[method] === 'function') { + emitter.on(name, function () { + var args = _.toArray(arguments); + args[position] = _.extend({}, args[position], params); + service[method].apply(service, args); + }); + } +}; + +// Set up event handlers for a given service and connected sockets. +// Send it through the service dispatching mechanism (`removed(data, params, callback)`, +// `updated(data, params, callback)` and `created(data, params, callback)`) if it +// exists. +exports.setupEventHandler = function setupEventHandler (info, service, path, ev) { + var defaultDispatcher = function (data, params, callback) { + callback(null, data); + }; + + service.on(ev, function (data) { + // Check if there is a method on the service with the same name as the event + var dispatcher = typeof service[ev] === 'function' ? service[ev] : defaultDispatcher; + var eventName = path + ' ' + ev; + + info.emitters().forEach(function (emitter) { + dispatcher(data, info.params(emitter), function (error, dispatchData) { + if (error) { + emitter[info.method]('error', error); + } else if (dispatchData) { + emitter[info.method](eventName, dispatchData); + } + }); + }); + }); +}; + diff --git a/lib/providers/primus.js b/lib/providers/socket/primus.js similarity index 55% rename from lib/providers/primus.js rename to lib/providers/socket/primus.js index 04aeb19172..8aa2463401 100644 --- a/lib/providers/primus.js +++ b/lib/providers/socket/primus.js @@ -4,11 +4,11 @@ var _ = require('lodash'); var Proto = require('uberproto'); var Primus = require('primus'); var Emitter = require('primus-emitter'); +var commons = require('./commons'); module.exports = function(config, configurer) { return function() { var app = this; - var services = {}; app.enable('feathers primus'); @@ -23,30 +23,35 @@ module.exports = function(config, configurer) { } var primus = this.primus = new Primus(server, config); + var info = { + emitters: function() { + return primus; + }, + params: function(spark) { + return spark.request.feathers; + }, + method: 'send' + }; + primus.use('emitter', Emitter); - _.each(services, function(service, path) { - // If the service emits events that we want to listen to (Event mixin) - if (typeof service.on === 'function' && service._serviceEvents) { - _.each(service._serviceEvents, function(ev) { - service.on(ev, function(data) { - primus.forEach(function (spark) { - spark.send(path + ' ' + ev, data); - }); - }); + // For a new connection, set up the service method handlers + primus.on('connection', function (spark) { + _.each(self.services, function (service, path) { + _.each(self.methods, function (method) { + commons.setupMethodHandler(spark, spark.request.feathers, service, path, method); }); - } + }); }); - primus.on('connection', function(spark) { - _.each(services, function(service, path) { - _.each(self.methods, function(method) { - var name = path + '::' + method; - if (service[method]) { - spark.on(name, service[method].bind(service)); - } + // Set up events and event dispatching + _.each(self.services, function (service, path) { + // If the service emits events that we want to listen to (Event mixin) + if (typeof service.on === 'function' && service._serviceEvents) { + _.each(service._serviceEvents, function (ev) { + commons.setupEventHandler(info, service, path, ev); }); - }); + } }); if (typeof configurer === 'function') { @@ -56,9 +61,5 @@ module.exports = function(config, configurer) { return result; } }, app); - - app.providers.push(function(path, service) { - services[path] = service; - }); }; }; diff --git a/lib/providers/socket/socketio.js b/lib/providers/socket/socketio.js new file mode 100644 index 0000000000..fe3a9aff67 --- /dev/null +++ b/lib/providers/socket/socketio.js @@ -0,0 +1,63 @@ +'use strict'; + +var _ = require('lodash'); +var socketio = require('socket.io'); +var Proto = require('uberproto'); +var commons = require('./commons'); + +module.exports = function (config) { + return function () { + var app = this; + + app.enable('feathers socketio'); + + // Monkey patch app.setup(server) + Proto.mixin({ + setup: function (server) { + var self = this; + var result = this._super.apply(this, arguments); + + if (this.disabled('feathers socketio')) { + return result; + } + + var io = this.io = socketio.listen(server); + // The info object we can pass to commons.setupEventHandler + var info = { + emitters: function() { + return io.sockets.clients(); + }, + params: function(socket) { + return socket.handshake.feathers; + }, + method: 'emit' + }; + + // For a new connection, set up the service method handlers + io.sockets.on('connection', function (socket) { + _.each(self.services, function (service, path) { + _.each(self.methods, function (method) { + commons.setupMethodHandler(socket, socket.handshake.feathers, service, path, method); + }); + }); + }); + + // Set up events and event dispatching + _.each(self.services, function (service, path) { + // If the service emits events that we want to listen to (Event mixin) + if (typeof service.on === 'function' && service._serviceEvents) { + _.each(service._serviceEvents, function (ev) { + commons.setupEventHandler(info, service, path, ev); + }); + } + }); + + if (typeof config === 'function') { + config.call(this, io); + } + + return result; + } + }, app); + }; +}; diff --git a/lib/providers/socketio.js b/lib/providers/socketio.js deleted file mode 100644 index 6197326bb7..0000000000 --- a/lib/providers/socketio.js +++ /dev/null @@ -1,100 +0,0 @@ -'use strict'; - -var _ = require('lodash'); -var socketio = require('socket.io'); -var Proto = require('uberproto'); - -// The position of the params parameters for a service method so that we can extend them -// default is 1 -var paramsPositions = { - find: 0, - update: 2, - patch: 2 -}; - -// Set up the service method handlers for a service and socket. -function setupMethodHandler (socket, service, path, method) { - var name = path + '::' + method; - var position = typeof paramsPositions[method] !== 'undefined' ? paramsPositions[method] : 1; - - if (typeof service[method] === 'function') { - socket.on(name, function () { - var args = _.toArray(arguments); - args[position] = _.extend({}, args[position], socket.handshake.feathers); - service[method].apply(service, args); - }); - } -} - -// Set up event handlers for a given service and connected sockets. -// Send it through the service dispatching mechanism (`removed(data, params, callback)`, -// `updated(data, params, callback)` and `created(data, params, callback)`) if it -// exists. -function setupEventHandler (sockets, service, path, ev) { - var defaultDispatcher = function (data, params, callback) { - callback(null, data); - }; - - service.on(ev, function (data) { - // Check if there is a method on the service with the same name as the event - var dispatcher = typeof service[ev] === 'function' ? service[ev] : defaultDispatcher; - var eventName = path + ' ' + ev; - - sockets.clients().forEach(function (socket) { - dispatcher(data, socket.handshake.feathers, function (error, dispatchData) { - if (error) { - socket.emit('error', error); - } else if (dispatchData) { - socket.emit(eventName, dispatchData); - } - }); - }); - }); -} - -module.exports = function (config) { - return function () { - var app = this; - - app.enable('feathers socketio'); - - // Monkey patch app.setup(server) - Proto.mixin({ - setup: function (server) { - var self = this; - var result = this._super.apply(this, arguments); - - if (this.disabled('feathers socketio')) { - return result; - } - - var io = this.io = socketio.listen(server); - - // For a new connection, set up the service method handlers - io.sockets.on('connection', function (socket) { - _.each(self.services, function (service, path) { - _.each(self.methods, function (method) { - setupMethodHandler(socket, service, path, method); - }); - }); - }); - - // Set up events and event dispatching - _.each(self.services, function (service, path) { - // If the service emits events that we want to listen to (Event mixin) - if (typeof service.on === 'function' && service._serviceEvents) { - _.each(service._serviceEvents, function (ev) { - setupEventHandler(io.sockets, service, path, ev); - }); - } - }); - - if (typeof config === 'function') { - config.call(this, io); - } - - return result; - } - }, app); - }; -}; diff --git a/readme.md b/readme.md index c7f8720eba..d179701d28 100644 --- a/readme.md +++ b/readme.md @@ -135,6 +135,23 @@ In the Browser you can connect like this: ``` +Just like REST and SocketIO, the Primus request object can be extended with a `feathers` parameter during authorization which will extend the `params` for any service request: + +```js +app.configure(feathers.primus({ + transformer: 'sockjs' +}, function(primus) { + // Set up Primus authorization here + primus.authorize(function (req, done) { + req.feathers = { + user: { name: 'David' } + } + + done(); + }); +})); +``` + ## API ### listen diff --git a/test/providers/primus.test.js b/test/providers/primus.test.js index d8603695bd..199c625ed5 100644 --- a/test/providers/primus.test.js +++ b/test/providers/primus.test.js @@ -1,22 +1,35 @@ 'use strict'; +var assert = require('assert'); +var _ = require('lodash'); + var feathers = require('../../lib/feathers'); var fixture = require('./service-fixture'); var todoService = fixture.Service; var verify = fixture.verify; describe('Primus provider', function () { - var server, socket; + var server, socket, app, + socketParams = { + user: { name: 'David' }, + provider: 'socketio' + }; before(function () { - server = feathers() + app = feathers() .configure(feathers.primus({ transformer: 'socket.io' }, function(primus) { socket = new primus.Socket('http://localhost:7888'); + + primus.authorize(function (req, done) { + req.feathers = socketParams; + done(); + }); })) - .use('todo', todoService) - .listen(7888); + .use('todo', todoService); + + server = app.listen(7888); }); after(function (done) { @@ -24,6 +37,47 @@ describe('Primus provider', function () { server.close(done); }); + it('passes handshake as service parameters', function(done) { + var service = app.lookup('todo'); + var old = { + find: service.find, + create: service.create, + update: service.update, + remove: service.remove + }; + + service.find = function(params) { + assert.deepEqual(params, socketParams, 'Handshake parameters passed on proper position'); + old.find.apply(this, arguments); + }; + + service.create = function(data, params) { + assert.deepEqual(params, socketParams, 'Passed handshake parameters'); + old.create.apply(this, arguments); + }; + + service.update = function(id, data, params) { + assert.deepEqual(params, _.extend(socketParams, { + test: 'param' + }), 'Extended handshake paramters with original'); + old.update.apply(this, arguments); + }; + + service.remove = function(id, params) { + assert.equal(params.provider, 'socketio', 'Handshake parameters have priority'); + old.remove.apply(this, arguments); + }; + + socket.send('todo::create', {}, {}, function () { + socket.send('todo::update', 1, {}, { test: 'param' }, function() { + socket.send('todo::remove', 1, { provider: 'something' }, function() { + _.extend(service, old); + done(); + }); + }); + }); + }); + describe('CRUD', function () { it('::find', function (done) { socket.send('todo::find', {}, function (error, data) { @@ -92,7 +146,7 @@ describe('Primus provider', function () { name: 'created event' }; - socket.on('todo created', function (data) { + socket.once('todo created', function (data) { verify.create(original, data); done(); }); @@ -105,7 +159,7 @@ describe('Primus provider', function () { name: 'updated event' }; - socket.on('todo updated', function (data) { + socket.once('todo updated', function (data) { verify.update(10, original, data); done(); }); @@ -113,8 +167,21 @@ describe('Primus provider', function () { socket.send('todo::update', 10, original, {}, function () {}); }); + it('patched', function(done) { + var original = { + name: 'patched event' + }; + + socket.once('todo patched', function (data) { + verify.patch(12, original, data); + done(); + }); + + socket.send('todo::patch', 12, original, {}, function () {}); + }); + it('removed', function (done) { - socket.on('todo removed', function (data) { + socket.once('todo removed', function (data) { verify.remove(333, data); done(); }); @@ -122,4 +189,66 @@ describe('Primus provider', function () { socket.send('todo::remove', 333, {}, function () {}); }); }); + + describe('Event filtering', function() { + it('.created', function (done) { + var service = app.lookup('todo'); + var original = { description: 'created event test' }; + var oldCreated = service.created; + + service.created = function(data, params, callback) { + assert.deepEqual(params, socketParams); + verify.create(original, data); + + callback(null, _.extend({ processed: true }, data)); + }; + + socket.send('todo::create', original, {}, function() {}); + + socket.once('todo created', function (data) { + service.created = oldCreated; + // Make sure Todo got processed + verify.create(_.extend({ processed: true }, original), data); + done(); + }); + }); + + it('.updated', function (done) { + var original = { + name: 'updated event' + }; + + socket.once('todo updated', function (data) { + verify.update(10, original, data); + done(); + }); + + socket.send('todo::update', 10, original, {}, function () {}); + }); + + it('.removed', function (done) { + var service = app.lookup('todo'); + var oldRemoved = service.removed; + + service.removed = function(data, params, callback) { + assert.deepEqual(params, socketParams); + + if(data.id === 23) { + // Only dispatch with given id + return callback(null, data); + } + + callback(); + }; + + socket.send('todo::remove', 1, {}, function() {}); + socket.send('todo::remove', 23, {}, function() {}); + + socket.on('todo removed', function (data) { + service.removed = oldRemoved; + assert.equal(data.id, 23); + done(); + }); + }); + }); });