8000 Port tutorial 5 to Go · diamondtech/rabbitmq-tutorials@57e863d · GitHub
[go: up one dir, main page]

Skip to content

Commit 57e863d

Browse files
Port tutorial 5 to Go
1 parent b87d0c9 commit 57e863d

File tree

2 files changed

+156
-0
lines changed

2 files changed

+156
-0
lines changed

go/emit_log_topic.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"github.com/streadway/amqp"
5+
"log"
6+
"os"
7+
"fmt"
8+
"strings"
9+
)
10+
11+
func failOnError(err error, msg string) {
12+
if err != nil {
13+
log.Fatalf("%s: %s", msg, err)
14+
panic(fmt.Sprintf("%s: %s", msg, err))
15+
}
16+
}
17+
18+
func main() {
19+
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
20+
failOnError(err, "Failed to connect to RabbitMQ")
21+
defer conn.Close()
22+
23+
ch, err := conn.Channel()
24+
failOnError(err, "Failed to open a channel")
25+
defer ch.Close()
26+
27+
err = ch.ExchangeDeclare(
28+
"logs_topic", // name
29+
"topic", // type
30+
true, // durable
31+
false, // auto-deleted
32+
false, // internal
33+
false, // noWait
34+
nil, // arguments
35+
)
36+
failOnError(err, "Failed to declare an exchange")
37+
38+
body := bodyFrom(os.Args)
39+
err = ch.Publish(
40+
"logs_topic", // exchange
41+
severityFrom(os.Args), // routing key
42+
false, // mandatory
43+
false, // immediate
44+
amqp.Publishing{
45+
ContentType: "text/plain",
46+
Body: []byte(body),
47+
})
48+
49+
failOnError(err, "Failed to publish a message")
50+
log.Printf(" [x] Sent %s", body)
51+
52+
os.Exit(0)
53+
}
54+
55+
func bodyFrom(args []string) string {
56+
var s string
57+
if (len(args) < 2) || os.Args[2] == "" {
58+
s = "hello"
59+
} else {
60+
s = strings.Join(args[1:], " ")
61+
62+
}
63+
64+
return s
65+
}
66+
67+
func severityFrom(args []string) string {
68+
var s string
69+
70+
if (len(args) < 1) || os.Args[1] == "" {
71+
s = "anonymous.info"
72+
} else {
73+
s = os.Args[1]
74+
75+
}
76+
77+
return s
78+
}

go/receive_logs_topic.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package main
2+
3+
import (
4+
"github.com/streadway/amqp"
5+
"log"
6+
"os"
7+
"fmt"
8+
)
9+
10+
func failOnError(err error, msg string) {
11+
if err != nil {
12+
log.Fatalf("%s: %s", msg, err)
13+
panic(fmt.Sprintf("%s: %s", msg, err))
14+
}
15+
}
16+
17+
func main() {
18+
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
19+
failOnError(err, "Failed to connect to RabbitMQ")
20+
defer conn.Close()
21+
22+
ch, err := conn.Channel()
23+
failOnError(err, "Failed to open a channel")
24+
defer ch.Close()
25+
26+
err = ch.ExchangeDeclare(
27+
"logs_topic", // name
28+
"topic", // type
29+
true, // durable
30+
false, // auto-deleted
31+
false, // internal
32+
false, // noWait
33+
nil, // arguments
34+
)
35+
failOnError(err, "Failed to declare an exchange")
36+
q, err := ch.QueueDeclare(
37+
"", // name
38+
false, // durable
39+
false, // delete when usused
40+
false, // exclusive
41+
false, // noWait
42+
nil, // arguments
43+
)
44+
failOnError(err, "Failed to declare a queue")
45+
46+
var s string
47+
for _, s = range os.Args {
48+
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s)
49+
err = ch.QueueBind(
50+
q.Name, // queue name
51+
s, // routing key
52+
"logs_topic", // exchange
53+
false,
54+
nil)
55+
failOnError(err, "Failed to bind a queue")
56+
}
57+
58+
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
59+
60+
done := make(chan bool)
61+
var d amqp.Delivery
62+
63+
go func() {
64+
for d = range msgs {
65+
log.Printf(" [x] %s", d.Body)
66+
done <- true
67+
}
68+
}()
69+
70+
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
71+
select {
72+
case <-done:
73+
break
74+
}
75+
log.Printf("Done")
76+
77+
os.Exit(0)
78+
}

0 commit comments

Comments
 (0)
0