Job Queue in Golang
Bo-Yi Wu
2019.10.19
About me
• Software Engineer at MediaTek
• Member of Drone CI/CD Platform
• Member of Gitea Platform
• Member of Gin Golang Framework
• Maintainer of some GitHub Actions plugins
• Instructor on Udemy: Golang + Drone
Queue (Open Source)
Why not
use Open Source?
Single Service
• Built-in SQLite (LDAP, Job List)
• Built-in Cache (LRU)
• Built as a single binary
• No third-party dependency
Before talking about
Job Queue
Buffered
vs
Unbuffered
http://bit.ly/understand-channel
Goroutine
// main starts a goroutine that prints a message, then sleeps for one second
// so the program does not exit before the goroutine has had a chance to run.
func main() {
go func() {
fmt.Println("GO GO GO")
}()
// Without this pause main could return right away, which ends the program.
time.Sleep(1 * time.Second)
}
Unbuffered
make(chan bool)
Unbuffered Channel
// main uses an unbuffered channel as a completion signal: the goroutine
// prints and then sends, and main blocks on the receive until that send
// happens, so no sleep is needed.
func main() {
c := make(chan bool)
go func() {
fmt.Println("GO GO GO")
// On an unbuffered channel this send completes only once main receives.
c <- true
}()
// Blocks until the goroutine sends.
<-c
}
Unbuffered Channel
// main uses an unbuffered channel as a completion signal: the goroutine
// prints and then sends, and main blocks on the receive until that send
// happens, so no sleep is needed.
func main() {
c := make(chan bool)
go func() {
fmt.Println("GO GO GO")
// On an unbuffered channel this send completes only once main receives.
c <- true
}()
// Blocks until the goroutine sends.
<-c
}
Unbuffered Channel
// main swaps the roles of sender and receiver: the goroutine receives and
// main sends. Because the channel is unbuffered, main's send cannot complete
// until the goroutine reaches its receive, which comes after the print.
func main() {
c := make(chan bool)
go func() {
fmt.Println("GO GO GO")
<-c
}()
// Blocks until the goroutine is ready to receive.
c <- true
}
Unbuffered Channel
// main receives only once while the goroutine sends twice on an unbuffered
// channel. The first send pairs with main's receive; the second send has no
// receiver, so the goroutine stays blocked on it until the program exits
// after the one-second sleep.
func main() {
c := make(chan bool)
go func() {
fmt.Println("GO GO GO")
c <- true
// No matching receive exists for this send; it never completes.
c <- true
}()
<-c
time.Sleep(1 * time.Second)
}
buffered
make(chan bool, 1)
Buffered channel
// main sends into a channel with a buffer of one. The value is stored in the
// buffer without waiting for a receiver, so main can return (ending the
// program) before the goroutine has printed anything.
func main() {
c := make(chan bool, 1)
go func() {
fmt.Println("GO GO GO")
<-c
}()
// Does not block: the buffer has room for one value.
c <- true
}
Buffered channel
// main sends into a channel with a buffer of one. The value is stored in the
// buffer without waiting for a receiver, so main can return (ending the
// program) before the goroutine has printed anything.
func main() {
c := make(chan bool, 1)
go func() {
fmt.Println("GO GO GO")
<-c
}()
// Does not block: the buffer has room for one value.
c <- true
}
Buffered channel
// main receives from a buffered channel. A receive on an empty channel blocks
// regardless of the buffer size, so main waits until the goroutine has
// printed and sent.
func main() {
c := make(chan bool, 1)
go func() {
fmt.Println("GO GO GO")
c <- true
}()
// Blocks until a value is available.
<-c
}
How to implement
Job Queue in Go
Sometimes you don’t need
A job queue
go process("job01")
// worker processes jobs from jobChan one at a time, in the order they are
// received, and returns once jobChan has been closed and drained.
func worker(jobChan <-chan Job) {
	for {
		next, open := <-jobChan
		if !open {
			// Channel closed and empty: nothing left to do.
			return
		}
		process(next)
	}
}
// make a channel with a capacity of 1024; a send blocks only while 1024 jobs
// are already waiting in the buffer.
jobChan := make(chan Job, 1024)
// start the worker in its own goroutine
go worker(jobChan)
// enqueue a job
jobChan <- job
// worker processes jobs from jobChan one at a time, in the order they are
// received, and returns once jobChan has been closed and drained.
func worker(jobChan <-chan Job) {
	for {
		next, open := <-jobChan
		if !open {
			// Channel closed and empty: nothing left to do.
			return
		}
		process(next)
	}
}
// make a channel with a capacity of 1024; a send blocks only while 1024 jobs
// are already waiting in the buffer.
jobChan := make(chan Job, 1024)
// start the worker in its own goroutine
go worker(jobChan)
// enqueue a job
jobChan <- job
Block if there already are 1024 jobs
// With a capacity of 1024, a send blocks the caller whenever 1024 jobs are
// already queued and no worker has received one yet.
jobChan := make(chan Job, 1024)
Enqueue without blocking
// Enqueue tries to put job on jobChan without blocking. It reports true when
// the job was accepted and false when the channel could not take it right
// away (for example because its buffer is full).
func Enqueue(job Job, jobChan chan<- Job) bool {
	accepted := false
	select {
	case jobChan <- job:
		accepted = true
	default:
		// No room at the moment; leave accepted as false.
	}
	return accepted
}
// Reject the request with 503 instead of blocking when the queue cannot
// accept another job.
// NOTE(review): job100 is presumably the job channel — confirm against the
// surrounding handler, which is not shown.
if !Enqueue(job, job100) {
Error(
http.StatusServiceUnavailable,
"max capacity reached",
)
return
}
Stopping the worker?
// main ranges over a channel that is never closed. After printing 1 and 2 the
// range loop keeps waiting for more values; with the sending goroutine
// finished, every goroutine is blocked and the Go runtime aborts with a
// deadlock error.
func main() {
ch := make(chan int, 2)
go func() {
ch <- 1
ch <- 2
}()
// Never terminates on its own: ch is not closed anywhere.
for n := range ch {
fmt.Println(n)
}
}
// main ranges over a channel that the sender closes after its last value, so
// the loop prints 1 and 2 and then ends normally.
func main() {
ch := make(chan int, 2)
go func() {
ch <- 1
ch <- 2
// Closing tells the range loop below that no more values will arrive.
close(ch)
}()
for n := range ch {
fmt.Println(n)
}
}
// main moves the range loop into its own goroutine. The channel is never
// closed, so that goroutine keeps waiting after the two values; main itself
// is not blocked on the channel and simply exits after a one-second sleep.
func main() {
ch := make(chan int, 2)
go func() {
ch <- 1
ch <- 2
}()
go func() {
for n := range ch {
fmt.Println(n)
}
}()
// Give both goroutines time to run before the program ends.
time.Sleep(1 * time.Second)
}
Setup Consumer
// Consumer bundles the two channels used by the queue examples.
type Consumer struct {
// inputChan is read by startConsumer, which forwards values to jobsChan.
inputChan chan int
// jobsChan is the queue that the worker methods range over.
jobsChan chan int
}
// PoolSize is used both as the buffer size of jobsChan and as the number of
// worker goroutines started.
const PoolSize = 2
// main builds a Consumer whose inputChan buffers one value and whose jobsChan
// buffers PoolSize values. Nothing uses consumer yet in this snippet.
func main() {
// create the consumer
consumer := Consumer{
inputChan: make(chan int, 1),
jobsChan: make(chan int, PoolSize),
}
}
// queue logs input and sends it on jobsChan. The send blocks while the
// channel's buffer is full and no worker is receiving.
func (c *Consumer) queue(input int) {
fmt.Println("send input value:", input)
c.jobsChan <- input
}
// worker prints every value received from jobsChan, tagged with the worker
// number num, and returns when jobsChan is closed and drained.
func (c *Consumer) worker(num int) {
for job := range c.jobsChan {
fmt.Println("worker:", num, " job value:", job)
}
}
// Start PoolSize workers, each in its own goroutine.
for i := 0; i < PoolSize; i++ {
go consumer.worker(i)
}
// Enqueue four values. With the blocking queue method, a call waits whenever
// jobsChan's buffer is full and no worker has taken a value yet.
consumer.queue(1)
consumer.queue(2)
consumer.queue(3)
consumer.queue(4)
Blocking by pool size = 2
Output (Pool Size = 2)
send input value: 1
send input value: 2
send input value: 3
worker: 0 job value: 1
send input value: 4
worker: 0 job value: 2
worker: 0 job value: 3
worker: 0 job value: 4
rewrite queue func
// queue logs input and then tries to hand it to jobsChan without blocking.
// It returns true when the value was queued and false when jobsChan could
// not accept it immediately, in which case the value is dropped.
func (c *Consumer) queue(input int) bool {
	fmt.Println("send input value:", input)

	queued := false
	select {
	case c.jobsChan <- input:
		queued = true
	default:
		// jobsChan has no free slot; report failure instead of waiting.
	}
	return queued
}
Output (Pool Size = 2)
send input value: 1
send input value: 2
send input value: 3
send input value: 4
worker: 0 job value: 1
worker: 0 job value: 2
Shutdown with
Sigterm Handling
// WithContextFunc returns a context derived from ctx that is cancelled when
// the process receives SIGINT or SIGTERM. When such a signal arrives, f is
// called first and the returned context is cancelled afterwards. If ctx is
// done before any signal arrives, the watcher goroutine exits without
// calling f.
func WithContextFunc(ctx context.Context, f func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		// signal.Notify does not block when delivering, so the channel needs
		// a buffer; with an unbuffered channel a signal that arrives before
		// the select below is waiting would be dropped.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(c)

		select {
		case <-ctx.Done():
			// Parent cancelled (or timed out): nothing to clean up here.
		case <-c:
			f()
			cancel()
		}
	}()
	return ctx
}
// WithContextFunc returns a context derived from ctx that is cancelled when
// the process receives SIGINT or SIGTERM. When such a signal arrives, f is
// called first and the returned context is cancelled afterwards. If ctx is
// done before any signal arrives, the watcher goroutine exits without
// calling f.
func WithContextFunc(ctx context.Context, f func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		// signal.Notify does not block when delivering, so the channel needs
		// a buffer; with an unbuffered channel a signal that arrives before
		// the select below is waiting would be dropped.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(c)

		select {
		case <-ctx.Done():
			// Parent cancelled (or timed out): nothing to clean up here.
		case <-c:
			f()
			cancel()
		}
	}()
	return ctx
}
// startConsumer forwards values from inputChan to jobsChan until ctx is
// done. On the way out it closes jobsChan so that workers ranging over it
// terminate. A value received after ctx is already done is discarded rather
// than forwarded.
func (c Consumer) startConsumer(ctx context.Context) {
	// Every exit path below closes jobsChan exactly once.
	defer close(c.jobsChan)

	for {
		select {
		case <-ctx.Done():
			return
		case job := <-c.inputChan:
			if ctx.Err() != nil {
				// Cancelled while this value was in flight; drop it.
				return
			}
			c.jobsChan <- job
		}
	}
}
Cancel by ctx.Done() event
// worker prints every value received from jobsChan, tagged with the worker
// number num. The range loop ends, and worker returns, once jobsChan has
// been closed and drained.
func (c *Consumer) worker(num int) {
for job := range c.jobsChan {
fmt.Println("worker:", num, " job value:", job)
}
}
Canceling Workers
without Context
// Create a cancel channel; closing it tells the worker to stop.
cancelChan := make(chan struct{})
go worker(jobChan, cancelChan)
// worker processes jobs from jobChan until cancelChan becomes readable
// (typically because it was closed), at which point it returns.
func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
	for {
		select {
		case next := <-jobChan:
			process(next)
			continue
		case <-cancelChan:
			// Cancellation requested; fall through to the return below.
		}
		return
	}
}
// to cancel the worker, close the cancel channel: a receive on a closed
// channel returns immediately, so the worker's cancel case becomes ready
close(cancelChan)
// Create a cancel channel; closing it tells the worker to stop.
cancelChan := make(chan struct{})
go worker(jobChan, cancelChan)
// worker processes jobs from jobChan until cancelChan becomes readable, then
// returns. The caller cancels by closing cancelChan; the worker itself only
// holds the receive side and must not close it.
func worker(jobChan <-chan Job, cancelChan <-chan struct{}) {
	for {
		select {
		case <-cancelChan: // becomes ready once the caller runs close(cancelChan)
			return
		case job := <-jobChan:
			process(job)
		}
	}
}
// to cancel the worker, close the cancel channel: a receive on a closed
// channel returns immediately, so the worker's cancel case becomes ready
close(cancelChan)
Graceful shutdown
with worker
sync.WaitGroup
// Register the workers with the WaitGroup before starting them.
// NOTE(review): numberOfWorkers must match the number of goroutines started
// below (PoolSize) or Wait will never return / the counter will go negative.
wg := &sync.WaitGroup{}
wg.Add(numberOfWorkers)
// Start [PoolSize] workers
for i := 0; i < PoolSize; i++ {
go consumer.worker(i)
}
WaitGroup
WaitGroup
WaitGroup
WaitGroup
// worker drains jobsChan until it is closed and then marks itself finished
// on wg, so a caller blocked in wg.Wait knows this worker has exited.
// NOTE(review): job is not used in the placeholder body, which Go rejects at
// compile time; the real handler is expected to consume it.
func (c Consumer) worker(wg *sync.WaitGroup) {
// Runs when the range loop ends, i.e. after jobsChan is closed and drained.
defer wg.Done()
for job := range c.jobsChan {
// handle the job event
}
}
Add WaitGroup
after Cancel Function
// WithContextFunc returns a context derived from ctx that is cancelled when
// the process receives SIGINT or SIGTERM. In this variant the context is
// cancelled first and f is called afterwards, so f can wait for work that
// only stops once the context is done. If ctx is done before any signal
// arrives, the watcher goroutine exits without calling f.
func WithContextFunc(ctx context.Context, f func()) context.Context {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		// signal.Notify does not block when delivering, so the channel needs
		// a buffer; with an unbuffered channel a signal that arrives before
		// the select below is waiting would be dropped.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		defer signal.Stop(c)

		select {
		case <-ctx.Done():
			// Parent cancelled (or timed out): f is not invoked.
		case <-c:
			cancel()
			// Run f after cancel: this is where the caller waits, e.g. on a
			// WaitGroup, for workers to finish.
			f()
		}
	}()
	return ctx
}
// Wire shutdown together: the callback passed to WithContextFunc waits for
// every registered worker to call Done and then closes finishChan.
wg := &sync.WaitGroup{}
wg.Add(numberOfWorkers)
ctx := signal.WithContextFunc(
context.Background(),
func() {
// Block until all workers have finished, then announce completion.
wg.Wait()
close(finishChan)
},
)
// startConsumer stops forwarding and closes jobsChan once ctx is done.
go consumer.startConsumer(ctx)
End of Program
// Block until shutdown has finished or an error is reported.
// NOTE(review): the shutdown callback shown earlier closes finishChan while
// this waits on finished — presumably the same channel; confirm the name.
select {
case <-finished:
case err := <-errChannel:
if err != nil {
return err
}
}
How to auto-scale
build agents?
Communicate between
server and agent
Jobs Schema
// RPC endpoints used between server and agent, all under /rpc.
r := e.Group("/rpc")
r.Use(rpc.Check()) // check the RPC secret
{
	r.POST("/v1/healthz", web.RPCHeartbeat)
	r.POST("/v1/request", web.RPCRquest)
	r.POST("/v1/accept", web.RPCAccept)
	r.POST("/v1/details", web.RPCDetails)
	r.POST("/v1/updateStatus", web.RPCUpdateStatus)
	r.POST("/v1/upload", web.RPCUploadBytes)
	r.POST("/v1/reset", web.RPCResetStatus)
}
/rpc/v1/accept
Update jobs set version = (oldVersion + 1)
where machine = "fooBar" and version = oldVersion
Create multiple workers
// Start r.Capacity pollers, one per second, and wait for all of them.
if r.Capacity != 0 {
var g errgroup.Group
for i := 0; i < r.Capacity; i++ {
g.Go(func() error {
// NOTE(review): every poller is started with id 0 — confirm that the
// loop index is intentionally not passed.
return r.start(ctx, 0)
})
// Stagger start-up so the pollers do not all begin at once.
time.Sleep(1 * time.Second)
}
// Wait blocks until every poller has returned and yields the first
// non-nil error, if any.
return g.Wait()
}
Break for and select loop
// start polls repeatedly, pausing one second between polls, until ctx is
// done (returning ctx.Err()) or until r.Capacity is zero after a poll
// (returning nil).
func (r *Runner) start(ctx context.Context, id int64) error {
// The label lets the break below leave the for loop; a plain break inside
// select would only leave the select statement.
LOOP:
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
// ctx is still live: do one poll.
r.poll(ctx, id)
if r.Capacity == 0 {
break LOOP
}
}
time.Sleep(1 * time.Second)
}
return nil
}
How to cancel the current Job?
Context with Cancel or Timeout
Job03 context
// Root context for the job; calling cancel stops everything derived from it.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Child context that additionally expires after 60 minutes. cancel is
// reassigned here; the earlier defer has already captured the first cancel
// function, so both are called on return.
timeout, cancel := context.WithTimeout(ctx, 60*time.Minute)
defer cancel()
Context with Cancel or Timeout
Job03 context
// Root context for the job; calling cancel stops everything derived from it.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Child context that additionally expires after 60 minutes. cancel is
// reassigned here; the earlier defer has already captured the first cancel
// function, so both are called on return.
timeout, cancel := context.WithTimeout(ctx, 60*time.Minute)
defer cancel()
Job05 context
Watch the Cancel event
// Watch for a cancel request in the background and cancel the job's context
// when one is reported.
go func() {
// The error from Watch is discarded; only a true result triggers cancel.
done, _ := r.Manager.Watch(ctx, id)
if done {
cancel()
}
}()
Handle cancel event on Server
subscribers: make(map[chan struct{}]int64),
cancelled: make(map[int64]time.Time),
User cancels a running job
// Record the cancellation and wake every subscriber watching this build.
c.Lock()
// Store a timestamp five minutes in the future for this id.
// NOTE(review): presumably an expiry used for later cleanup — the code that
// reads it is not shown.
c.cancelled[id] = time.Now().Add(time.Minute * 5)
for subscriber, build := range c.subscribers {
if id == build {
// Closing the channel makes the subscriber's receive return.
// NOTE(review): the subscriber is not removed from the map here; a
// second cancel for the same id would close it again and panic unless
// it is removed elsewhere — confirm.
close(subscriber)
}
}
c.Unlock()
Agent subscribes to the cancel event
// Wait until the job identified by id is cancelled or ctx ends.
for {
select {
case <-ctx.Done():
// The caller gave up: report "not cancelled" together with the context error.
return false, ctx.Err()
case <-time.After(time.Minute):
// Once a minute, look the id up in the cancelled map under the lock.
// NOTE(review): presumably this covers a cancel that was recorded while
// this subscriber was not registered (e.g. after a reconnect) — confirm.
c.Lock()
_, ok := c.cancelled[id]
c.Unlock()
if ok {
return true, nil
}
case <-subscriber:
// The server closed this subscriber's channel: the job was cancelled.
return true, nil
}
}
case <-time.After(time.Minute):
c.Lock()
_, ok := c.cancelled[id]
c.Unlock()
if ok {
return true, nil
}
1 Cancel
case <-time.After(time.Minute):
c.Lock()
_, ok := c.cancelled[id]
c.Unlock()
if ok {
return true, nil
}
1 Cancel
2 Reconnect Server
case <-time.After(time.Minute):
c.Lock()
_, ok := c.cancelled[id]
c.Unlock()
if ok {
return true, nil
}
https://www.udemy.com/course/golang-fight/?couponCode=GOLANG201911
https://www.udemy.com/course/devops-oneday/?couponCode=DEVOPS201911
Any Questions?