8000 RPC tutorial · sardesh/rabbitmq-tutorials@6aacc23 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6aacc23

Browse files
committed
RPC tutorial
1 parent 087d51a commit 6aacc23

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

javascript-nodejs/src/rpc_client.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#!/usr/bin/env node
2+
3+
var amqp = require('amqplib');
4+
var when = require('when');
5+
6+
var conn = amqp.connect('amqp://localhost')
7+
conn.then(createChannel).then(null, console.warn);
8+
9+
function createChannel(conn) {
10+
return when(conn.createChannel().then(requestFib)).ensure(function() { conn.close(); });
11+
}
12+
13+
function requestFib(ch) {
14+
var answer = when.defer();
15+
var correlationId = generateUuid();
16+
17+
function maybeAnswer(msg) {
18+
if (msg.properties.correlationId === correlationId) {
19+
answer.resolve(msg.content.toString());
20+
}
21+
}
22+
23+
var ok = ch.assertQueue('', {exclusive: true})
24+
.then(function(qok) { return qok.queue; });
25+
26+
ok = ok.then(function(queue) {
27+
return ch.consume(queue, maybeAnswer, {noAck: true})
28+
.then(function() { return queue; });
29+
});
30+
31+
ok = ok.then(function(queue) {
32+
console.log(' [x] Requesting fib(30)');
33+
ch.sendToQueue('rpc_queue', new Buffer('30'), {
34+
correlationId: correlationId, replyTo: queue
35+
});
36+
return answer.promise;
37+
});
38+
39+
return ok.then(function(fibN) {
40+
console.log(' [.] Got %d', fibN);
41+
});
42+
}
43+
44+
function generateUuid() {
45+
return Math.random().toString() + Math.random().toString() + Math.random().toString();
46+
}

javascript-nodejs/src/rpc_server.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#!/usr/bin/env node
2+
3+
var amqp = require('amqplib');
4+
5+
var conn = amqp.connect('amqp://localhost');
6+
conn.then(createChannel).then(null, console.warn);
7+
8+
function createChannel(conn) {
9+
process.once('SIGINT', function() { conn.close(); });
10+
return conn.createChannel().then(consume);
11+
}
12+
13+
function consume(ch) {
14+
var ok = ch.assertQueue< 8000 span class=pl-kos>('rpc_queue', {durable: false});
15+
ok = ok.then(function() {
16+
ch.prefetch(1);
17+
return ch.consume('rpc_queue', reply);
18+
});
19+
20+
return ok.then(function(_ignore) {
21+
console.log(' [x] Awaiting RPC requests');
22+
});
23+
24+
function reply(msg) {
25+
var n = parseInt(msg.content.toString());
26+
console.log(' [.] fib(%d)', n);
27+
var response = fib(n);
28+
ch.sendToQueue( msg.properties.replyTo,
29+
new Buffer(response.toString()),
30+
{correlationId: msg.properties.correlationId});
31+
ch.ack(msg);
32+
}
33+
}
34+
35+
function fib(n) {
36+
if(n == 0)
37+
return 0;
38+
else if(n == 1)
39+
return 1;
40+
else
41+
return fib(n-1) + fib(n-2);
42+
}

0 commit comments

Comments
 (0)
0