8000 Topics tutorial · syk-coder/rabbitmq-tutorials@087d51a · GitHub
[go: up one dir, main page]

Skip to content

Commit 087d51a

Browse files
committed
Topics tutorial
1 parent a6f70b3 commit 087d51a

File tree

2 files changed

+73
-0
lines changed

2 files changed

+73
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#!/usr/bin/env node
2+
3+
var amqp = require('amqplib');
4+
var when = require('when');
5+
6+
var conn = amqp.connect('amqp://localhost')
7+
conn.then(createChannel).then(null, console.warn);
8+
9+
function createChannel(conn) {
10+
return when(conn.createChannel().then(logMessage)).ensure(function() { conn.close(); });
11+
}
12+
13+
function logMessage(ch) {
14+
var ex = 'topic_logs';
15+
var ok = ch.assertExchange(ex, 'topic', {durable: false})
16+
17+
return ok.then(function() {
18+
var args = process.argv.slice(2);
19+
var msg = args.slice(1).join(' ') || 'Hello World!';
20+
var key = (args.length > 0) ? args[0] : 'anonymous.info';
21+
22+
ch.publish(ex, key, new Buffer(msg));
23+
console.log(" [x] Sent %s:'%s'", key, msg);
24+
return ch.close();
25+
});
26+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#!/usr/bin/env node
2+
3+
usage();
4+
5+
var amqp = require('amqplib');
6+
var all = require('when').all;
7+
var conn = amqp.connect('amqp://localhost')
8+
var ch = conn.then(createChannel).then(null, console.warn);
9+
10+
ch.then(function(ch) {
11+
var x = ch.assertExchange('topic_logs', 'topic', {durable: false});
12+
var q = x.then(function() {
13+
return ch.assertQueue('', {exclusive: true});
14+
});
15+
16+
var ok = q.then(function(qok) {
17+
var queue = qok.queue;
18+
var keys = process.argv.slice(2);
19+
20+
return all(keys.map(function(key) {
21+
ch.bindQueue(queue, 'topic_logs', key);
22+
})).then(function() { return queue; });
23+
});
24+
25+
ok = ok.then(function(queue) {
26+
return ch.consume(queue, logMessage, {noAck: true});
27+
});
28+
return ok.then(function() {
29+
console.log(' [*] Waiting for logs. To exit press CTRL+C');
30+
});
31+
32+
function logMessage(msg) {
33+
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
34+
}
35+
});
36+
37+
function createChannel(conn) {
38+
process.once('SIGINT', function() { conn.close(); });
39+
return conn.createChannel();
40+
}
41+
42+
function usage() {
43+
if (process.argv.length < 3) {
44+
console.log("Usage: receive_logs_topic.js binding_key [binding_key...]");
45+
process.exit(1);
46+
}
47+
}

0 commit comments

Comments
 (0)
0