From f1342db88f61b17c9e9eaaf71fa330f2e499f9cd Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 13 Jun 2023 18:44:15 +0000 Subject: [PATCH 1/4] chore: rename store to dbmock for consistency --- Makefile | 6 +++--- coderd/database/dbmock/{store.go => dbmock.go} | 0 coderd/database/dbmock/doc.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename coderd/database/dbmock/{store.go => dbmock.go} (100%) diff --git a/Makefile b/Makefile index d198e2a29c16c..c113da58bb6c8 100644 --- a/Makefile +++ b/Makefile @@ -431,7 +431,7 @@ lint/helm: gen: \ coderd/database/dump.sql \ coderd/database/querier.go \ - coderd/database/dbmock/store.go \ + coderd/database/dbmock/dbmock.go \ provisionersdk/proto/provisioner.pb.go \ provisionerd/proto/provisionerd.pb.go \ site/src/api/typesGenerated.ts \ @@ -453,7 +453,7 @@ gen/mark-fresh: files="\ coderd/database/dump.sql \ coderd/database/querier.go \ - coderd/database/dbmock/store.go \ + coderd/database/dbmock/dbmock.go \ provisionersdk/proto/provisioner.pb.go \ provisionerd/proto/provisionerd.pb.go \ site/src/api/typesGenerated.ts \ @@ -490,7 +490,7 @@ coderd/database/querier.go: coderd/database/sqlc.yaml coderd/database/dump.sql $ ./coderd/database/generate.sh -coderd/database/dbmock/store.go: coderd/database/db.go coderd/database/querier.go +coderd/database/dbmock/dbmock.go: coderd/database/db.go coderd/database/querier.go go generate ./coderd/database/dbmock/ provisionersdk/proto/provisioner.pb.go: provisionersdk/proto/provisioner.proto diff --git a/coderd/database/dbmock/store.go b/coderd/database/dbmock/dbmock.go similarity index 100% rename from coderd/database/dbmock/store.go rename to coderd/database/dbmock/dbmock.go diff --git a/coderd/database/dbmock/doc.go b/coderd/database/dbmock/doc.go index 1aaeb50463812..2199de635b86b 100644 --- a/coderd/database/dbmock/doc.go +++ b/coderd/database/dbmock/doc.go @@ -1,4 +1,4 @@ // package dbmock contains a mocked implementation of the database.Store interface for use in tests package dbmock -//go:generate mockgen -destination ./store.go -package dbmock github.com/coder/coder/coderd/database Store +//go:generate mockgen -destination ./dbmock.go -package dbmock github.com/coder/coder/coderd/database Store From da8631db2dd006f657a3d46ff1be09dbd7aa6ae3 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 13 Jun 2023 18:48:05 +0000 Subject: [PATCH 2/4] chore: remove redundant dbtype package This wasn't necessary and forked how we do DB types. --- coderd/coderd.go | 3 +-- coderd/database/dbgen/dbgen.go | 3 +-- coderd/database/dbtype/dbtype.go | 29 -------------------- coderd/database/models.go | 5 ++-- coderd/database/queries.sql.go | 5 ++-- coderd/database/sqlc.yaml | 6 +++-- coderd/database/time.go | 14 ---------- coderd/database/{drivers.go => types.go} | 34 ++++++++++++++++++++++++ coderd/workspaces_test.go | 5 ++-- coderd/wsbuilder/wsbuilder_test.go | 5 ++-- 10 files changed, 48 insertions(+), 61 deletions(-) delete mode 100644 coderd/database/time.go rename coderd/database/{drivers.go => types.go} (55%) diff --git a/coderd/coderd.go b/coderd/coderd.go index f1aa14e145eac..64cc157c723d4 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -48,7 +48,6 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" "github.com/coder/coder/coderd/database/dbmetrics" - "github.com/coder/coder/coderd/database/dbtype" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/gitsshkey" "github.com/coder/coder/coderd/healthcheck" @@ -948,7 +947,7 @@ func (api *API) CreateInMemoryProvisionerDaemon(ctx context.Context, debounce ti CreatedAt: database.Now(), Name: name, Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho, database.ProvisionerTypeTerraform}, - Tags: dbtype.StringMap{ + Tags: database.StringMap{ provisionerdserver.TagScope: provisionerdserver.ScopeOrganization, }, }) diff --git a/coderd/database/dbgen/dbgen.go b/coderd/database/dbgen/dbgen.go index 604873fa2afb6..1a428ba2ba63c 100644 --- a/coderd/database/dbgen/dbgen.go +++ b/coderd/database/dbgen/dbgen.go @@ -18,7 +18,6 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" - "github.com/coder/coder/coderd/database/dbtype" "github.com/coder/coder/coderd/rbac" "github.com/coder/coder/cryptorand" ) @@ -278,7 +277,7 @@ func ProvisionerJob(t testing.TB, db database.Store, orig database.ProvisionerJo // Always set some tags to prevent Acquire from grabbing jobs it should not. if !orig.StartedAt.Time.IsZero() { if orig.Tags == nil { - orig.Tags = make(dbtype.StringMap) + orig.Tags = make(database.StringMap) } // Make sure when we acquire the job, we only get this one. orig.Tags[id.String()] = "true" diff --git a/coderd/database/dbtype/dbtype.go b/coderd/database/dbtype/dbtype.go index 9ab47c16f5552..2811f6f5a93c8 100644 --- a/coderd/database/dbtype/dbtype.go +++ b/coderd/database/dbtype/dbtype.go @@ -1,30 +1 @@ package dbtype - -import ( - "database/sql/driver" - "encoding/json" - - "golang.org/x/xerrors" -) - -type StringMap map[string]string - -func (m *StringMap) Scan(src interface{}) error { - if src == nil { - return nil - } - switch src := src.(type) { - case []byte: - err := json.Unmarshal(src, m) - if err != nil { - return err - } - default: - return xerrors.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, m) - } - return nil -} - -func (m StringMap) Value() (driver.Value, error) { - return json.Marshal(m) -} diff --git a/coderd/database/models.go b/coderd/database/models.go index 558e1c51a94d7..cd571d6bba803 100644 --- a/coderd/database/models.go +++ b/coderd/database/models.go @@ -11,7 +11,6 @@ import ( "fmt" "time" - "github.com/coder/coder/coderd/database/dbtype" "github.com/google/uuid" "github.com/lib/pq" "github.com/tabbed/pqtype" @@ -1478,7 +1477,7 @@ type ProvisionerDaemon struct { Name string `db:"name" json:"name"` Provisioners []ProvisionerType `db:"provisioners" json:"provisioners"` ReplicaID uuid.NullUUID `db:"replica_id" json:"replica_id"` - Tags dbtype.StringMap `db:"tags" json:"tags"` + Tags StringMap `db:"tags" json:"tags"` } type ProvisionerJob struct { @@ -1497,7 +1496,7 @@ type ProvisionerJob struct { Input json.RawMessage `db:"input" json:"input"` WorkerID uuid.NullUUID `db:"worker_id" json:"worker_id"` FileID uuid.UUID `db:"file_id" json:"file_id"` - Tags dbtype.StringMap `db:"tags" json:"tags"` + Tags StringMap `db:"tags" json:"tags"` ErrorCode sql.NullString `db:"error_code" json:"error_code"` TraceMetadata pqtype.NullRawMessage `db:"trace_metadata" json:"trace_metadata"` } diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go index d27802827763b..2c71a96ccc4e0 100644 --- a/coderd/database/queries.sql.go +++ b/coderd/database/queries.sql.go @@ -10,7 +10,6 @@ import ( "encoding/json" "time" - "github.com/coder/coder/coderd/database/dbtype" "github.com/google/uuid" "github.com/lib/pq" "github.com/tabbed/pqtype" @@ -1999,7 +1998,7 @@ type InsertProvisionerDaemonParams struct { CreatedAt time.Time `db:"created_at" json:"created_at"` Name string `db:"name" json:"name"` Provisioners []ProvisionerType `db:"provisioners" json:"provisioners"` - Tags dbtype.StringMap `db:"tags" json:"tags"` + Tags StringMap `db:"tags" json:"tags"` } func (q *sqlQuerier) InsertProvisionerDaemon(ctx context.Context, arg InsertProvisionerDaemonParams) (ProvisionerDaemon, error) { @@ -2365,7 +2364,7 @@ type InsertProvisionerJobParams struct { FileID uuid.UUID `db:"file_id" json:"file_id"` Type ProvisionerJobType `db:"type" json:"type"` Input json.RawMessage `db:"input" json:"input"` - Tags dbtype.StringMap `db:"tags" json:"tags"` + Tags StringMap `db:"tags" json:"tags"` TraceMetadata pqtype.NullRawMessage `db:"trace_metadata" json:"trace_metadata"` } diff --git a/coderd/database/sqlc.yaml b/coderd/database/sqlc.yaml index cd3e846afb430..0330a3503e1ef 100644 --- a/coderd/database/sqlc.yaml +++ b/coderd/database/sqlc.yaml @@ -8,9 +8,11 @@ overrides: go: overrides: - column: "provisioner_daemons.tags" - go_type: "github.com/coder/coder/coderd/database/dbtype.StringMap" + go_type: + type: "StringMap" - column: "provisioner_jobs.tags" - go_type: "github.com/coder/coder/coderd/database/dbtype.StringMap" + go_type: + type: "StringMap" - column: "users.rbac_roles" go_type: "github.com/lib/pq.StringArray" - column: "templates.user_acl" diff --git a/coderd/database/time.go b/coderd/database/time.go deleted file mode 100644 index 290ddf228fb7b..0000000000000 --- a/coderd/database/time.go +++ /dev/null @@ -1,14 +0,0 @@ -package database - -import "time" - -// Now returns a standardized timezone used for database resources. -func Now() time.Time { - return Time(time.Now().UTC()) -} - -// Time returns a time compatible with Postgres. Postgres only stores dates with -// microsecond precision. -func Time(t time.Time) time.Time { - return t.Round(time.Microsecond) -} diff --git a/coderd/database/drivers.go b/coderd/database/types.go similarity index 55% rename from coderd/database/drivers.go rename to coderd/database/types.go index a1084d1229662..45d21964ac27c 100644 --- a/coderd/database/drivers.go +++ b/coderd/database/types.go @@ -3,6 +3,7 @@ package database import ( "database/sql/driver" "encoding/json" + "time" "golang.org/x/xerrors" @@ -43,3 +44,36 @@ func (t *TemplateACL) Scan(src interface{}) error { func (t TemplateACL) Value() (driver.Value, error) { return json.Marshal(t) } + +type StringMap map[string]string + +func (m *StringMap) Scan(src interface{}) error { + if src == nil { + return nil + } + switch src := src.(type) { + case []byte: + err := json.Unmarshal(src, m) + if err != nil { + return err + } + default: + return xerrors.Errorf("unsupported Scan, storing driver.Value type %T into type %T", src, m) + } + return nil +} + +func (m StringMap) Value() (driver.Value, error) { + return json.Marshal(m) +} + +// Now returns a standardized timezone used for database resources. +func Now() time.Time { + return Time(time.Now().UTC()) +} + +// Time returns a time compatible with Postgres. Postgres only stores dates with +// microsecond precision. +func Time(t time.Time) time.Time { + return t.Round(time.Microsecond) +} diff --git a/coderd/workspaces_test.go b/coderd/workspaces_test.go index c9bdc60afd19e..ee651dc8d626e 100644 --- a/coderd/workspaces_test.go +++ b/coderd/workspaces_test.go @@ -26,7 +26,6 @@ import ( "github.com/coder/coder/coderd/database/dbauthz" "github.com/coder/coder/coderd/database/dbgen" "github.com/coder/coder/coderd/database/dbtestutil" - "github.com/coder/coder/coderd/database/dbtype" "github.com/coder/coder/coderd/parameter" "github.com/coder/coder/coderd/rbac" "github.com/coder/coder/coderd/schedule" @@ -589,7 +588,7 @@ func TestWorkspaceFilterAllStatus(t *testing.T) { InitiatorID: owner.UserID, WorkerID: uuid.NullUUID{}, FileID: file.ID, - Tags: dbtype.StringMap{ + Tags: database.StringMap{ "custom": "true", }, }) @@ -617,7 +616,7 @@ func TestWorkspaceFilterAllStatus(t *testing.T) { job.Type = database.ProvisionerJobTypeWorkspaceBuild job.OrganizationID = owner.OrganizationID // Need to prevent acquire from getting this job. - job.Tags = dbtype.StringMap{ + job.Tags = database.StringMap{ jobID.String(): "true", } job = dbgen.ProvisionerJob(t, db, job) diff --git a/coderd/wsbuilder/wsbuilder_test.go b/coderd/wsbuilder/wsbuilder_test.go index bef69d5ea99d8..414841e80b7f0 100644 --- a/coderd/wsbuilder/wsbuilder_test.go +++ b/coderd/wsbuilder/wsbuilder_test.go @@ -15,7 +15,6 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbmock" - "github.com/coder/coder/coderd/database/dbtype" "github.com/coder/coder/coderd/provisionerdserver" "github.com/coder/coder/coderd/wsbuilder" "github.com/coder/coder/codersdk" @@ -614,7 +613,7 @@ func withActiveVersion(params []database.TemplateVersionParameter) func(mTx *dbm StorageMethod: database.ProvisionerStorageMethodFile, Type: database.ProvisionerJobTypeTemplateVersionImport, Input: nil, - Tags: dbtype.StringMap{ + Tags: database.StringMap{ "version": "active", provisionerdserver.TagScope: provisionerdserver.ScopeUser, }, @@ -654,7 +653,7 @@ func withInactiveVersion(params []database.TemplateVersionParameter) func(mTx *d StorageMethod: database.ProvisionerStorageMethodFile, Type: database.ProvisionerJobTypeTemplateVersionImport, Input: nil, - Tags: dbtype.StringMap{ + Tags: database.StringMap{ "version": "inactive", provisionerdserver.TagScope: provisionerdserver.ScopeUser, }, From 119e85341faff1fd4b264893036e9f2db2db1630 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 13 Jun 2023 18:59:13 +0000 Subject: [PATCH 3/4] chore: separate pubsub into a new package This didn't need to be in database and was bloating it. --- cli/server.go | 7 ++-- coderd/coderd.go | 3 +- coderd/coderdtest/coderdtest.go | 3 +- coderd/database/dbtestutil/db.go | 11 +++--- coderd/database/{ => pubsub}/pubsub.go | 26 ++++++------- .../{ => pubsub}/pubsub_internal_test.go | 10 ++--- coderd/database/{ => pubsub}/pubsub_memory.go | 4 +- .../{ => pubsub}/pubsub_memory_test.go | 8 ++-- coderd/database/{ => pubsub}/pubsub_test.go | 38 +++++++++---------- .../provisionerdserver/provisionerdserver.go | 3 +- .../provisionerdserver_test.go | 5 ++- coderd/provisionerjobs.go | 5 ++- coderd/provisionerjobs_internal_test.go | 7 ++-- enterprise/replicasync/replicasync.go | 9 +++-- enterprise/replicasync/replicasync_test.go | 3 +- enterprise/tailnet/coordinator.go | 6 +-- enterprise/tailnet/coordinator_test.go | 8 ++-- 17 files changed, 83 insertions(+), 73 deletions(-) rename coderd/database/{ => pubsub}/pubsub.go (92%) rename coderd/database/{ => pubsub}/pubsub_internal_test.go (94%) rename coderd/database/{ => pubsub}/pubsub_memory.go (97%) rename coderd/database/{ => pubsub}/pubsub_memory_test.go (89%) rename coderd/database/{ => pubsub}/pubsub_test.go (88%) diff --git a/cli/server.go b/cli/server.go index d7dea720978e9..c47cf8271de9e 100644 --- a/cli/server.go +++ b/cli/server.go @@ -68,6 +68,7 @@ import ( "github.com/coder/coder/coderd/database/dbmetrics" "github.com/coder/coder/coderd/database/dbpurge" "github.com/coder/coder/coderd/database/migrations" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/devtunnel" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/gitsshkey" @@ -463,7 +464,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. Logger: logger.Named("coderd"), Database: dbfake.New(), DERPMap: derpMap, - Pubsub: database.NewPubsubInMemory(), + Pubsub: pubsub.NewInMemory(), CacheDir: cacheDir, GoogleTokenValidator: googleTokenValidator, GitAuthConfigs: gitAuthConfigs, @@ -589,7 +590,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. if cfg.InMemoryDatabase { // This is only used for testing. options.Database = dbmetrics.New(dbfake.New(), options.PrometheusRegistry) - options.Pubsub = database.NewPubsubInMemory() + options.Pubsub = pubsub.NewInMemory() } else { sqlDB, err := connectToPostgres(ctx, logger, sqlDriver, cfg.PostgresURL.String()) if err != nil { @@ -600,7 +601,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd. }() options.Database = dbmetrics.New(database.New(sqlDB), options.PrometheusRegistry) - options.Pubsub, err = database.NewPubsub(ctx, sqlDB, cfg.PostgresURL.String()) + options.Pubsub, err = pubsub.New(ctx, sqlDB, cfg.PostgresURL.String()) if err != nil { return xerrors.Errorf("create pubsub: %w", err) } diff --git a/coderd/coderd.go b/coderd/coderd.go index 64cc157c723d4..82a7d36e80551 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -48,6 +48,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" "github.com/coder/coder/coderd/database/dbmetrics" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/gitsshkey" "github.com/coder/coder/coderd/healthcheck" @@ -95,7 +96,7 @@ type Options struct { AppHostnameRegex *regexp.Regexp Logger slog.Logger Database database.Store - Pubsub database.Pubsub + Pubsub pubsub.Pubsub // CacheDir is used for caching files served by the API. CacheDir string diff --git a/coderd/coderdtest/coderdtest.go b/coderd/coderdtest/coderdtest.go index b6623fbd6f942..56984cab13d88 100644 --- a/coderd/coderdtest/coderdtest.go +++ b/coderd/coderdtest/coderdtest.go @@ -59,6 +59,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" "github.com/coder/coder/coderd/database/dbtestutil" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/gitsshkey" "github.com/coder/coder/coderd/healthcheck" @@ -130,7 +131,7 @@ type Options struct { // It should only be used in cases where multiple Coder // test instances are running against the same database. Database database.Store - Pubsub database.Pubsub + Pubsub pubsub.Pubsub ConfigSSH codersdk.SSHConfigResponse diff --git a/coderd/database/dbtestutil/db.go b/coderd/database/dbtestutil/db.go index 7726a7174861c..932e4aaf4739a 100644 --- a/coderd/database/dbtestutil/db.go +++ b/coderd/database/dbtestutil/db.go @@ -11,13 +11,14 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbfake" "github.com/coder/coder/coderd/database/postgres" + "github.com/coder/coder/coderd/database/pubsub" ) -func NewDB(t testing.TB) (database.Store, database.Pubsub) { +func NewDB(t testing.TB) (database.Store, pubsub.Pubsub) { t.Helper() db := dbfake.New() - pubsub := database.NewPubsubInMemory() + ps := pubsub.NewInMemory() if os.Getenv("DB") != "" { connectionURL := os.Getenv("CODER_PG_CONNECTION_URL") if connectionURL == "" { @@ -36,12 +37,12 @@ func NewDB(t testing.TB) (database.Store, database.Pubsub) { }) db = database.New(sqlDB) - pubsub, err = database.NewPubsub(context.Background(), sqlDB, connectionURL) + ps, err = pubsub.New(context.Background(), sqlDB, connectionURL) require.NoError(t, err) t.Cleanup(func() { - _ = pubsub.Close() + _ = ps.Close() }) } - return db, pubsub + return db, ps } diff --git a/coderd/database/pubsub.go b/coderd/database/pubsub/pubsub.go similarity index 92% rename from coderd/database/pubsub.go rename to coderd/database/pubsub/pubsub.go index 6a6d1f2f07751..f661e885c2848 100644 --- a/coderd/database/pubsub.go +++ b/coderd/database/pubsub/pubsub.go @@ -1,4 +1,4 @@ -package database +package pubsub import ( "context" @@ -48,7 +48,7 @@ type msgOrErr struct { type msgQueue struct { ctx context.Context cond *sync.Cond - q [PubsubBufferSize]msgOrErr + q [BufferSize]msgOrErr front int size int closed bool @@ -82,7 +82,7 @@ func (q *msgQueue) run() { return } item := q.q[q.front] - q.front = (q.front + 1) % PubsubBufferSize + q.front = (q.front + 1) % BufferSize q.size-- q.cond.L.Unlock() @@ -111,20 +111,20 @@ func (q *msgQueue) enqueue(msg []byte) { q.cond.L.Lock() defer q.cond.L.Unlock() - if q.size == PubsubBufferSize { + if q.size == BufferSize { // queue is full, so we're going to drop the msg we got called with. // We also need to record that messages are being dropped, which we // do at the last message in the queue. This potentially makes us // lose 2 messages instead of one, but it's more important at this // point to warn the subscriber that they're losing messages so they // can do something about it. - back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize + back := (q.front + BufferSize - 1) % BufferSize q.q[back].msg = nil q.q[back].err = ErrDroppedMessages return } // queue is not full, insert the message - next := (q.front + q.size) % PubsubBufferSize + next := (q.front + q.size) % BufferSize q.q[next].msg = msg q.q[next].err = nil q.size++ @@ -143,17 +143,17 @@ func (q *msgQueue) dropped() { q.cond.L.Lock() defer q.cond.L.Unlock() - if q.size == PubsubBufferSize { + if q.size == BufferSize { // queue is full, but we need to record that messages are being dropped, // which we do at the last message in the queue. This potentially drops // another message, but it's more important for the subscriber to know. - back := (q.front + PubsubBufferSize - 1) % PubsubBufferSize + back := (q.front + BufferSize - 1) % BufferSize q.q[back].msg = nil q.q[back].err = ErrDroppedMessages return } // queue is not full, insert the error - next := (q.front + q.size) % PubsubBufferSize + next := (q.front + q.size) % BufferSize q.q[next].msg = nil q.q[next].err = ErrDroppedMessages q.size++ @@ -171,9 +171,9 @@ type pgPubsub struct { queues map[string]map[uuid.UUID]*msgQueue } -// PubsubBufferSize is the maximum number of unhandled messages we will buffer +// BufferSize is the maximum number of unhandled messages we will buffer // for a subscriber before dropping messages. -const PubsubBufferSize = 2048 +const BufferSize = 2048 // Subscribe calls the listener when an event matching the name is received. func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) { @@ -295,8 +295,8 @@ func (p *pgPubsub) recordReconnect() { } } -// NewPubsub creates a new Pubsub implementation using a PostgreSQL connection. -func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) { +// New creates a new Pubsub implementation using a PostgreSQL connection. +func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) { // Creates a new listener using pq. errCh := make(chan error) listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) { diff --git a/coderd/database/pubsub_internal_test.go b/coderd/database/pubsub/pubsub_internal_test.go similarity index 94% rename from coderd/database/pubsub_internal_test.go rename to coderd/database/pubsub/pubsub_internal_test.go index 31c50ce172176..adfa70286dbe0 100644 --- a/coderd/database/pubsub_internal_test.go +++ b/coderd/database/pubsub/pubsub_internal_test.go @@ -1,4 +1,4 @@ -package database +package pubsub import ( "context" @@ -26,7 +26,7 @@ func Test_msgQueue_ListenerWithError(t *testing.T) { // PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned // when we wrap around the end of the circular buffer. This tests that we correctly handle // the wrapping and aren't dequeueing misaligned data. - cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring + cycles := (BufferSize / 5) * 2 // almost twice around the ring for j := 0; j < cycles; j++ { for i := 0; i < 4; i++ { uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i))) @@ -75,7 +75,7 @@ func Test_msgQueue_Listener(t *testing.T) { // PubsubBufferSize is 2048, which is a power of 2, so a pattern of 5 will not be aligned // when we wrap around the end of the circular buffer. This tests that we correctly handle // the wrapping and aren't dequeueing misaligned data. - cycles := (PubsubBufferSize / 5) * 2 // almost twice around the ring + cycles := (BufferSize / 5) * 2 // almost twice around the ring for j := 0; j < cycles; j++ { for i := 0; i < 4; i++ { uut.enqueue([]byte(fmt.Sprintf("%d%d", j, i))) @@ -119,7 +119,7 @@ func Test_msgQueue_Full(t *testing.T) { // we send 2 more than the capacity. One extra because the call to the ListenerFunc blocks // but only after we've dequeued a message, and then another extra because we want to exceed // the capacity, not just reach it. - for i := 0; i < PubsubBufferSize+2; i++ { + for i := 0; i < BufferSize+2; i++ { uut.enqueue([]byte(fmt.Sprintf("%d", i))) // ensure the first dequeue has happened before proceeding, so that this function isn't racing // against the goroutine that dequeues items. @@ -136,5 +136,5 @@ func Test_msgQueue_Full(t *testing.T) { // Ok, so we sent 2 more than capacity, but we only read the capacity, that's because the last // message we send doesn't get queued, AND, it bumps a message out of the queue to make room // for the error, so we read 2 less than we sent. - require.Equal(t, PubsubBufferSize, n) + require.Equal(t, BufferSize, n) } diff --git a/coderd/database/pubsub_memory.go b/coderd/database/pubsub/pubsub_memory.go similarity index 97% rename from coderd/database/pubsub_memory.go rename to coderd/database/pubsub/pubsub_memory.go index 0ab4684c80a3f..ec4c26a4f01e0 100644 --- a/coderd/database/pubsub_memory.go +++ b/coderd/database/pubsub/pubsub_memory.go @@ -1,4 +1,4 @@ -package database +package pubsub import ( "context" @@ -87,7 +87,7 @@ func (*memoryPubsub) Close() error { return nil } -func NewPubsubInMemory() Pubsub { +func NewInMemory() Pubsub { return &memoryPubsub{ listeners: make(map[string]map[uuid.UUID]genericListener), } diff --git a/coderd/database/pubsub_memory_test.go b/coderd/database/pubsub/pubsub_memory_test.go similarity index 89% rename from coderd/database/pubsub_memory_test.go rename to coderd/database/pubsub/pubsub_memory_test.go index 7856880d856c2..80553c8fa73da 100644 --- a/coderd/database/pubsub_memory_test.go +++ b/coderd/database/pubsub/pubsub_memory_test.go @@ -1,4 +1,4 @@ -package database_test +package pubsub_test import ( "context" @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/coder/coder/coderd/database" + "github.com/coder/coder/coderd/database/pubsub" ) func TestPubsubMemory(t *testing.T) { @@ -16,7 +16,7 @@ func TestPubsubMemory(t *testing.T) { t.Run("Legacy", func(t *testing.T) { t.Parallel() - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() event := "test" data := "testing" messageChannel := make(chan []byte) @@ -36,7 +36,7 @@ func TestPubsubMemory(t *testing.T) { t.Run("WithErr", func(t *testing.T) { t.Parallel() - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() event := "test" data := "testing" messageChannel := make(chan []byte) diff --git a/coderd/database/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go similarity index 88% rename from coderd/database/pubsub_test.go rename to coderd/database/pubsub/pubsub_test.go index 60fb1821af55d..49b4bd6203a60 100644 --- a/coderd/database/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -1,6 +1,6 @@ //go:build linux -package database_test +package pubsub_test import ( "context" @@ -15,8 +15,8 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" - "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/postgres" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/testutil" ) @@ -39,7 +39,7 @@ func TestPubsub(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) defer pubsub.Close() event := "test" @@ -67,7 +67,7 @@ func TestPubsub(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) defer pubsub.Close() cancelFunc() @@ -82,7 +82,7 @@ func TestPubsub(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) defer pubsub.Close() @@ -114,7 +114,7 @@ func TestPubsub(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) defer pubsub.Close() @@ -171,7 +171,7 @@ func TestPubsub_ordering(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + pubsub, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) defer pubsub.Close() event := "test" @@ -219,14 +219,14 @@ func TestPubsub_Disconnect(t *testing.T) { ctx, cancelFunc := context.WithTimeout(context.Background(), testutil.WaitSuperLong) defer cancelFunc() - pubsub, err := database.NewPubsub(ctx, db, connectionURL) + ps, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) - defer pubsub.Close() + defer ps.Close() event := "test" // buffer responses so that when the test completes, goroutines don't get blocked & leak - errors := make(chan error, database.PubsubBufferSize) - messages := make(chan string, database.PubsubBufferSize) + errors := make(chan error, pubsub.BufferSize) + messages := make(chan string, pubsub.BufferSize) readOne := func() (m string, e error) { t.Helper() select { @@ -244,7 +244,7 @@ func TestPubsub_Disconnect(t *testing.T) { return m, e } - cancelSub, err := pubsub.SubscribeWithErr(event, func(ctx context.Context, msg []byte, err error) { + cancelSub, err := ps.SubscribeWithErr(event, func(ctx context.Context, msg []byte, err error) { messages <- string(msg) errors <- err }) @@ -252,7 +252,7 @@ func TestPubsub_Disconnect(t *testing.T) { defer cancelSub() for i := 0; i < 100; i++ { - err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", i))) + err = ps.Publish(event, []byte(fmt.Sprintf("%d", i))) require.NoError(t, err) } // make sure we're getting at least one message. @@ -270,7 +270,7 @@ func TestPubsub_Disconnect(t *testing.T) { default: // ok } - err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", j))) + err = ps.Publish(event, []byte(fmt.Sprintf("%d", j))) j++ if err != nil { break @@ -292,7 +292,7 @@ func TestPubsub_Disconnect(t *testing.T) { default: // ok } - err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", j))) + err = ps.Publish(event, []byte(fmt.Sprintf("%d", j))) if err == nil { break } @@ -303,7 +303,7 @@ func TestPubsub_Disconnect(t *testing.T) { k := j // exceeding the buffer invalidates the test because this causes us to drop messages for reasons other than DB // reconnect - require.Less(t, k, database.PubsubBufferSize, "exceeded buffer") + require.Less(t, k, pubsub.BufferSize, "exceeded buffer") // We don't know how quickly the pubsub will reconnect, so continue to send messages with increasing numbers. As // soon as we see k or higher we know we're getting messages after the restart. @@ -315,7 +315,7 @@ func TestPubsub_Disconnect(t *testing.T) { default: // ok } - _ = pubsub.Publish(event, []byte(fmt.Sprintf("%d", j))) + _ = ps.Publish(event, []byte(fmt.Sprintf("%d", j))) j++ time.Sleep(testutil.IntervalFast) } @@ -324,7 +324,7 @@ func TestPubsub_Disconnect(t *testing.T) { gotDroppedErr := false for { m, err := readOne() - if xerrors.Is(err, database.ErrDroppedMessages) { + if xerrors.Is(err, pubsub.ErrDroppedMessages) { gotDroppedErr = true continue } @@ -334,7 +334,7 @@ func TestPubsub_Disconnect(t *testing.T) { if l >= k { // exceeding the buffer invalidates the test because this causes us to drop messages for reasons other than // DB reconnect - require.Less(t, l, database.PubsubBufferSize, "exceeded buffer") + require.Less(t, l, pubsub.BufferSize, "exceeded buffer") break } } diff --git a/coderd/provisionerdserver/provisionerdserver.go b/coderd/provisionerdserver/provisionerdserver.go index f0de9939bf06a..a33c4a048a6d3 100644 --- a/coderd/provisionerdserver/provisionerdserver.go +++ b/coderd/provisionerdserver/provisionerdserver.go @@ -31,6 +31,7 @@ import ( "github.com/coder/coder/coderd/audit" "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/httpmw" "github.com/coder/coder/coderd/schedule" @@ -56,7 +57,7 @@ type Server struct { GitAuthConfigs []*gitauth.Config Tags json.RawMessage Database database.Store - Pubsub database.Pubsub + Pubsub pubsub.Pubsub Telemetry telemetry.Reporter Tracer trace.Tracer QuotaCommitter *atomic.Pointer[proto.QuotaCommitter] diff --git a/coderd/provisionerdserver/provisionerdserver_test.go b/coderd/provisionerdserver/provisionerdserver_test.go index 6b42556f0aee9..86a834cf0780c 100644 --- a/coderd/provisionerdserver/provisionerdserver_test.go +++ b/coderd/provisionerdserver/provisionerdserver_test.go @@ -21,6 +21,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbfake" "github.com/coder/coder/coderd/database/dbgen" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/gitauth" "github.com/coder/coder/coderd/provisionerdserver" "github.com/coder/coder/coderd/schedule" @@ -51,7 +52,7 @@ func TestAcquireJob(t *testing.T) { t.Run("Debounce", func(t *testing.T) { t.Parallel() db := dbfake.New() - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() srv := &provisionerdserver.Server{ ID: uuid.New(), Logger: slogtest.Make(t, nil), @@ -1256,7 +1257,7 @@ func TestInsertWorkspaceResource(t *testing.T) { func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server { t.Helper() db := dbfake.New() - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() return &provisionerdserver.Server{ ID: uuid.New(), diff --git a/coderd/provisionerjobs.go b/coderd/provisionerjobs.go index 99fb647385f5e..67b3db5d9ce05 100644 --- a/coderd/provisionerjobs.go +++ b/coderd/provisionerjobs.go @@ -19,6 +19,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/db2sdk" "github.com/coder/coder/coderd/database/dbauthz" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/coderd/httpapi" "github.com/coder/coder/codersdk" "github.com/coder/coder/provisionersdk" @@ -268,7 +269,7 @@ type logFollower struct { ctx context.Context logger slog.Logger db database.Store - pubsub database.Pubsub + pubsub pubsub.Pubsub r *http.Request rw http.ResponseWriter conn *websocket.Conn @@ -281,7 +282,7 @@ type logFollower struct { } func newLogFollower( - ctx context.Context, logger slog.Logger, db database.Store, pubsub database.Pubsub, + ctx context.Context, logger slog.Logger, db database.Store, pubsub pubsub.Pubsub, rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64, ) *logFollower { return &logFollower{ diff --git a/coderd/provisionerjobs_internal_test.go b/coderd/provisionerjobs_internal_test.go index ee34e451058b0..5df9c55d61689 100644 --- a/coderd/provisionerjobs_internal_test.go +++ b/coderd/provisionerjobs_internal_test.go @@ -20,6 +20,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbmock" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/codersdk" "github.com/coder/coder/provisionersdk" "github.com/coder/coder/testutil" @@ -138,7 +139,7 @@ func Test_logFollower_completeBeforeFollow(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), @@ -200,7 +201,7 @@ func Test_logFollower_completeBeforeSubscribe(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), @@ -276,7 +277,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), diff --git a/enterprise/replicasync/replicasync.go b/enterprise/replicasync/replicasync.go index 4b31b912ea673..a2bcb8837288e 100644 --- a/enterprise/replicasync/replicasync.go +++ b/enterprise/replicasync/replicasync.go @@ -20,6 +20,7 @@ import ( "github.com/coder/coder/buildinfo" "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbauthz" + "github.com/coder/coder/coderd/database/pubsub" ) var PubsubEvent = "replica" @@ -36,7 +37,7 @@ type Options struct { // New registers the replica with the database and periodically updates to ensure // it's healthy. It contacts all other alive replicas to ensure they are reachable. -func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub database.Pubsub, options *Options) (*Manager, error) { +func New(ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub, options *Options) (*Manager, error) { if options == nil { options = &Options{} } @@ -77,7 +78,7 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data if err != nil { return nil, xerrors.Errorf("insert replica: %w", err) } - err = pubsub.Publish(PubsubEvent, []byte(options.ID.String())) + err = ps.Publish(PubsubEvent, []byte(options.ID.String())) if err != nil { return nil, xerrors.Errorf("publish new replica: %w", err) } @@ -86,7 +87,7 @@ func New(ctx context.Context, logger slog.Logger, db database.Store, pubsub data id: options.ID, options: options, db: db, - pubsub: pubsub, + pubsub: ps, self: replica, logger: logger, closed: make(chan struct{}), @@ -110,7 +111,7 @@ type Manager struct { id uuid.UUID options *Options db database.Store - pubsub database.Pubsub + pubsub pubsub.Pubsub logger slog.Logger closeWait sync.WaitGroup diff --git a/enterprise/replicasync/replicasync_test.go b/enterprise/replicasync/replicasync_test.go index f2c0eebd8cd5c..741be64fa12cc 100644 --- a/enterprise/replicasync/replicasync_test.go +++ b/enterprise/replicasync/replicasync_test.go @@ -18,6 +18,7 @@ import ( "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbfake" "github.com/coder/coder/coderd/database/dbtestutil" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/enterprise/replicasync" "github.com/coder/coder/testutil" ) @@ -212,7 +213,7 @@ func TestReplica(t *testing.T) { // this many PostgreSQL connections takes some // configuration tweaking. db := dbfake.New() - pubsub := database.NewPubsubInMemory() + pubsub := pubsub.NewInMemory() logger := slogtest.Make(t, nil) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) diff --git a/enterprise/tailnet/coordinator.go b/enterprise/tailnet/coordinator.go index c25a9c2f773f3..569f770881df4 100644 --- a/enterprise/tailnet/coordinator.go +++ b/enterprise/tailnet/coordinator.go @@ -16,13 +16,13 @@ import ( "golang.org/x/xerrors" "cdr.dev/slog" - "github.com/coder/coder/coderd/database" + "github.com/coder/coder/coderd/database/pubsub" agpl "github.com/coder/coder/tailnet" ) // NewCoordinator creates a new high availability coordinator // that uses PostgreSQL pubsub to exchange handshakes. -func NewCoordinator(logger slog.Logger, pubsub database.Pubsub) (agpl.Coordinator, error) { +func NewCoordinator(logger slog.Logger, pubsub pubsub.Pubsub) (agpl.Coordinator, error) { ctx, cancelFunc := context.WithCancel(context.Background()) nameCache, err := lru.New[uuid.UUID, string](512) @@ -53,7 +53,7 @@ type haCoordinator struct { id uuid.UUID log slog.Logger mutex sync.RWMutex - pubsub database.Pubsub + pubsub pubsub.Pubsub close chan struct{} closeFunc context.CancelFunc diff --git a/enterprise/tailnet/coordinator_test.go b/enterprise/tailnet/coordinator_test.go index cf85af4a5a565..bcc3ddca34d05 100644 --- a/enterprise/tailnet/coordinator_test.go +++ b/enterprise/tailnet/coordinator_test.go @@ -10,8 +10,8 @@ import ( "cdr.dev/slog/sloggers/slogtest" - "github.com/coder/coder/coderd/database" "github.com/coder/coder/coderd/database/dbtestutil" + "github.com/coder/coder/coderd/database/pubsub" "github.com/coder/coder/enterprise/tailnet" agpl "github.com/coder/coder/tailnet" "github.com/coder/coder/testutil" @@ -21,7 +21,7 @@ func TestCoordinatorSingle(t *testing.T) { t.Parallel() t.Run("ClientWithoutAgent", func(t *testing.T) { t.Parallel() - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory()) + coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) require.NoError(t, err) defer coordinator.Close() @@ -49,7 +49,7 @@ func TestCoordinatorSingle(t *testing.T) { t.Run("AgentWithoutClients", func(t *testing.T) { t.Parallel() - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory()) + coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) require.NoError(t, err) defer coordinator.Close() @@ -77,7 +77,7 @@ func TestCoordinatorSingle(t *testing.T) { t.Run("AgentWithClient", func(t *testing.T) { t.Parallel() - coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), database.NewPubsubInMemory()) + coordinator, err := tailnet.NewCoordinator(slogtest.Make(t, nil), pubsub.NewInMemory()) require.NoError(t, err) defer coordinator.Close() From 936cbe70f9669e6a0ff8bc9c6b1a4b35aa7a89d9 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Tue, 13 Jun 2023 22:16:02 +0000 Subject: [PATCH 4/4] Fix linting errors --- coderd/database/pubsub/pubsub_test.go | 8 ++++---- .../provisionerdserver_test.go | 8 ++++---- coderd/provisionerjobs.go | 4 ++-- coderd/provisionerjobs_internal_test.go | 16 ++++++++-------- enterprise/tailnet/coordinator.go | 4 ++-- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/coderd/database/pubsub/pubsub_test.go b/coderd/database/pubsub/pubsub_test.go index 49b4bd6203a60..d1f80fa5a1aed 100644 --- a/coderd/database/pubsub/pubsub_test.go +++ b/coderd/database/pubsub/pubsub_test.go @@ -171,12 +171,12 @@ func TestPubsub_ordering(t *testing.T) { db, err := sql.Open("postgres", connectionURL) require.NoError(t, err) defer db.Close() - pubsub, err := pubsub.New(ctx, db, connectionURL) + ps, err := pubsub.New(ctx, db, connectionURL) require.NoError(t, err) - defer pubsub.Close() + defer ps.Close() event := "test" messageChannel := make(chan []byte, 100) - cancelSub, err := pubsub.Subscribe(event, func(ctx context.Context, message []byte) { + cancelSub, err := ps.Subscribe(event, func(ctx context.Context, message []byte) { // sleep a random amount of time to simulate handlers taking different amount of time // to process, depending on the message // nolint: gosec @@ -187,7 +187,7 @@ func TestPubsub_ordering(t *testing.T) { require.NoError(t, err) defer cancelSub() for i := 0; i < 100; i++ { - err = pubsub.Publish(event, []byte(fmt.Sprintf("%d", i))) + err = ps.Publish(event, []byte(fmt.Sprintf("%d", i))) assert.NoError(t, err) } for i := 0; i < 100; i++ { diff --git a/coderd/provisionerdserver/provisionerdserver_test.go b/coderd/provisionerdserver/provisionerdserver_test.go index 86a834cf0780c..6b881210b3f6a 100644 --- a/coderd/provisionerdserver/provisionerdserver_test.go +++ b/coderd/provisionerdserver/provisionerdserver_test.go @@ -52,14 +52,14 @@ func TestAcquireJob(t *testing.T) { t.Run("Debounce", func(t *testing.T) { t.Parallel() db := dbfake.New() - pubsub := pubsub.NewInMemory() + ps := pubsub.NewInMemory() srv := &provisionerdserver.Server{ ID: uuid.New(), Logger: slogtest.Make(t, nil), AccessURL: &url.URL{}, Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, Database: db, - Pubsub: pubsub, + Pubsub: ps, Telemetry: telemetry.NewNoop(), AcquireJobDebounce: time.Hour, Auditor: mockAuditor(), @@ -1257,7 +1257,7 @@ func TestInsertWorkspaceResource(t *testing.T) { func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server { t.Helper() db := dbfake.New() - pubsub := pubsub.NewInMemory() + ps := pubsub.NewInMemory() return &provisionerdserver.Server{ ID: uuid.New(), @@ -1266,7 +1266,7 @@ func setup(t *testing.T, ignoreLogErrors bool) *provisionerdserver.Server { AccessURL: &url.URL{}, Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho}, Database: db, - Pubsub: pubsub, + Pubsub: ps, Telemetry: telemetry.NewNoop(), Auditor: mockAuditor(), TemplateScheduleStore: testTemplateScheduleStore(), diff --git a/coderd/provisionerjobs.go b/coderd/provisionerjobs.go index 67b3db5d9ce05..3926d353d1017 100644 --- a/coderd/provisionerjobs.go +++ b/coderd/provisionerjobs.go @@ -282,14 +282,14 @@ type logFollower struct { } func newLogFollower( - ctx context.Context, logger slog.Logger, db database.Store, pubsub pubsub.Pubsub, + ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub, rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64, ) *logFollower { return &logFollower{ ctx: ctx, logger: logger, db: db, - pubsub: pubsub, + pubsub: ps, r: r, rw: rw, jobID: job.ID, diff --git a/coderd/provisionerjobs_internal_test.go b/coderd/provisionerjobs_internal_test.go index 5df9c55d61689..acbf303efc957 100644 --- a/coderd/provisionerjobs_internal_test.go +++ b/coderd/provisionerjobs_internal_test.go @@ -139,7 +139,7 @@ func Test_logFollower_completeBeforeFollow(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := pubsub.NewInMemory() + ps := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), @@ -158,7 +158,7 @@ func Test_logFollower_completeBeforeFollow(t *testing.T) { // we need an HTTP server to get a websocket srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 10) + uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 10) uut.follow() })) defer srv.Close() @@ -201,7 +201,7 @@ func Test_logFollower_completeBeforeSubscribe(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := pubsub.NewInMemory() + ps := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), @@ -218,7 +218,7 @@ func Test_logFollower_completeBeforeSubscribe(t *testing.T) { // we need an HTTP server to get a websocket srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 0) + uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 0) uut.follow() })) defer srv.Close() @@ -277,7 +277,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) { logger := slogtest.Make(t, nil) ctrl := gomock.NewController(t) mDB := dbmock.NewMockStore(ctrl) - pubsub := pubsub.NewInMemory() + ps := pubsub.NewInMemory() now := database.Now() job := database.ProvisionerJob{ ID: uuid.New(), @@ -294,7 +294,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) { // we need an HTTP server to get a websocket srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - uut := newLogFollower(ctx, logger, mDB, pubsub, rw, r, job, 0) + uut := newLogFollower(ctx, logger, mDB, ps, rw, r, job, 0) uut.follow() })) defer srv.Close() @@ -343,7 +343,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) { } msg, err = json.Marshal(&n) require.NoError(t, err) - err = pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg) + err = ps.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg) require.NoError(t, err) mt, msg, err = client.Read(ctx) @@ -361,7 +361,7 @@ func Test_logFollower_EndOfLogs(t *testing.T) { n.CreatedAfter = 0 msg, err = json.Marshal(&n) require.NoError(t, err) - err = pubsub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg) + err = ps.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(job.ID), msg) require.NoError(t, err) // server should now close diff --git a/enterprise/tailnet/coordinator.go b/enterprise/tailnet/coordinator.go index 569f770881df4..b0d9cfa64032f 100644 --- a/enterprise/tailnet/coordinator.go +++ b/enterprise/tailnet/coordinator.go @@ -22,7 +22,7 @@ import ( // NewCoordinator creates a new high availability coordinator // that uses PostgreSQL pubsub to exchange handshakes. -func NewCoordinator(logger slog.Logger, pubsub pubsub.Pubsub) (agpl.Coordinator, error) { +func NewCoordinator(logger slog.Logger, ps pubsub.Pubsub) (agpl.Coordinator, error) { ctx, cancelFunc := context.WithCancel(context.Background()) nameCache, err := lru.New[uuid.UUID, string](512) @@ -33,7 +33,7 @@ func NewCoordinator(logger slog.Logger, pubsub pubsub.Pubsub) (agpl.Coordinator, coord := &haCoordinator{ id: uuid.New(), log: logger, - pubsub: pubsub, + pubsub: ps, closeFunc: cancelFunc, close: make(chan struct{}), nodes: map[uuid.UUID]*agpl.Node{},