ProgrammingPercy
ProgrammingPercy
ProgrammingPercy
Back
Learn RabbitMQ
for Event-Driven
Architecture
(EDA)
A beginner-friendly tutorial on how RabbitMQ
works and how to use RabbitMQ in Go using an
Event Driven Architecture
by Percy Bolmér, March 24, 2023
go microservices
docker container.
The project will have a `cmd` folder that will hold all
the different services, each being its own runnable.
mkdir eventdriven
cd eventdriven
mkdir -p cmd/producer
mkdir internal
touch cmd/producer/main.go
touch internal/rabbitmq.go
go mod init programmingpercy/eventdrivenrabbit
go get github.com/rabbitmq/amqp091-go
package internal
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
return RabbitClient{
conn: conn,
ch: ch,
}, nil
}
package main
import (
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
panic(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn
if err != nil {
panic(err)
}
defer client.Close()
time.Sleep(30 * time.Second)
log.Println(client)
}
Connecting to RabbitMQ
disabled by default.
// CreateQueue will create a new queue based on given cfgs
func (rc RabbitClient) CreateQueue(queueName string
_, err := rc.ch.QueueDeclare(queueName, durable
return err
}
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
panic(err)
}
defer conn.Close()
if err := client.CreateQueue("customers_created"
panic(err)
}
if err := client.CreateQueue("customers_test"
panic(err)
}
time.Sleep(10 *time.Second)
log.Println(client)
go run cmd/producer/main.go
Exploring Exchanges
and Bindings
Before we can start sending messages on the
Queues, we need to create an Exchange. There
are a few defaults already created, but we will
create our own to learn a little bit more about them.
Publishing Messages To
Exchanges
they are from. But the Binding won’t care about the
country, only that it matches the pattern.
...
// Create binding between the customer_events exchange and the
if err := client.CreateBinding("customers-created"
panic(err)
}
// Create binding between the customer_events exchange and the
if err := client.CreateBinding("customers-test"
panic(err)
}
go run cmd/producer/main.go
...
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background
defer cancel()
// Create customer from sweden
if err := client.Send(ctx, "customer_events",
ContentType: "text/plain", // The payload we send is p
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that
Body: []byte("An cool message between services"
}); err != nil {
panic(err)
}
if err := client.Send(ctx, "customer_events",
ContentType: "text/plain",
DeliveryMode: amqp091.Transient, // This tells rabbitMQ that
Body: []byte("A second cool message")
}); err != nil {
panic(err)
}
log.Println(client)
}
go run cmd/producer/main.go
Consuming Messages,
Acknowledging, Nacking
& Requeues
mkdir cmd/consumer
touch cmd/consumer/main.go
import (
"log"
"programmingpercy/eventdrivenrabbit/internal"
)
func main() {
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
}
}()
go run cmd/consumer/main.go
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-pe
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation ide
ReplyTo string // application use - address to repl
Expiration string // implementation use - message expi
MessageId string // application use - message identif
Timestamp time.Time // application use - message timesta
Type string // application use - message type na
UserId string // application use - creating user -
AppId string // application use - creating applic
DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
Body []byte
}
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
// Multiple means that we acknowledge a batch of messages, l
if err := message.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle man
continue
}
log.Printf("Acknowledged message %s\n", message
}
}()
Rerun the code now, and you should see that the
message is printed once again, but upon restarting
the message is gone.
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
panic("Whops I failed here for some reason")
}
}()
times.
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
if !message.Redelivered {
// Nack only this message (multiple=false) and set requeue to true
message.Nack(false, true)
continue
}
.....
// Set a timeout for 15 secs
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time
defer cancel()
// Create an Errgroup to manage concurrency
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
g.SetLimit(10)
go func() {
for message := range messageBus {
// Spawn a worker
msg := message
g.Go(func() error {
log.Printf("New Message: %v", msg)
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages,
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle ma
return err
}
log.Printf("Acknowledged message %s\n", msg
return nil
})
}
}()
log.Println(client)
}
`PublishWithDeferredConfirmWithContext`
which will return an object that we can use to
`Wait` for the server to acknowledge.
return RabbitClient{
conn: conn,
ch: ch,
}, nil
}
return q, nil
}
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
panic(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn
if err != nil {
panic(err)
}
defer client.Close()
log.Println(client)
}
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages,
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle ma
return err
}
log.Printf("Acknowledged message %s\n", msg
return nil
})
}
}()
package main
import (
"context"
"fmt"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy"
if err != nil {
panic(err)
}
defer conn.Close()
// Never use the same Connection for Consume and Publish
consumeConn, err := internal.ConnectRabbitMQ(
if err != nil {
panic(err)
}
defer consumeConn.Close()
if err := consumeClient.CreateBinding(queue.Name
panic(err)
}
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
)
func main() {
Go ahead and try the code; you should see that the
producer receives the RPC responses and prints
them out.
Restarting RabbitMQ
tlsConf := &tls.Config{
RootCAs: rootCAs,
Certificates: []tls.Certificate{cert},
}
// Setup the Connection to RabbitMQ host using AMQPs and Apply
conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s/%s"
if err != nil {
return nil, err
}
return conn, nil
}
#!/bin/bash
function encode_password()
{
SALT=$(od -A n -t x -N 4 /dev/urandom)
PASS=$SALT$(echo -n $1 | xxd -ps | tr -d '\n'
PASS=$(echo -n $PASS | xxd -r -p | sha256sum
PASS=$(echo -n $SALT$PASS | xxd -r -p | base64
echo $PASS
}
encode_password "secret"
log.console = true
# Disable plain (non-TLS) TCP listeners
listeners.tcp = none
# TLS port
listeners.ssl.default = 5671
# SSL Certs
ssl_options.cacertfile = /certs/ca_certificate.pem
ssl_options.certfile = /certs/server_blackbox_certificate.pem
ssl_options.keyfile = /certs/server_blackbox_key.pem
# Peer verification
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# Load definitions file
load_definitions = /etc/rabbitmq/rabbitmq_definitions.json
{
"users": [
{
"name": "percy",
"password_hash": "dPOoDgfw31kjUy41HSmqQR+X2Q9PCA5fD
"tags": "administrator"
}
],
"vhosts": [
{
"name": "/"
},{
"name": "customers"
}
],
"permissions": [
{
"user": "percy",
"vhost": "customers",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"exchanges": [
{
"name": "customer_events",
"vhost": "customers",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
},
{
"name": "customer_callbacks",
"vhost": "customers",
"type": "direct",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"queues": [
{
"name": "customers_created",
"vhost": "customers",
"durable": true,
"auto_delete": false,
"arguments": {}
}
],
"bindings": [
{
"source": "customer_events",
"vhost": "customers",
"destination": "customers_created",
"destination_type": "queue",
"routing_key": "customers.created.*",
"arguments": {}
}
]
}
After running that, check the logs to verify that they
show the user being created.
Previous Next
Free Secure Self-Hosting Structured Logging In Go
Using Cloudflare Tunnels Using Standard Library - Slog
Your Email
Subscribe