@boringnode/bus is a service bus implementation for Node.js. It is designed to be simple and easy to use.
Currently, it supports the following transports:
👉 Memory: A simple in-memory transport for testing purposes.
👉 Redis: A Redis transport for production usage.
👉 Mqtt: An MQTT transport for production usage.
npm install @boringnode/bus
The module exposes a manager that can be used to register buses.
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'
const manager = new BusManager({
default: 'main',
transports: {
main: {
transport: memory(),
},
redis: {
transport: redis({
host: 'localhost',
port: 6379,
}),
},
mqtt: {
transport: mqtt({
host: 'localhost',
port: 1883,
}),
},
}
})
Once the manager is created, you can subscribe to channels and publish messages.
manager.subscribe('channel', (message) => {
console.log('Received message', message)
})
manager.publish('channel', 'Hello world')
By default, the bus uses the default transport. You can select a different transport with the use method.
manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
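To make the default-versus-named lookup concrete, here is a minimal, self-contained sketch. It does not use @boringnode/bus: SketchBus and SketchManager are hypothetical stand-ins written for illustration, and the real manager may create and resolve its buses differently.

```typescript
// Hypothetical stand-ins for illustration; not the @boringnode/bus implementation.
type Handler = (message: string) => void

class SketchBus {
  private handlers = new Map<string, Handler[]>()
  // Messages published on this bus, recorded so the routing is observable.
  readonly published: string[] = []

  subscribe(channel: string, handler: Handler): void {
    const list = this.handlers.get(channel) ?? []
    list.push(handler)
    this.handlers.set(channel, list)
  }

  publish(channel: string, message: string): void {
    this.published.push(message)
    for (const handler of this.handlers.get(channel) ?? []) handler(message)
  }
}

class SketchManager {
  private buses = new Map<string, SketchBus>()
  private defaultName: string

  constructor(defaultName: string, names: string[]) {
    this.defaultName = defaultName
    for (const name of names) this.buses.set(name, new SketchBus())
  }

  // With no argument, fall back to the default bus.
  use(name: string = this.defaultName): SketchBus {
    const bus = this.buses.get(name)
    if (!bus) throw new Error(`Unknown bus "${name}"`)
    return bus
  }

  // The manager-level methods delegate to the default bus.
  subscribe(channel: string, handler: Handler): void {
    this.use().subscribe(channel, handler)
  }

  publish(channel: string, message: string): void {
    this.use().publish(channel, message)
  }
}

const sketchManager = new SketchManager('main', ['main', 'redis'])
const receivedOnMain: string[] = []
sketchManager.subscribe('channel', (message) => receivedOnMain.push(message))

// Goes through the default bus ("main"), so the subscriber above sees it.
sketchManager.publish('channel', 'via default')
// Goes through the "redis" bus only; the subscriber on "main" is not called.
sketchManager.use('redis').publish('channel', 'via redis')
```

In this sketch each named bus is isolated, so a subscriber registered through the manager only sees messages published on the default bus.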
If you don't need multiple buses, you can create a single bus directly by importing the transports and the Bus class.
import { Bus } from '@boringnode/bus'
import { RedisTransport } from '@boringnode/bus/transports/redis'
const transport = new RedisTransport({
host: 'localhost',
port: 6379,
})
const bus = new Bus(transport, {
retryQueue: {
retryInterval: '100ms'
}
})
The bus also supports a retry queue. When a message fails to be published, for example because your Redis server is down, it is moved to the retry queue.
const manager = new BusManager({
default: 'main',
transports: {
main: {
transport: redis({
host: 'localhost',
port: 6379,
}),
retryQueue: {
retryInterval: '100ms'
}
},
}
})
manager.use('main').publish('channel', 'Hello World')
The message will be moved to the retry queue and will be retried every 100ms.
You have multiple options to configure the retry queue.
export interface RetryQueueOptions {
// Enable the retry queue (default: true)
enabled?: boolean
// Whether duplicate messages are removed from the retry queue (default: true)
removeDuplicates?: boolean
// The maximum size of the retry queue (default: null)
maxSize?: number | null
// The interval between each retry (default: false)
retryInterval?: Duration | false
}
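The effect of removeDuplicates and maxSize can be sketched with a small, self-contained queue. This is not the library's retry queue: SketchRetryQueue and its methods are hypothetical, and dropping the oldest entry when the queue is full is an assumption made for the example, not documented behavior.

```typescript
// Hypothetical sketch; the real retry queue in @boringnode/bus may behave differently.
interface SketchRetryOptions {
  removeDuplicates?: boolean
  maxSize?: number | null
}

interface QueuedMessage {
  channel: string
  payload: string
}

class SketchRetryQueue {
  private queue: QueuedMessage[] = []
  private removeDuplicates: boolean
  private maxSize: number | null

  constructor(options: SketchRetryOptions = {}) {
    // Defaults mirror the ones listed in RetryQueueOptions.
    this.removeDuplicates = options.removeDuplicates ?? true
    this.maxSize = options.maxSize ?? null
  }

  size(): number {
    return this.queue.length
  }

  // Returns true when the message was added to the queue.
  enqueue(message: QueuedMessage): boolean {
    const isDuplicate = this.queue.some(
      (queued) => queued.channel === message.channel && queued.payload === message.payload
    )
    if (this.removeDuplicates && isDuplicate) return false

    // Assumption: when the queue is full, the oldest entry is dropped.
    if (this.maxSize !== null && this.queue.length >= this.maxSize) {
      this.queue.shift()
    }
    this.queue.push(message)
    return true
  }

  // Re-publish queued messages in order. Stop at the first failure so the
  // remaining messages stay queued, in order, for the next attempt.
  flush(publish: (message: QueuedMessage) => boolean): void {
    while (this.queue.length > 0) {
      const next = this.queue[0]
      if (next === undefined || !publish(next)) return
      this.queue.shift()
    }
  }
}

const sketchQueue = new SketchRetryQueue({ maxSize: 2 })
const firstAdded = sketchQueue.enqueue({ channel: 'channel', payload: 'a' })
const duplicateAdded = sketchQueue.enqueue({ channel: 'channel', payload: 'a' }) // rejected as a duplicate
sketchQueue.enqueue({ channel: 'channel', payload: 'b' })
sketchQueue.enqueue({ channel: 'channel', payload: 'c' }) // queue is full, so 'a' is dropped

// Simulate the transport coming back: every retry succeeds.
const redelivered: string[] = []
sketchQueue.flush((message) => {
  redelivered.push(message.payload)
  return true
})
```

In the library itself, the retryInterval option controls how often such a retry attempt is made.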
The module also provides some test helpers to make it easier to test the code that relies on the bus. First, you can use the MemoryTransport to create a bus that uses an in-memory transport.
You can also use the ChaosTransport to simulate a transport that fails randomly, in order to test the resilience of your code.
import { Bus } from '@boringnode/bus'
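import { ChaosTransport } from '@boringnode/bus/test_helpers'
// Assumed import path, mirroring the RedisTransport import shown earlier
import { MemoryTransport } from '@boringnode/bus/transports/memory'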
const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)
/**
* After this call, every attempt to publish a message makes the
* transport throw an error.
*/
buggyTransport.alwaysThrow()
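As a rough picture of what such a wrapper does and how a test might use it, here is a self-contained sketch. It does not import the library: SketchTransport, SketchMemoryTransport, SketchChaosTransport, neverThrow and safePublish are hypothetical names, and publish is kept synchronous for brevity even though real transports may well be asynchronous.

```typescript
// Hypothetical stand-ins; not the library's ChaosTransport or MemoryTransport.
interface SketchTransport {
  publish(channel: string, message: string): void
}

class SketchMemoryTransport implements SketchTransport {
  // Messages that actually reached the transport.
  readonly sent: string[] = []

  publish(_channel: string, message: string): void {
    this.sent.push(message)
  }
}

class SketchChaosTransport implements SketchTransport {
  private shouldThrow = false
  private inner: SketchTransport

  constructor(inner: SketchTransport) {
    this.inner = inner
  }

  alwaysThrow(): void {
    this.shouldThrow = true
  }

  neverThrow(): void {
    this.shouldThrow = false
  }

  publish(channel: string, message: string): void {
    if (this.shouldThrow) throw new Error('Chaos: simulated transport failure')
    this.inner.publish(channel, message)
  }
}

// Code under test: publish and report success instead of letting the error escape.
function safePublish(transport: SketchTransport, channel: string, message: string): boolean {
  try {
    transport.publish(channel, message)
    return true
  } catch {
    return false
  }
}

const sketchInner = new SketchMemoryTransport()
const sketchChaos = new SketchChaosTransport(sketchInner)

sketchChaos.alwaysThrow()
const publishedWhileFailing = safePublish(sketchChaos, 'channel', 'dropped')

sketchChaos.neverThrow()
const publishedAfterRecovery = safePublish(sketchChaos, 'channel', 'Hello world')
```

Wrapping the transport rather than the bus keeps the code under test unchanged: only the failure behavior of the layer beneath it is toggled.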