8000 refactored Routing tutorial · syk-coder/rabbitmq-tutorials@273e91a · GitHub
[go: up one dir, main page]

Skip to content

Commit 273e91a

Browse files
committed
refactored Routing tutorial
1 parent b98c823 commit 273e91a

File tree

2 files changed

+32
-56
lines changed

2 files changed

+32
-56
lines changed
Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,18 @@
11
#!/usr/bin/env node
22

3-
var amqp = require('amqplib');
4-
var when = require('when');
3+
var amqp = require('amqplib/callback_api');
54

6-
var conn = amqp.connect('amqp://localhost')
7-
conn.then(createChannel).then(null, console.warn);
8-
9-
function createChannel(conn) {
10-
return when(conn.createChannel().then(logMessage)).ensure(function() { conn.close(); });
11-
}
12-
13-
function logMessage(ch) {
14-
var ex = 'direct_logs';
15-
var ok = ch.assertExchange(ex, 'direct', {durable: false})
16-
17-
return ok.then(function() {
5+
amqp.connect('amqp://localhost', function(err, conn) {
6+
conn.createChannel(function(err, ch) {
7+
var ex = 'direct_logs';
188
var args = process.argv.slice(2);
199
var msg = args.slice(1).join(' ') || 'Hello World!';
2010
var severity = (args.length > 0) ? args[0] : 'info';
2111

12+
ch.assertExchange(ex, 'direct', {durable: false});
2213
ch.publish(ex, severity, new Buffer(msg));
23-
console.log(" [x] Sent %s:'%s'", severity, msg);
24-
return ch.close();
14+
console.log(" [x] Sent %s: '%s'", severity, msg);
2515
});
26-
}
16+
17+
setTimeout(function() { conn.close(); process.exit(0) }, 500);
18+
});
Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,30 @@
11
#!/usr/bin/env node
22

3-
usage();
4-
5-
var amqp = require('amqplib');
6-
var all = require('when').all;
7-
var severities = process.argv.slice(2);
8-
var conn = amqp.connect('amqp://localhost')
9-
var ch = conn.then(createChannel).then(null, console.warn);
10-
11-
ch.then(function(ch) {
12-
var x = ch.assertExchange('direct_logs', 'direct', {durable: false});
13-
var q = x.then(function() {
14-
return ch.assertQueue('', {exclusive: true});
15-
});
3+
var amqp = require('amqplib/callback_api');
164

17-
var ok = q.then(function(qok) {
18-
var queue = qok.queue;
19-
return all(severities.map(function(sev) {
20-
ch.bindQueue(queue, 'direct_logs', sev);
21-
})).then(function() { return queue; });
22-
});
5+
var args = process.argv.slice(2);
236

24-
ok = ok.then(function(queue) {
25-
return ch.consume(queue, logMessage, {noAck: true});
26-
});
27-
return ok.then(function() {
28-
console.log(' [*] Waiting for logs. To exit press CTRL+C');
29-
});
7+
if (args.length == 0) {
8+
console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
9+
process.exit(1);
10+
}
3011

31-
function logMessage(msg) {
32-
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
33-
}
34-
});
12+
amqp.connect('amqp://localhost', function(err, conn) {
13+
conn.createChannel(function(err, ch) {
14+
var ex = 'direct_logs';
3515

36-
function createChannel(conn) {
37-
process.once('SIGINT', function() { conn.close(); });
38-
return conn.createChannel();
39-
}
16+
ch.assertExchange(ex, 'direct', {durable: false});
4017

41-
function usage() {
42-
if (process.argv.length < 3) {
43-
console.log("Usage: receive_logs_direct.js [info] [warning] [error]");
44-
process.exit(1);
45-
}
46-
}
18+
ch.assertQueue('', {exclusive: true}, function(err, q) {
19+
console.log(' [*] Waiting for logs. To exit press CTRL+C');
20+
21+
args.map(function(severity) {
22+
ch.bindQueue(q.queue, ex, severity);
23+
});
24+
25+
ch.consume(q.queue, function(msg) {
26+
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
27+
}, {noAck: true});
28+
});
29+
});
30+
});

0 commit comments

Comments
 (0)
0