A simple service mesh. Messages sent within the service mesh can be consistently partitioned across members of the cluster.
npm install meshage --save
Initialize a cluster/node:
import {init, GrapevineCluster} from 'meshage';
const conn = await init(
// Initialize the cluster to join
new GrapevineCluster(
process.env.CLUSTER_PORT,
(process.env.SEEDS || '').split(',')),
process.env.HTTP_PORT
)
.register('echo', message => {
// Register a message handler on the 'echo' stream
return { echoed: message };
})
.start();
Given the above example, create a cluster of nodes:
Start a seed node
CLUSTER_PORT=9742 HTTP_PORT=8080 node index.js
Start other nodes referencing the seed address to join the cluster
CLUSTER_PORT=9743 SEEDS=127.0.0.1:9742 HTTP_PORT=8081 node index.js
Each node exposes an HTTP endpoint which accepts messages for registered streams. When a request is received by any instance registered to the cluster, a consistent hashing algorithm is used to determine which node should handle the request. If the node that receives the initial HTTP request is the designated handler, it responds directly; otherwise, it routes the request to the designated node within the cluster.
Request:
curl -sX POST http://localhost:8080/api/echo/$RANDOM \
-H 'Content-Type: application/json' \
-d '{"hello":"world"}'
Response:
{
"echoed": {
"hello": "world"
}
}
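To make the routing decision more concrete, here is a minimal sketch of consistent hashing with a hash ring. It is not meshage's actual implementation: the function names (`hashToRing`, `buildRing`, `pickNode`), the choice of SHA-256, and the number of virtual points per node are all assumptions made for illustration.

```javascript
const crypto = require('crypto');

// Map any value to a 32-bit unsigned integer position on the ring.
function hashToRing(value) {
  return crypto.createHash('sha256').update(String(value)).digest().readUInt32BE(0);
}

// Build a sorted ring with several virtual points per node (assumed: 50).
function buildRing(nodes, replicas = 50) {
  const ring = [];
  for (const node of nodes) {
    for (let i = 0; i < replicas; i++) {
      ring.push({ point: hashToRing(`${node}#${i}`), node });
    }
  }
  return ring.sort((a, b) => a.point - b.point);
}

// Walk clockwise to the first point at or after the key's hash,
// wrapping around to the start of the ring if none is found.
function pickNode(ring, partitionKey) {
  const h = hashToRing(partitionKey);
  const entry = ring.find(e => e.point >= h) || ring[0];
  return entry.node;
}

const ring = buildRing(['127.0.0.1:9742', '127.0.0.1:9743']);
console.log(pickNode(ring, 'order-42'));
```

The same partition key always maps to the same node as long as the membership is unchanged, which is why any node can receive the request and still agree on the designated handler.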
Nodes in a cluster will automatically negotiate a protocol to use to send/receive messages. The following protocols are registered for each node by default.
You may configure a router with specific protocols as follows:
The example below uses only the RSocket protocol.
const router = new meshage.DefaultMessageRouter(
cluster,
new RSocketServiceInvoker(),
new RSocketMessageListener(`${addressStr}/find`)
);
Sends a message to be handled consistently by a registered handler for the specified stream.
URL : /api/:stream/:partitionKey
URL Parameters :
stream - the logical name for the message handler.
partitionKey - the identifier for the entity receiving the message.
Example :
Request:
curl -sX POST http://localhost:8080/api/echo/$RANDOM \
-H 'Content-Type: application/json' \
-d '{"hello":"world"}'
Response:
{
"echoed": {
"hello": "world"
}
}
Sends a message to all registered handlers for the specified stream.
URL : /api/broadcast/:stream/:partitionKey
URL Parameters :
stream - the logical name for the message handler.
partitionKey - the identifier for the entity receiving the message.
Example :
Request:
curl -sX POST http://localhost:8080/api/broadcast/echo/$RANDOM \
-H 'Content-Type: application/json' \
-d '{"hello":"world"}'
Response:
[
{
"echoed": {
"hello": "world"
}
},
{
"echoed": {
"hello": "world"
}
}
]
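The two HTTP endpoints can also be called from Node. The sketch below assumes Node 18+ (for the global `fetch`); `streamUrl` and `postJson` are hypothetical helper names and not part of meshage.

```javascript
// Hypothetical helper: build the URL for the send or broadcast endpoint.
function streamUrl(baseUrl, stream, partitionKey, { broadcast = false } = {}) {
  const prefix = broadcast ? '/api/broadcast' : '/api';
  return `${baseUrl}${prefix}/${encodeURIComponent(stream)}/${encodeURIComponent(partitionKey)}`;
}

// POST a JSON body and parse the JSON response.
async function postJson(url, body) {
  const res = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`Request failed with status ${res.status}`);
  return res.json();
}

console.log(streamUrl('http://localhost:8080', 'echo', '1234'));
// http://localhost:8080/api/echo/1234
console.log(streamUrl('http://localhost:8080', 'echo', '1234', { broadcast: true }));
// http://localhost:8080/api/broadcast/echo/1234
```

With a node listening on port 8080, `postJson(streamUrl('http://localhost:8080', 'echo', '1234'), { hello: 'world' })` would issue the same request as the first curl example.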
Configures the cluster to join.
init(cluster : Cluster, address? : (string | number)) : MessageRouter
cluster - an instance of Cluster which is responsible for advertising and discovering message handlers.
address - (optional) a host:port pair (string) or simply a numeric port (number) to listen for HTTP requests on.
const conn = await init(cluster, 8080).start();
Leverages an implementation of the Gossip protocol to discover nodes and services.
GrapevineCluster(address : (string | number), seeds : (string | number)[])
address - accepts a host:port pair (string) or simply a numeric port (number). If only a port is provided, the host defaults to 127.0.0.1.
seeds - accepts an array of address values (following the same behavior as the address argument).
// The initial node in the cluster will not have seeds
new GrapevineCluster(9473);
// Subsequent nodes in the cluster need to specify at least one existing node as a seed
new GrapevineCluster(9474, [9473]);
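When seeds come from an environment variable, note that in JavaScript `''.split(',')` yields `['']`, an array holding one empty string, rather than an empty array. Whether the library tolerates that entry is not stated here, so the sketch below filters blanks out before the list is passed on. `parseSeeds` is a hypothetical helper, not part of meshage.

```javascript
// Hypothetical helper: turn "a:1, b:2" into ['a:1', 'b:2'], dropping blank entries.
function parseSeeds(raw) {
  return (raw || '')
    .split(',')
    .map(s => s.trim())
    .filter(s => s.length > 0);
}

// An unset SEEDS variable gives [], so the first node starts without seeds.
console.log(parseSeeds(process.env.SEEDS));
```

The result could then be passed as the second argument, for example `new GrapevineCluster(9474, parseSeeds(process.env.SEEDS))`.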
Connects to a consul agent/cluster for service registration.
ConsulCluster(address : (string | number), seeds : (string | number)[])
address - a host:port pair (string) or simply a numeric port (number). The cluster address should point to the associated consul agent's HTTP API (typically port 8500).
seeds - an array of address values (following the same behavior as the address argument). The seed address should point to a consul agent's serf_lan port (typically port 8301).
new ConsulCluster('127.0.0.1:8500');
Custom cluster types may be provided by implementing the core/cluster/Cluster interface.
Registers a message handler on the node.
register(stream : string, handler : (message : Message) => any) : MessageRouter
stream - the stream name to accept messages for.
handler - the message handler function.
node.register('someStream', message => {
return {};
});
Joins the cluster and begins advertising the node's message handlers.
start(callback : (router : ConnectedMessageRouter) => void)
callback - (optional) accepts a callback function which is provided a router instance. The router instance can be used to send or broadcast messages to nodes in the cluster.
const conn = await init(...).start();
const res = await conn.send({ stream: 'echo', partitionKey: 'c6c5e7f3-6228-41ce-a7ea-23ac24a08a32', data: 'hello' });
console.log(res);
The router instance passed to the start callback exposes two methods:
Sends a message to be handled consistently by a registered handler for the specified stream. Depending on how the message is routed, it could be handled by the node itself.
send(message : Message) : Promise<{}>
message - the message to send
Sends a message to all registered handlers for the specified stream.
broadcast(message : Message) : Promise<{}>
message - the message to send
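The two methods can be combined as in the sketch below. `echoEverywhere` and `fakeRouter` are hypothetical names; the stand-in router exists only so the helper can be tried without a running cluster, and its response shapes mirror the HTTP examples above rather than anything the library guarantees.

```javascript
// `router` is any object exposing send(message) and broadcast(message),
// such as the connected router obtained from start().
async function echoEverywhere(router, partitionKey, data) {
  const message = { stream: 'echo', partitionKey, data };
  const single = await router.send(message);   // handled by one designated node
  const all = await router.broadcast(message); // handled by every registered handler
  return { single, all };
}

// Stand-in router (assumption) that answers locally instead of using a cluster.
const fakeRouter = {
  send: async message => ({ echoed: message.data }),
  broadcast: async message => [{ echoed: message.data }, { echoed: message.data }],
};

echoEverywhere(fakeRouter, 'user-1', 'hello').then(result => console.log(result));
```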
Address may be supplied in the following formats:
The host and port separated by a colon.
Example
localhost:8080
Just the port (as a number or string). If no explicit hostname is provided, os.hostname() is used to determine the host.
Example
8080
By suffixing the address with the keyword find, the library will attempt to find an open port to listen on.
Example
localhost:find - use localhost, but find an open port
localhost:8080/find - use localhost and port 8080 if available, otherwise find an open port
8080/find - use port 8080 if available, otherwise find an open port
find - find any open port
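The address formats above can be summarized with a small parser. This is an illustrative sketch only: `parseAddress` and the shape of its return value are assumptions, not the library's own parsing code, and it only classifies the string without probing for open ports.

```javascript
const os = require('os');

const FIND_SUFFIX = '/find';

// Illustrative parser: returns { host, port, find } for a meshage-style address.
// `port` is undefined when no explicit port was given.
function parseAddress(address, defaultHost = os.hostname()) {
  let text = String(address).trim();
  let find = false;

  // "8080/find" or "localhost:8080/find": preferred port with fallback.
  if (text.endsWith(FIND_SUFFIX)) {
    find = true;
    text = text.slice(0, text.length - FIND_SUFFIX.length);
  }

  const idx = text.lastIndexOf(':');
  const host = idx === -1 ? defaultHost : text.slice(0, idx);
  let portText = idx === -1 ? text : text.slice(idx + 1);

  // "find" or "localhost:find": no preferred port at all.
  if (portText === 'find') {
    find = true;
    portText = '';
  }

  const port = portText === '' ? undefined : Number(portText);
  if (port !== undefined && !Number.isInteger(port)) {
    throw new Error(`Invalid port in address: ${address}`);
  }
  return { host, port, find };
}

console.log(parseAddress('localhost:8080/find'));
// { host: 'localhost', port: 8080, find: true }
```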