fix: move pubsub publishing out of database transactions to avoid conn exhaustion by dannykopping · Pull Request #17648 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

fix: move pubsub publishing out of database transactions to avoid conn exhaustion #17648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 5, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: move pubsub publishing out of database transactions to avoid conn starvation

Signed-off-by: Danny Kopping <dannykopping@gmail.com>
  • Loading branch information
dannykopping committed May 1, 2025
commit ba2f90a18463a3cbd8d49b6d04d29ff5cd38ce24
57 changes: 42 additions & 15 deletions enterprise/coderd/prebuilds/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ type StoreReconciler struct {
registerer prometheus.Registerer
metrics *MetricsCollector

cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
cancelFn context.CancelCauseFunc
running atomic.Bool
stopped atomic.Bool
done chan struct{}
provisionNotifyCh chan *database.ProvisionerJob
}

var _ prebuilds.ReconciliationOrchestrator = &StoreReconciler{}
Expand All @@ -56,13 +57,14 @@ func NewStoreReconciler(store database.Store,
registerer prometheus.Registerer,
) *StoreReconciler {
reconciler := &StoreReconciler{
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
store: store,
pubsub: ps,
logger: logger,
cfg: cfg,
clock: clock,
registerer: registerer,
done: make(chan struct{}, 1),
provisionNotifyCh: make(chan *database.ProvisionerJob, 100),
}

reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
Expand Down Expand Up @@ -100,6 +102,29 @@ func (c *StoreReconciler) Run(ctx context.Context) {
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
c.running.Store(true)

// Publish provisioning jobs outside of database transactions.
// PGPubsub tries to acquire a new connection on Publish, and a connection is already held while a database
// transaction is active, so publishing from inside a transaction can exhaust the available connections.
go func() {
for {
select {
case <-c.done:
return
case <-ctx.Done():
return
case job := <-c.provisionNotifyCh:
if job == nil {
continue
}

err := provisionerjobs.PostJob(c.pubsub, *job)
if err != nil {
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
}
}
}
}()

for {
select {
// TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
Expand Down Expand Up @@ -571,10 +596,12 @@ func (c *StoreReconciler) provision(
return xerrors.Errorf("provision workspace: %w", err)
}

err = provisionerjobs.PostJob(c.pubsub, *provisionerJob)
if err != nil {
// Client probably doesn't care about this error, so just log it.
c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(err))
// Publish provisioner job event outside of transaction.
select {
case c.provisionNotifyCh <- provisionerJob:
default: // channel full, drop the message; provisioner will pick this job up later with its periodic check, though.
c.logger.Warn(ctx, "provisioner job notification queue full, dropping",
slog.F("job_id", provisionerJob.ID), slog.F("prebuild_id", prebuildID.String()))
}

c.logger.Info(ctx, "prebuild job scheduled", slog.F("transition", transition),
Expand Down
Loading
0