Learn RabbitMQ
for Event-Driven Architecture
A beginner-friendly tutorial on how RabbitMQ
works and how to use RabbitMQ in Go using an
Event Driven Architecture
by Percy Bolmér, March 24, 2023
docker container.
The project will have a `cmd` folder that will hold all
the different services, each being its own runnable.
mkdir eventdriven
cd eventdriven
mkdir -p cmd/producer
mkdir internal
touch cmd/producer/main.go
touch internal/rabbitmq.go
go mod init
go get
package internal
import (
amqp ""
return RabbitClient{
conn: conn,
ch: ch,
}, nil
package main
import (
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn
if err != nil {
defer client.Close()
time.Sleep(30 * time.Second)
Connecting to RabbitMQ
disabled by default.
// CreateQueue will create a new queue based on given cfgs
func (rc RabbitClient) CreateQueue(queueName string
_, err :=, durable
return err
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
defer conn.Close()
if err := client.CreateQueue("customers_created"
if err := client.CreateQueue("customers_test"
time.Sleep(10 *time.Second)
go run cmd/producer/main.go
Exploring Exchanges
and Bindings
Before we can start sending messages on the
Queues, we need to create an Exchange. There
are a few defaults already created, but we will
create our own to learn a little bit more about them.
Publishing Messages To
they are from. But the Binding won’t care about the
country, only that it matches the pattern.
// Create binding between the customer_events exchange and the
if err := client.CreateBinding("customers-created"
// Create binding between the customer_events exchange and the
if err := client.CreateBinding("customers-test"
go run cmd/producer/main.go
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background
defer cancel()
// Create customer from sweden
if err := client.Send(ctx, "customer_events",
ContentType: "text/plain", // The payload we send is p
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that
Body: []byte("An cool message between services"
}); err != nil {
if err := client.Send(ctx, "customer_events",
ContentType: "text/plain",
DeliveryMode: amqp091.Transient, // This tells rabbitMQ that
Body: []byte("A second cool message")
}); err != nil {
go run cmd/producer/main.go
Consuming Messages,
Acknowledging, Nacking
& Requeues
mkdir cmd/consumer
touch cmd/consumer/main.go
import (
func main() {
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
go run cmd/consumer/main.go
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-pe
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation ide
ReplyTo string // application use - address to repl
Expiration string // implementation use - message expi
MessageId string // application use - message identif
Timestamp time.Time // application use - message timesta
Type string // application use - message type na
UserId string // application use - creating user -
AppId string // application use - creating applic
DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
Body []byte
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
// Multiple means that we acknowledge a batch of messages, l
if err := message.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle man
log.Printf("Acknowledged message %s\n", message
Rerun the code now, and you should see that the
message is printed once again, but upon restarting
the message is gone.
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
panic("Whops I failed here for some reason")
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
if !message.Redelivered {
// Nack multiple, Set Requeue to true
message.Nack(false, true)
// Set a timeout for 15 secs
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time
defer cancel()
// Create an Errgroup to manage concurrency
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
go func() {
for message := range messageBus {
// Spawn a worker
msg := message
g.Go(func() error {
log.Printf("New Message: %v", msg)
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages,
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle ma
return err
log.Printf("Acknowledged message %s\n", msg
return nil
which will return an object that we can use to
`Wait` for the server to acknowledge.
return RabbitClient{
conn: conn,
ch: ch,
}, nil
return q, nil
package main
import (
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn
if err != nil {
defer client.Close()
package main
import (
func main() {
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages,
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle ma
return err
log.Printf("Acknowledged message %s\n", msg
return nil
package main
import (
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
defer conn.Close()
// Never use the same Connection for Consume and Publish
consumeConn, err := internal.ConnectRabbitMQ(
if err != nil {
defer consumeConn.Close()
if err := consumeClient.CreateBinding(queue.Name
package main
import (
func main() {
Go ahead and try the code, you should see that the
producer receives the RPC responses and prints
them out.
Restarting RabbitMQ
tlsConf := &tls.Config{
RootCAs: rootCAs,
Certificates: []tls.Certificate{cert},
// Setup the Connection to RabbitMQ host using AMQPs and Apply
conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s/%s"
if err != nil {
return nil, err
return conn, nil
function encode_password()
SALT=$(od -A n -t x -N 4 /dev/urandom)
PASS=$SALT$(echo -n $1 | xxd -ps | tr -d '\n'
PASS=$(echo -n $PASS | xxd -r -p | sha256sum
PASS=$(echo -n $SALT$PASS | xxd -r -p | base64
echo $PASS
encode_password "secret"
log.console = true
# Disable plain (non-TLS) TCP listeners
listeners.tcp = none
# TLS port
listeners.ssl.default = 5671
# SSL Certs
ssl_options.cacertfile = /certs/ca_certificate.pem
ssl_options.certfile = /certs/server_blackbox_certificate.pem
ssl_options.keyfile = /certs/server_blackbox_key.pem
# Peer verification
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# Load definitions file
load_definitions = /etc/rabbitmq/rabbitmq_definitions.json
"users": [
"name": "percy",
"password_hash": "dPOoDgfw31kjUy41HSmqQR+X2Q9PCA5fD
"tags": "administrator"
"vhosts": [
"name": "/"
"name": "customers"
"permissions": [
"user": "percy",
"vhost": "customers",
"configure": ".*",
"write": ".*",
"read": ".*"
"exchanges": [
"name": "customer_events",
"vhost": "customers",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
"name": "customer_callbacks",
"vhost": "customers",
"type": "direct",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
"queues": [
"name": "customers_created",
"vhost": "customers",
"durable": true,
"auto_delete": false,
"arguments": {}
"bindings": [
"source": "customer_events",
"vhost": "customers",
"destination": "customers_created",
"destination_type": "queue",
"routing_key": "customers.created.*",
"arguments": {}
After running that, check the logs and verify that they
show the user being created.
