8000 initial merge with dfoody QOS extension · JavaScriptExpert/kue@9e39ea6 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9e39ea6

Browse files
committed
initial merge with dfoody QOS extension
1 parent 275da9a commit 9e39ea6

File tree

3 files changed

+188
-18
lines changed

3 files changed

+188
-18
lines changed

lib/queue/job.js

Lines changed: 156 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ var EventEmitter = require('events').EventEmitter
1212
, events = require('./events')
1313
, redis = require('../redis')
1414
, reds = require('reds')
15+
, crypto = require('crypto')
1516
, noop = function () {
1617
};
1718

@@ -427,7 +428,25 @@ Job.prototype.attempts = function (n) {
427428
* @api public
428429
*/
429430

430-
Job.prototype.remove = function (fn) {
431+
Job.prototype.remove = function(fn){
432+
var self = this;
433+
var client = self.client;
434+
435+
this.state('removed', function(err)
436+
{
437+
if(err) return (fn || noop)(err);
438+
439+
!exports.disableSearch && getSearch().remove(self.id);
440+
441+
client.del('q:job:' + self.id, noop);
442+
client.del('q:job:' + self.id + ':state', noop);
443+
client.del('q:job:' + self.id + ':log', fn || noop);
444+
});
445+
446+
return this;
447+
};
448+
449+
/*Job.prototype.remove = function (fn) {
431450
var client = this.client;
432451
this.removeState(function (err) {
433452
client.del('q:job:' + this.id + ':log');
@@ -453,7 +472,7 @@ Job.prototype.remove = function (fn) {
453472
* @api public
454473
*/
455474

456-
Job.prototype.removeState = function (fn) {
475+
/*Job.prototype.removeState = function (fn) {
457476
var client = this.client
458477
, state = this._state;
459478
// console.log( "removeState(%d) START ", this.id, state, this._state );
@@ -462,11 +481,11 @@ Job.prototype.removeState = function (fn) {
462481
client.zrem('q:jobs:' + state, this.id);
463482
client.zrem('q:jobs:' + this.type + ':' + state, this.id);
464483
// multi.exec(function (err, replies) {
465-
// console.log( "removeState(%d) END ", this.id, state, this._state/*, replies*/ );
466-
fn && fn(/*err*/);
484+
// console.log( "removeState(%d) END ", this.id, state, this._state*//*, replies*//* );
485+
fn && fn(*//*err*//*);
467486
// }.bind(this));
468487
return this;
469-
};
488+
};*/
470489

471490
/**
472491
* Set state to `state`.
@@ -477,7 +496,7 @@ Job.prototype.removeState = function (fn) {
477496
* @api public
478497
*/
479498

480-
Job.prototype.state = function (state, fn) {
499+
/*Job.prototype.state = function (state, fn) {
481500
var client = this.client;
482501
this.removeState(function () {
483502
this._state = state;
@@ -487,17 +506,142 @@ Job.prototype.state = function (state, fn) {
487506
client.zadd('q:jobs:' + state, this._priority, this.id);
488507
client.zadd('q:jobs:' + this.type + ':' + state, this._priority, this.id);
489508
// multi.exec(function (err, replies) {
490-
// console.log( "setState(%d) End ", this.id, state, this._state/*, replies*/ );
509+
// console.log( "setState(%d) End ", this.id, state, this._state*//*, replies*//* );
491510
this.set('updated_at', Date.now());
492-
this.set('state', state, function(){
493-
// increase available jobs, used by Worker#getJob()
494-
('inactive' == state) ? client.lpush('q:' + this.type + ':jobs', 1, fn) : fn();
495-
}.bind(this));
511+
this.set('state', state, fn);
512+
// increase available jobs, used by Worker#getJob()
513+
if ('inactive' == state) client.lpush('q:' + this.type + ':jobs', 1);
496514
// }.bind(this));
497515
}.bind(this));
498516
return this;
517+
};*/
518+
/**
519+
* Set state to `state`.
520+
*
521+
* @param {String} script
522+
* @return {Job} for chaining
523+
* @api private
524+
*/
525+
526+
var scripts = {};
527+
var scriptCache = {};
528+
529+
Job.prototype.cachedEval = function(script) {
530+
var args = Array.prototype.slice.call(arguments, 0);
531+
var fn = args[args.length-1];
532+
var hash = scriptCache[script];
533+
var self = this;
534+
var client = self.client;
535+
536+
if(typeof fn != 'function') {
537+
args.push(noop);
538+
fn = noop;
539+
}
540+
541+
if(!hash) {
542+
hash = crypto.createHash('sha1').update(scripts[script], 'utf8').digest('hex');
543+
scriptCache[script] = hash;
544+
}
545+
546+
args[0] = hash;
547+
args[args.length-1] = function(err) {
548+
if(err && (err.message.indexOf('NOSCRIPT') >= 0)) {
549+
console.log("info: loading script " + script + " into cache as " + scriptCache[script]);
550+
551+
args[0] = scripts[script];
552+
args[args.length-1] = fn;
553+
554+
return client.eval.apply(client, args);
555+
}
556+
else {
557+
return fn.apply(self, arguments);
558+
}
559+
};
560+
561+
client.evalsha.apply(client, args);
562+
return this;
499563
};
500564

565+
/**
566+
* Set state to `state`.
567+
*
568+
* @param {String} state
569+
* @param {Function} fn
570+
* @return {Job} for chaining
571+
* @api public
572+
*/
573+
574+
scripts.stateLUA =
575+
"local acquire = nil\n"
576+
+ "local stateChange = nil\n"
577+
+ "acquire = function(id, priority, group)\n"
578+
+ "if not group or group == '' then return 0 end\n"
579+
+ "priority = tonumber(priority)\n"
580+
+ "if not priority then priority = 0 end\n"
581+
+ "local owner = redis.call('get', 'q:lockowners:' .. group)\n"
582+
+ "if owner and owner ~= id then\n"
583+
+ "local oprio = tonumber(redis.call('hget', 'q:job:' .. owner, 'priority'))\n"
584+
+ "local ostate = redis.call('hget', 'q:job:' .. owner, 'state')\n"
585+
+ "local otype = redis.call('hget', 'q:job:' .. owner, 'type')\n"
586+
+ "if not oprio then oprio = 0 end\n"
587+
+ "if 'inactive' == ostate and otype and oprio > priority then\n"
588+
+ "if redis.call('zrem', 'q:jobs:' .. otype .. ':inactive', owner) == 1 then\n"
589+
+ "stateChange(tostring(owner), otype, 'staged', oprio, group, nil)\n"
590+
+ "return 1\n"
591+
+ "end\n"
592+
+ "end\n"
593+
+ "end\n"
594+
+ "if not owner and redis.call('zcard', 'q:staged:' .. group) ~= 0 then\n"
595+
+ "local best = redis.call('zrange', 'q:staged:' .. group, 0, 0)[1]\n"
596+
+ "redis.call('zrem', 'q:staged:' .. group, best)\n"
597+
+ "redis.call('set', 'q:lockowners:' .. group, best)\n"
598+
+ "local bprio = tonumber(redis.call('hget', 'q:job:' .. best, 'priority'))\n"
599+
+ "local btype = redis.call('hget', 'q:job:' .. best, 'type')\n"
600+
+ "stateChange(tostring(best), btype, 'inactive', bprio, group, nil)\n"
601+
+ "return 1\n"
602+
+ "end\n"
603+
+ "return 0\n"
604+
+ "end\n"
605+
+ "stateChange = function(id, type, state, priority, group, update_by)\n"
606+
+ "local old = redis.call('getset', 'q:job:' .. id .. ':state', state)\n"
607+
+ "if state == old then return state end\n" // state has already changed
608+
+ "if 'active' == state and update_by then redis.call('hset', 'q:job:' .. id, 'update_by', update_by) end\n"
609+
+ "if 'removed' == state then redis.call('zrem', 'q:jobs', id) end\n"
610+
+ "if old then\n"
611+
+ "redis.call('zrem', 'q:jobs:' .. old, id)\n"
612+
+ "redis.call('zrem', 'q:jobs:' .. type .. ':' .. old, id)\n"
613+
+ "end\n"
614+
+ "if ('complete' == state or 'removed' == state) and group then redis.call('zrem', 'q:staged:' .. group, id) end\n"
615+
+ "if 'removed' ~= state then\n"
616+
+ "redis.call('hset', 'q:job:' .. id, 'state', state)\n"
617+
+ "redis.call('zadd', 'q:jobs', priority, id)\n"
618+
+ "redis.call('zadd', 'q:jobs:' .. state, priority, id)\n"
619+
+ "redis.call('zadd', 'q:jobs:' .. type .. ':' .. state, priority, id)\n"
620+
+ "if 'inactive' == state then redis.call('lpush', 'q:' .. type .. ':jobs', 1) end\n"
621+
+ "if 'staged' == state and group then redis.call('zadd', 'q:staged:' .. group, priority, id) end\n"
622+
+ "end\n"
623+
+ "if group and ('complete' == state or 'removed' == state or ('staged' == state and 'inactive' == old)) then\n"
624+
+ "if redis.call('get', 'q:lockowners:' .. group) == id then redis.call('del', 'q:lockowners:' .. group) end\n"
625+
+ "end\n"
626+
+ "if acquire(id, priority, group) == 1 then return redis.call('hget', 'q:job:' .. id, 'state') end\n"
627+
+ "return state\n"
628+
+ "end\n"
629+
+ "return stateChange(ARGV[1], ARGV[2], ARGV[3], ARGV[4], ARGV[5], ARGV[6])\n"
630+
;
631+
632+
Job.prototype.state = function(state, fn) {
633+
var self = this;
634+
var client = self.client;
635+
var update_by = this._heartbeat ? Date.now() + this._heartbeat : null;
636+
637+
self.cachedEval('stateLUA', 0, ''+self.id, self.type, state, self._priority, self._group, update_by, function(err, newstate) {
638+
if(err) return (fn || noop)(err);
639+
self._state = newstate;
640+
if(fn) fn();
641+
});
642+
643+
return this;
644+
};
501645
/**
502646
* Set the job's failure `err`.
503647
*
@@ -630,11 +774,7 @@ Job.prototype.update = function (fn) {
630774
this.state(this._state, fn);
631775
}.bind(this));
632776

633-
if( !exports.disableSearch ) {
634-
getSearch().index(json, this.id, function(){
635-
console.log( "add index...");
636-
}.bind(this));
637-
}
777+
!exports.disableSearch && getSearch().index(json, this.id);
638778
};
639779

640780
/**

lib 57A6 /queue/worker.js

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,36 @@ Worker.prototype.getJob = function (fn) {
249249
self.job = false;
250250
return fn(err /*|| "No job to pop!"*/);
251251
}
252-
Job.get(id, fn);
252+
// Job.get(id, fn);
253+
Job.get(id, function(err, job) {
254+
if(err || !job) {
255+
// Attempt to put the job back on the queue if it still exists
256+
// (after a short pause in case redis had a hiccup)
257+
setTimeout(function() {
258+
var lua =
259+
"local state = redis.call('hget', KEYS[1], 'state')\n"
260+
+ "local jtype = redis.call('hget', KEYS[1], 'type')\n"
261+
+ "if not state or not jtype or 'inactive' ~= state then return 0 end\n"
262+
+ "local priority = redis.call('hget', KEYS[1], 'priority')\n"
263+
+ "if not priority then priority = 0 end\n"
264+
+ "redis.call('zadd', KEYS[2], tonumber(priority), ARGV[1])\n"
265+
+ "redis.call('lpush', KEYS[3], 1)\n"
266+
+ "return 1"
267+
;
268+
var jkey = 'q:job:' + id;
269+
var qkey = 'q:jobs:' + self.type + ':inactive';
270+
var bkey = 'q:' + self.type + ':jobs';
271+
self.client.eval(lua, 3, jkey, qkey, bkey, id, function(err3, result) {
272+
if(err3 || !result) {
273+
// self.error(, "Worker.getJob can't add job back to kue", { job: id, error: err3 ? err3.message : undefined, result: result, type: self.type });
274+
} else {
275+
// self.errfn("info", "Worker.getJob added job back to kue", { job: id, type: self.type });
276+
}
277+
});
278+
}, 1500);
279+
}
280+
return fn(err, job);
281+
});
253282
});
254283
});
255284
};

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
"jade": "0.26.3",
2121
"stylus": "0.27.2",
2222
"nib": "0.5.0",
23-
"reds": "~0.2.4"
23+
"reds": "~0.2.4",
24+
"crypto": "*"
2425
},
2526
"main": "index",
2627
"devDependencies": {

0 commit comments

Comments
 (0)
0