Hustle is a JavaScript queuing and messaging library built on top of IndexedDB.
The idea is that sometimes you have two pieces of an application that need to talk to each other, but don't (or shouldn't) have access to each other's memory. Hustle lets them talk through IndexedDB, giving your app a framework for handling tasks and passing messages, all persisted.
If the use case sounds very specific, it is. Hustle was built to make the syncing system in Turtl more scalable by breaking messaging and queuing out into a separate system rather than having them married to the main database. It shines when you have an app that works offline but syncs to a server somewhere: you can queue up changes to your local data and have them synced to your API when a connection becomes available, without having to worry about losing your jobs/messages between app restarts.
var hustle = new Hustle({
    tubes: ['jobs'],
    db_version: 2   // should increase whenever tubes change
});
Create our Hustle object. This provides the interface for all our messaging and queuing needs. Note that we specify tubes. Think of a tube as a table in a database: it logically separates the different kinds of queue items you'll be passing through Hustle. Throughout the Queue API, you're able to specify which tube you're operating on. If undefined, the default tube is used (which is always created on init).
hustle.open({
    success: function() {
        console.log('database opened!');
    },
    error: function(e) {
        console.error('error opening database: ', e);
    }
});
Open the Hustle database so we can start messaging and queuing.
hustle.Queue.put({task: 'rob_bank'}, {
    tube: 'jobs',   // if unspecified, will use tube "default"
    success: function(item) {
        console.log('added item: ', item.id);
    },
    error: function(e) {
        console.error('failed to add job: ', e);
    }
});
Now we put a new message into the queue. The first argument, {task: 'rob_bank'}, is our message. This can be any arbitrary JavaScript object. Once complete, our success callback is given a queue item (with a queue-assigned id value we can use to reference the message later on).
hustle.Queue.reserve({
    tube: 'jobs',   // if unspecified, will use tube "default"
    success: function(item) {
        console.log('heyyy man...you got a job!', item);
    },
    error: function(e) {
        console.error('failed to reserve job: ', e);
    }
});
Now we reserve an item. This removes the job from its "ready" state and puts it into a reserved state, meaning that nobody else can reserve that job unless it's released back onto the tube.
Note that reserve's success function is triggered even if no items are found, so be sure to check if item is null before using it.
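The null check can be wrapped once so callers don't forget it. This is a minimal sketch that assumes only the reserve options shown above; reserveOrIdle and its onJob/onEmpty callbacks are hypothetical names for illustration, not part of Hustle.

```javascript
// Hypothetical helper (not part of Hustle): reserve a job and route the
// "tube is empty" case (item is null) away from the job handler.
function reserveOrIdle(hustle, tube, onJob, onEmpty) {
    hustle.Queue.reserve({
        tube: tube,
        success: function(item) {
            if (item === null || item === undefined) {
                // nothing is ready on this tube right now
                if (onEmpty) onEmpty();
                return;
            }
            onJob(item);
        },
        error: function(e) {
            console.error('failed to reserve job: ', e);
        }
    });
}
```

With this in place, onJob can assume it always receives a real queue item.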
hustle.Queue.delete(item.id, {
    success: function() {
        console.log('job '+ item.id +' deleted.');
    },
    error: function(e) {
        console.error('failed to delete job: ', e);
    }
});
Once you are satisfied that a job has fulfilled its purpose, it makes sense to delete it so it doesn't sit there gumming up your reserved items. Note that we don't have to specify a tube for delete: the command works across all tubes, as all job IDs are unique across tubes.
var consumer = new hustle.Queue.Consumer(function(job) {
    console.log('got job! ', job);
    hustle.Queue.delete(job.id);
}, { tube: 'jobs' });
A consumer listens to a particular tube and calls the given function for each job it gets. It will do this indefinitely until consumer.stop() is called.
- Hustle class
- Hustle.open
- Hustle.close
- Hustle.is_open
- Hustle.wipe
- Hustle.clearHustle
- Hustle.insertDataInHustle
- Hustle.backupDB
- Hustle.backupAll
- Hustle.Queue
var hustle = new Hustle({
    db_name: 'hustle',
    db_version: 1,
    housekeeping_delay: 1000,
    message_lifetime: 10000,
    tubes: ['default']
});
Creates a Hustle object. Note that the tubes the queue uses must be specified. You cannot use queue tubes that haven't been declared.
- db_name specifies the name we want to open the Hustle queue onto. Default: "hustle"
- db_version is the IndexedDB version number to open the database under. This should change whenever your tubes change or else (or else what? or else they will probably not be updated in the schema). Default: 1
- housekeeping_delay is a value (in ms) that determines how often this Hustle object will do DB maintenance (moving expired/delayed jobs back to the ready state, mainly). Default: 1000
- tubes specifies what tubes we want to be present on open. Default: ['default']
hustle.open({
    success: function(event) { ... },
    error: function(event) { ... }
});
Opens/creates/upgrades the Hustle DB.
- success is fired when the IndexedDB store has been opened successfully and is ready for pounding.
- error is fired if something bad happens during open.
hustle.close()
=> boolean
Closes the Hustle database. Returns true if the DB was closed, otherwise returns false (if the DB wasn't open).
This function is synchronous.
hustle.is_open()
=> boolean
This function is synchronous.
hustle.wipe();
Closes the Hustle database and obliterates it. Very useful for debugging apps. If you have no interest in actually persisting, you can call wipe() each time your app loads, just before you call open.
This function is synchronous.
hustle.clearHustle();
Clears data from all tubes in the Hustle DB.
hustle.insertDataInHustle(data, tube);
Inserts data in the respective tube.
hustle.backupDB();
Returns all tubes from Hustle DB.
hustle.backupAll(tube);
Returns data with respect to tube name.
The Hustle queue system allows jobs to be atomically grabbed and operated on by any number of workers. Each job can only be reserved by one worker at a time.
Hustle.Queue takes heavy inspiration from beanstalkd. In fact, most functions have the same names as the commands in the beanstalkd protocol.
All items added to the Hustle queue follow this basic format:
{
    // the item's Hustle-assigned unique id
    id: 6969,
    // the item's priority (lower is more important, defaults to 1024)
    priority: 1024,
    // the item's user-specified data payload
    data: ...,
    // how old the item is
    age: 0,
    // how many times this item has been reserved
    reserves: 0,
    // how many times this item has been released
    releases: 0,
    // how many times this item has timed out
    timeouts: 0,
    // how many times this item has been buried
    buries: 0,
    // how many times this item has been kicked
    kicks: 0,
    // how many seconds left this job has to run before expiring (and being moved to the ready state)
    time_left: 0,
    // what state this item is in (set by peek)
    state: 'ready|buried|reserved',
    // when this item was created (new Date().getTime())
    created: 1391835692616
}
hustle.Queue.peek(item_id, {
    success: function(item) { ... },
    error: function(event) { ... }
});
Grabs a queue item by ID. peek checks all tables (reserved, buried, and all tubes) for the item. This is fairly simple because every item in the Hustle DB has a unique ID.
Items grabbed by peek follow the standard item format.
Note that peek's success function is triggered even if the item isn't found, so be sure to check if item is null before using it.
- success is fired when the lookup is done. The first argument is the queue item (or null if not found). The item will have item.tube and item.state set appropriately.
- error is fired if there was a problem looking up that queue item.
hustle.Queue.put(job_data, {
    tube: 'default',
    priority: 1024,
    delay: 1000,
    ttr: 20,
    success: function(item) { ... },
    error: function(e) { ... }
});
Puts a new item into the queue.
- tube specifies the tube we're putting this item into. Defaults to "default".
- priority specifies this item's priority. 0 is the most important, with anything higher getting less important. Default: 1024
- delay is how many seconds to wait before the job becomes ready. Default: 0
- ttr is how many seconds the job has to live once reserved. If this many seconds passes before the job is deleted or released, the job is automatically put back into the ready state. Note that you can reset the ttr timer using the touch command. Set to 0 to disable the ttr. Default: 0
- success is fired when the job has been added to the queue. The first argument is the full item that was passed back (which is in the standard format). You may want to make note of the item's ID (item.id) because this will allow you to reference the job later on if needed (via peek, delete, bury, etc).
- error is fired when there was a problem adding the item to the queue.
hustle.Queue.reserve({
    tube: 'default',
    success: function(item) { ... },
    error: function(e) { ... }
});
Pulls the next available item off of the specified tube.
Note that reserve's success function is triggered even if no items are found, so be sure to check if item is null before using it.
- tube specifies which tube to pull from. Defaults to "default".
- success is fired when the reserve command finishes. The first argument is the job we pulled off the queue (or null if the tube is empty). It is in the standard format. You'll want to make note of the item's ID (item.id) because it will let you delete the job once you no longer need it.
- error is fired if there was a problem reserving the item.
hustle.Queue.delete(item_id, {
    success: function(item) { ... },
    error: function(e) { ... }
});
Deletes the item with the given ID. Because item IDs are unique across all tubes, there's no need to specify the tube we're deleting from.
If you get a job via reserve and it completes successfully, it's important that you then delete the job. If you don't do this, you're going to have jobs living forever in your reserved table gumming things up. If you really want to save a particular job for later inspection/logging, consider burying it.
- success is fired when complete. The first argument is the item (in the standard format) that was deleted, or null if the item wasn't found.
- error is fired when there was a problem deleting the item.
hustle.Queue.release(item_id, {
    priority: 1024,
    delay: 0,
    success: function() { ... },
    error: function(e) { ... }
});
Releases an item back into the queue. This un-reserves an item and makes it available on its original tube for others to consume via reserve.
- priority specifies the new priority to set on the item being released. If unspecified, will default to the item's original priority. Default: 1024
- delay specifies how many seconds the job must wait before becoming ready after releasing it. Default: 0
- success is fired when the item is released back into the queue.
- error is fired if something went wrong while releasing.
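One use for delay is backing off a job that keeps failing. The sketch below is an assumption about how you might pick the delay, not something Hustle does itself: backoffDelay and releaseWithBackoff are hypothetical helpers, and the doubling schedule and 60 second cap are arbitrary choices. It relies only on the releases counter from the standard item format and the delay option (in seconds) described above.

```javascript
// Hypothetical (not part of Hustle): delay in seconds doubles with each
// prior release of the item, up to a cap (60 seconds unless overridden).
function backoffDelay(releases, maxDelay) {
    var cap = maxDelay === undefined ? 60 : maxDelay;
    return Math.min(cap, Math.pow(2, releases));
}

// Release an item with a delay based on how often it was released before.
// Priority is left unspecified so the item keeps its existing priority.
function releaseWithBackoff(hustle, item, callbacks) {
    var options = { delay: backoffDelay(item.releases) };
    if (callbacks && callbacks.success) options.success = callbacks.success;
    if (callbacks && callbacks.error) options.error = callbacks.error;
    hustle.Queue.release(item.id, options);
}
```

A job released for the first time waits 1 second, then 2, 4, 8 and so on, so a job that fails repeatedly stops hammering whatever it depends on.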
hustle.Queue.bury(item_id, {
    priority: 1024,
    success: function() { ... },
    error: function(e) { ... }
});
Calling bury moves an item into cold storage. It's a great way to keep items that fail a lot from plugging up your queue. You can read properties like item.reserves and determine how many times a job has been reserved and released and add logic to say "if this job has been reserved over 5 times, bury it for later."
Once an item is buried, it can only be released back into the queue by using kick or kick_job.
Items are buried in FIFO order.
- priority allows you to re-assign the item's priority prior to being buried. If unspecified, will use the item's current priority value.
- success is fired when the bury operation is finished.
- error is fired if something goes wrong while burying.
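The "reserved over 5 times" rule mentioned above could be sketched like this. shouldBury and failJob are hypothetical names, and the threshold is whatever you pass in; the only things taken from Hustle are item.reserves and the bury/release calls documented here.

```javascript
// Hypothetical policy (not part of Hustle): a job is buried once it has been
// reserved more than maxReserves times.
function shouldBury(item, maxReserves) {
    return item.reserves > maxReserves;
}

// Call this when a reserved job has failed. Returns which operation was
// requested ('buried' or 'released'); the operation itself completes
// asynchronously.
function failJob(hustle, item, maxReserves) {
    var options = {
        error: function(e) { console.error('failed to retire job: ', e); }
    };
    if (shouldBury(item, maxReserves)) {
        hustle.Queue.bury(item.id, options);
        return 'buried';
    }
    hustle.Queue.release(item.id, options);
    return 'released';
}
```

Calling failJob(hustle, item, 5) from a worker's failure path gives each job a handful of attempts before it is parked for inspection.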
hustle.Queue.kick(num, {
    success: function(count) { ... },
    error: function(e) { ... }
});
Kick removes the first num items from the bury state and puts them into a ready state in their respective tubes.
- num is the number of jobs to kick.
- success is fired when the operation completes, with the first argument being the actual number of jobs that were kicked.
- error is fired if something goes wrong while kicking.
hustle.Queue.kick_job(item_id, {
    success: function() { ... },
    error: function(e) { ... }
});
Kicks a specific item by id, as opposed to kicking the first N items (like kick).
- item_id is the ID of the item we want to kick.
- success is fired when the operation completes.
- error is fired if something goes wrong while kicking.
hustle.Queue.touch(id, {
    success: function() { ... },
    error: function(e) { ... }
});
Reset an item's time to run value (i.e. reset the timer that moves it to the ready state if it isn't released/deleted within a certain amount of time).
- id specifies the ID of the item we're resetting the timer for.
- success is fired when finished.
- error is fired when something goes wrong.
hustle.Queue.count_ready(tube, {
    success: function(count) { ... },
    error: function(e) { ... }
});
Count the number of ready items in a tube.
- tube specifies the tube we want to count the ready items for.
- success is fired when finished, with the first argument being the item count.
- error is fired when something goes wrong.
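Since count_ready works on one tube at a time, getting an overview of several tubes means issuing one call per tube and collecting the results. A minimal sketch, assuming only the count_ready signature above; countReadyAll and its done(err, counts) callback are hypothetical and not part of Hustle.

```javascript
// Hypothetical helper (not part of Hustle): collect ready counts for several
// tubes into one object. Calls done(null, counts) once every count has
// arrived, or done(err) once on the first failure.
function countReadyAll(hustle, tubes, done) {
    var counts = {};
    var pending = tubes.length;
    var failed = false;
    if (pending === 0) return done(null, counts);
    tubes.forEach(function(tube) {
        hustle.Queue.count_ready(tube, {
            success: function(count) {
                if (failed) return;
                counts[tube] = count;
                pending -= 1;
                if (pending === 0) done(null, counts);
            },
            error: function(e) {
                if (failed) return;
                failed = true;
                done(e);
            }
        });
    });
}
```

For example, countReadyAll(hustle, ['default', 'jobs'], function(err, counts) { ... }) would hand back an object keyed by tube name.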
var consumer = new hustle.Queue.Consumer(consume_fn, {
    tube: 'default',
    delay: 100,
    enable_fn: function() { ... },
    error: function(e) { ... }
});
The Consumer class provides an interface to watch a particular tube and call a function for each item that is put into it. It currently works by polling every X milliseconds (100 by default).
- consume_fn is a function, of one argument, which will be called for each job entered into the tube. The value passed in is a queue item.
- tube is the name of the tube we want to devour (default is "default").
- delay is the delay (in ms) between polls to the tube. IndexedDB doesn't have a blocking interface, so polling is the only option, as far as I know.
- enable_fn is an optional function you pass that the consumer calls before each time it polls for queue items. If the function returns false then the polling is stopped.
- error is triggered if there are any problems while consuming.
The returned object has two methods:
- consumer.start() starts the consumer. Note that it starts on instantiation, so you don't need to call this unless you previously called consumer.stop().
- consumer.stop() stops the consumer from polling the queue. Can be started again via consumer.start().
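For the offline-then-sync use case from the introduction, enable_fn is a natural place to gate polling on connectivity. The sketch below is one possible wiring, not a prescribed pattern: startSyncConsumer, isOnline, and handleJob are hypothetical names supplied by the caller (in a browser, isOnline might simply return navigator.onLine). Because polling is stopped once enable_fn returns false, the caller is left to call consumer.start() again when the connection comes back.

```javascript
// Hypothetical wiring (not part of Hustle): consume a tube only while
// isOnline() returns true. handleJob(job, done) is supplied by the caller and
// must call done(err) when it finishes.
function startSyncConsumer(hustle, tube, isOnline, handleJob) {
    var options = {
        error: function(e) { console.error('queue operation failed: ', e); }
    };
    return new hustle.Queue.Consumer(function(job) {
        handleJob(job, function(err) {
            if (err) {
                // put the job back on its tube for another attempt
                hustle.Queue.release(job.id, options);
            } else {
                // done with this job, so remove it from the reserved items
                hustle.Queue.delete(job.id, options);
            }
        });
    }, {
        tube: tube,
        enable_fn: isOnline,
        error: function(e) { console.error('consumer error: ', e); }
    });
}
```

The function returns the consumer so the caller can keep a reference for start() and stop().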
This details some of the exceptions that can be thrown by Hustle. These classes are available in the static Hustle.Error namespace.
Thrown when you try to do any operations in Hustle and the database is closed. Make sure you open it first!
Thrown when you try to open the DB and it's already opened through the current Hustle object.
Thrown when you try to access a tube that doesn't exist. Be sure to declare your tubes when instantiating Hustle and bump up the db_version property.
Thrown when a bad ID value (like null) is passed to a function that takes an ID (like peek, delete, etc).
Thrown when an operation is performed on an item that doesn't exist (or isn't in the location it's supposed to be in).
Hustle allows using bluebird for a promise API.
The promise API is activated by calling hustle.promisify():
var hustle = new Hustle();
hustle.promisify();
This changes all public API functions that take success/error options (any non-synchronous function) so that they return a promise object instead.
var hustle = new Hustle();
hustle.promisify();
hustle.open().then(function() {
    return hustle.Queue.put('fetch me my slippers');
}).then(function() {
    return hustle.Queue.reserve();
}).then(function(item) {
    console.log("WHAT?? I don't take orders from you...");
    return hustle.Queue.delete(item.id);
}).catch(function(e) {
    console.error('something went wrong: ', e);
});
Notice how we can chain a number of calls at the top level and have only one error handler for the lot. Very nice.
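Because the promisified functions return promises, they can also be awaited. This is a sketch under several assumptions: hustle.promisify() has already been called, the environment supports async/await, the promisified reserve accepts the same tube option and resolves with null when the tube is empty (mirroring the success callback), and release can be called with just an ID. processNext and handler are hypothetical names.

```javascript
// Hypothetical helper (not part of Hustle): reserve one job from a promisified
// Hustle object, run handler on it, then delete it. Resolves to false when the
// tube is empty and true when a job was handled and deleted.
async function processNext(hustle, tube, handler) {
    var item = await hustle.Queue.reserve({ tube: tube });
    if (!item) return false;    // assumed: null means nothing was ready
    try {
        await handler(item);
    } catch (e) {
        // make the job available again, then surface the original error
        await hustle.Queue.release(item.id);
        throw e;
    }
    await hustle.Queue.delete(item.id);
    return true;
}
```

A failing handler releases the job and rethrows, so a single catch at the call site still sees every error.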
Just navigate your browser to Hustle/test/ and let the magic happen.
MIT. JOY.