@@ -12,6 +12,7 @@ var EventEmitter = require('events').EventEmitter
12
12
, events = require ( './events' )
13
13
, redis = require ( '../redis' )
14
14
, reds = require ( 'reds' )
15
+ , crypto = require ( 'crypto' )
15
16
, noop = function ( ) {
16
17
} ;
17
18
@@ -427,7 +428,25 @@ Job.prototype.attempts = function (n) {
427
428
* @api public
428
429
*/
429
430
430
- Job . prototype . remove = function ( fn ) {
431
+ Job . prototype . remove = function ( fn ) {
432
+ var self = this ;
433
+ var client = self . client ;
434
+
435
+ this . state ( 'removed' , function ( err )
436
+ {
437
+ if ( err ) return ( fn || noop ) ( err ) ;
438
+
439
+ ! exports . disableSearch && getSearch ( ) . remove ( self . id ) ;
440
+
441
+ client . del ( 'q:job:' + self . id , noop ) ;
442
+ client . del ( 'q:job:' + self . id + ':state' , noop ) ;
443
+ client . del ( 'q:job:' + self . id + ':log' , fn || noop ) ;
444
+ } ) ;
445
+
446
+ return this ;
447
+ } ;
448
+
449
+ /*Job.prototype.remove = function (fn) {
431
450
var client = this.client;
432
451
this.removeState(function (err) {
433
452
client.del('q:job:' + this.id + ':log');
@@ -453,7 +472,7 @@ Job.prototype.remove = function (fn) {
453
472
* @api public
454
473
*/
455
474
456
- Job . prototype . removeState = function ( fn ) {
475
+ /* Job.prototype.removeState = function (fn) {
457
476
var client = this.client
458
477
, state = this._state;
459
478
// console.log( "removeState(%d) START ", this.id, state, this._state );
@@ -462,11 +481,11 @@ Job.prototype.removeState = function (fn) {
462
481
client.zrem('q:jobs:' + state, this.id);
463
482
client.zrem('q:jobs:' + this.type + ':' + state, this.id);
464
483
// multi.exec(function (err, replies) {
465
- // console.log( "removeState(%d) END ", this.id, state, this._state/ *, replies*/ );
466
- fn && fn ( / *err*/) ;
484
+ // console.log( "removeState(%d) END ", this.id, state, this._state*/ / *, replies*//* );
485
+ fn && fn(*/ / *err*//* );
467
486
// }.bind(this));
468
487
return this;
469
- } ;
488
+ };*/
470
489
471
490
/**
472
491
* Set state to `state`.
@@ -477,7 +496,7 @@ Job.prototype.removeState = function (fn) {
477
496
* @api public
478
497
*/
479
498
480
- Job . prototype . state = function ( state , fn ) {
499
+ /* Job.prototype.state = function (state, fn) {
481
500
var client = this.client;
482
501
this.removeState(function () {
483
502
this._state = state;
@@ -487,17 +506,142 @@ Job.prototype.state = function (state, fn) {
487
506
client.zadd('q:jobs:' + state, this._priority, this.id);
488
507
client.zadd('q:jobs:' + this.type + ':' + state, this._priority, this.id);
489
508
// multi.exec(function (err, replies) {
490
- // console.log( "setState(%d) End ", this.id, state, this._state/ *, replies*/ );
509
+ // console.log( "setState(%d) End ", this.id, state, this._state*/ / *, replies*//* );
491
510
this.set('updated_at', Date.now());
492
- this . set ( 'state' , state , function ( ) {
493
- // increase available jobs, used by Worker#getJob()
494
- ( 'inactive' == state ) ? client . lpush ( 'q:' + this . type + ':jobs' , 1 , fn ) : fn ( ) ;
495
- } . bind ( this ) ) ;
511
+ this.set('state', state, fn);
512
+ // increase available jobs, used by Worker#getJob()
513
+ if ('inactive' == state) client.lpush('q:' + this.type + ':jobs', 1);
496
514
// }.bind(this));
497
515
}.bind(this));
498
516
return this;
517
+ };*/
518
+ /**
519
+ * Set state to `state`.
520
+ *
521
+ * @param {String } script
522
+ * @return {Job } for chaining
523
+ * @api private
524
+ */
525
+
526
+ var scripts = { } ;
527
+ var scriptCache = { } ;
528
+
529
+ Job . prototype . cachedEval = function ( script ) {
530
+ var args = Array . prototype . slice . call ( arguments , 0 ) ;
531
+ var fn = args [ args . length - 1 ] ;
532
+ var hash = scriptCache [ script ] ;
533
+ var self = this ;
534
+ var client = self . client ;
535
+
536
+ if ( typeof fn != 'function' ) {
537
+ args . push ( noop ) ;
538
+ fn = noop ;
539
+ }
540
+
541
+ if ( ! hash ) {
542
+ hash = crypto . createHash ( 'sha1' ) . update ( scripts [ script ] , 'utf8' ) . digest ( 'hex' ) ;
543
+ scriptCache [ script ] = hash ;
544
+ }
545
+
546
+ args [ 0 ] = hash ;
547
+ args [ args . length - 1 ] = function ( err ) {
548
+ if ( err && ( err . message . indexOf ( 'NOSCRIPT' ) >= 0 ) ) {
549
+ console . log ( "info: loading script " + script + " into cache as " + scriptCache [ script ] ) ;
550
+
551
+ args [ 0 ] = scripts [ script ] ;
552
+ args [ args . length - 1 ] = fn ;
553
+
554
+ return client . eval . apply ( client , args ) ;
555
+ }
556
+ else {
557
+ return fn . apply ( self , arguments ) ;
558
+ }
559
+ } ;
560
+
561
+ client . evalsha . apply ( client , args ) ;
562
+ return this ;
499
563
} ;
500
564
565
+ /**
566
+ * Set state to `state`.
567
+ *
568
+ * @param {String } state
569
+ * @param {Function } fn
570
+ * @return {Job } for chaining
571
+ * @api public
572
+ */
573
+
574
+ scripts . stateLUA =
575
+ "local acquire = nil\n"
576
+ + "local stateChange = nil\n"
577
+ + "acquire = function(id, priority, group)\n"
578
+ + "if not group or group == '' then return 0 end\n"
579
+ + "priority = tonumber(priority)\n"
580
+ + "if not priority then priority = 0 end\n"
581
+ + "local owner = redis.call('get', 'q:lockowners:' .. group)\n"
582
+ + "if owner and owner ~= id then\n"
583
+ + "local oprio = tonumber(redis.call('hget', 'q:job:' .. owner, 'priority'))\n"
584
+ + "local ostate = redis.call('hget', 'q:job:' .. owner, 'state')\n"
585
+ + "local otype = redis.call('hget', 'q:job:' .. owner, 'type')\n"
586
+ + "if not oprio then oprio = 0 end\n"
587
+ + "if 'inactive' == ostate and otype and oprio > priority then\n"
588
+ + "if redis.call('zrem', 'q:jobs:' .. otype .. ':inactive', owner) == 1 then\n"
589
+ + "stateChange(tostring(owner), otype, 'staged', oprio, group, nil)\n"
590
+ + "return 1\n"
591
+ + "end\n"
592
+ + "end\n"
593
+ + "end\n"
594
+ + "if not owner and redis.call('zcard', 'q:staged:' .. group) ~= 0 then\n"
595
+ + "local best = redis.call('zrange', 'q:staged:' .. group, 0, 0)[1]\n"
596
+ + "redis.call('zrem', 'q:staged:' .. group, best)\n"
597
+ + "redis.call('set', 'q:lockowners:' .. group, best)\n"
598
+ + "local bprio = tonumber(redis.call('hget', 'q:job:' .. best, 'priority'))\n"
599
+ + "local btype = redis.call('hget', 'q:job:' .. best, 'type')\n"
600
+ + "stateChange(tostring(best), btype, 'inactive', bprio, group, nil)\n"
601
+ + "return 1\n"
602
+ + "end\n"
603
+ + "return 0\n"
604
+ + "end\n"
605
+ + "stateChange = function(id, type, state, priority, group, update_by)\n"
606
+ + "local old = redis.call('getset', 'q:job:' .. id .. ':state', state)\n"
607
+ + "if state == old then return state end\n" // state has already changed
608
+ + "if 'active' == state and update_by then redis.call('hset', 'q:job:' .. id, 'update_by', update_by) end\n"
609
+ + "if 'removed' == state then redis.call('zrem', 'q:jobs', id) end\n"
610
+ + "if old then\n"
611
+ + "redis.call('zrem', 'q:jobs:' .. old, id)\n"
612
+ + "redis.call('zrem', 'q:jobs:' .. type .. ':' .. old, id)\n"
613
+ + "end\n"
614
+ + "if ('complete' == state or 'removed' == state) and group then redis.call('zrem', 'q:staged:' .. group, id) end\n"
615
+ + "if 'removed' ~= state then\n"
616
+ + "redis.call('hset', 'q:job:' .. id, 'state', state)\n"
617
+ + "redis.call('zadd', 'q:jobs', priority, id)\n"
618
+ + "redis.call('zadd', 'q:jobs:' .. state, priority, id)\n"
619
+ + "redis.call('zadd', 'q:jobs:' .. type .. ':' .. state, priority, id)\n"
620
+ + "if 'inactive' == state then redis.call('lpush', 'q:' .. type .. ':jobs', 1) end\n"
621
+ + "if 'staged' == state and group then redis.call('zadd', 'q:staged:' .. group, priority, id) end\n"
622
+ + "end\n"
623
+ + "if group and ('complete' == state or 'removed' == state or ('staged' == state and 'inactive' == old)) then\n"
624
+ + "if redis.call('get', 'q:lockowners:' .. group) == id then redis.call('del', 'q:lockowners:' .. group) end\n"
625
+ + "end\n"
626
+ + "if acquire(id, priority, group) == 1 then return redis.call('hget', 'q:job:' .. id, 'state') end\n"
627
+ + "return state\n"
628
+ + "end\n"
629
+ + "return stateChange(ARGV[1], ARGV[2], ARGV[3], ARGV[4], ARGV[5], ARGV[6])\n"
630
+ ;
631
+
632
+ Job . prototype . state = function ( state , fn ) {
633
+ var self = this ;
634
+ var client = self . client ;
635
+ var update_by = this . _heartbeat ? Date . now ( ) + this . _heartbeat : null ;
636
+
637
+ self . cachedEval ( 'stateLUA' , 0 , '' + self . id , self . type , state , self . _priority , self . _group , update_by , function ( err , newstate ) {
638
+ if ( err ) return ( fn || noop ) ( err ) ;
639
+ self . _state = newstate ;
640
+ if ( fn ) fn ( ) ;
641
+ } ) ;
642
+
643
+ return this ;
644
+ } ;
501
645
/**
502
646
* Set the job's failure `err`.
503
647
*
@@ -630,11 +774,7 @@ Job.prototype.update = function (fn) {
630
774
this . state ( this . _state , fn ) ;
631
775
} . bind ( this ) ) ;
632
776
633
- if ( ! exports . disableSearch ) {
634
- getSearch ( ) . index ( json , this . id , function ( ) {
635
- console . log ( "add index..." ) ;
636
- } . bind ( this ) ) ;
637
- }
777
+ ! exports . disableSearch && getSearch ( ) . index ( json , this . id ) ;
638
778
} ;
639
779
640
780
/**
0 commit comments