8000 refactored Topics tutorial · lxf/rabbitmq-tutorials@bd9f582 · GitHub
[go: up one dir, main page]

Skip to content

Commit bd9f582

Browse files
committed
refactored Topics tutorial
1 parent 8f64451 commit bd9f582

File tree

2 files changed

+31
-56
lines changed

2 files changed

+31
-56
lines changed
Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,18 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib');
4-
var when = require('when');
3+
var amqp = require('amqplib/callback_api');
54

6-
var conn = amqp.connect('amqp://localhost')
7-
conn.then(createChannel).then(null, console.warn);
8-
9-
function createChannel(conn) {
10-
return when(conn.createChannel().then(logMessage)).ensure(function() { conn.close(); });
11-
}
12-
13-
function logMessage(ch) {
14-
var ex = 'topic_logs';
15-
var ok = ch.assertExchange(ex, 'topic', {durable: false})
16-
17-
return ok.then(function() {
5+
amqp.connect('amqp://localhost', function(err, conn) {
6+
conn.createChannel(function(err, ch) {
7+
var ex = 'topic_logs';
188
var args = process.argv.slice(2);
19-
var msg = args.slice(1).join(' ') || 'Hello World!';
209
var key = (args.length > 0) ? args[0] : 'anonymous.info';
10+
var msg = args.slice(1).join(' ') || 'Hello World!';
2111

12+
ch.assertExchange(ex, 'topic', {durable: false});
2213
ch.publish(ex, key, new Buffer(msg));
23-
console.log(" [x] Sent %s:'%s'", key, msg);
24-
return ch.close();
14+
console.log(" [x] Sent %s: '%s'", key, msg);
2515
});
26-
}
16+
17+
setTimeout(function() { conn.close(); process.exit(0) }, 500);
18+
});
Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,30 @@
11
#!/usr/bin/env node
22

3-
usage();
3+
var amqp = require('amqplib/callback_api');
44

5-
var amqp = require('amqplib');
6-
var all = require('when').all;
7-
var conn = amqp.connect('amqp://localhost')
8-
var ch = conn.then(createChannel).then(null, console.warn);
5+
var args = process.argv.slice(2);
96

10-
ch.then(function(ch) {
11-
var x = ch.assertExchange('topic_logs', 'topic', {durable: false});
12-
var q = x.then(function() {
13-
return ch.assertQueue('', {exclusive: true});
14-
});
7+
if (args.length == 0) {
8+
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
9+
process.exit(1);
10+
}
1511

16-
var ok = q.then(function(qok) {
17-
var queue = qok.queue;
18-
var keys = process.argv.slice(2);
12+
amqp.connect('amqp://localhost', function(err, conn) {
13+
conn.createChannel(function(err, ch) {
14+
var ex = 'topic_logs';
1915

20-
return all(keys.map(function(key) {
21-
ch.bindQueue(queue, 'topic_logs', key);
22-
})).then(function() { return queue; });
23-
});
16+
ch.assertExchange(ex, 'topic', {durable: false});
2417

25-
ok = ok.then(function(queue) {
26-
return ch.consume(queue, logMessage, {noAck: true});
27-
});
28-
return ok.then(function() {
29-
console.log(' [*] Waiting for logs. To exit press CTRL+C');
30-
});
18+
ch.assertQueue('', {exclusive: true}, function(err, q) {
19+
console.log(' [*] Waiting for logs. To exit press CTRL+C');
3120

32-
function logMessage(msg) {
33-
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
34-
}
35-
});
21+
args.forEach(function(key) {
22+
ch.bindQueue(q.queue, ex, key);
23+
});
3624

37-
function createChannel(conn) {
38-
process.once('SIGINT', function() { conn.close(); });
39-
return conn.createChannel();
40-
}
41-
42-
function usage() {
43-
if (process.argv.length < 3) {
44-
console.log("Usage: receive_logs_topic.js binding_key [binding_key...]");
45-
process.exit(1);
46-
}
47-
}
25+
ch.consume(q.queue, function(msg) {
26+
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
27+
}, {noAck: true});
28+
});
29+
});
30+
});

0 commit comments

Comments
 (0)
0