8000 feat: implement thin vertical slice of system-generated notifications by dannykopping · Pull Request #13537 · coder/coder · GitHub
[go: up one dir, main page]

Skip to content

feat: implement thin vertical slice of system-generated notifications #13537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
53c9cbb
feat: system-generated notifications
dannykopping Jun 11, 2024
4856aed
Fixing lint errors & minor tests
dannykopping Jun 11, 2024
cda6efb
Fixing dbauthz test
dannykopping Jun 11, 2024
86f937a
TestBufferedUpdates does not need a real db, altering test details sl…
dannykopping Jun 11, 2024
e8f1af2
Correct TestBufferedUpdates to count updated entries, use real db again
dannykopping Jun 12, 2024
a056f54
Use UUID for notifier IDs
dannykopping Jun 27, 2024
8c64d30
Small improvements from review suggestions
dannykopping Jun 27, 2024
ac149ec
Protect notifiers from modification during Stop()
dannykopping Jun 27, 2024
884fadf
Split out enqueuer as separate responsibility, get rid of singleton
dannykopping Jun 28, 2024
4e362e7
Remove unnecessary handler registry
dannykopping Jun 28, 2024
8097290
Remove unused context
dannykopping Jun 28, 2024
1b841ad
Centralise markdown rendering
dannykopping Jun 28, 2024
61f5bd6
Appease the linter
dannykopping Jun 28, 2024
3c8e33b
Only enqueue notification when not initiated by self
dannykopping Jul 1, 2024
757327c
Hide config flags which are unlikely to be modified by operators
dannykopping Jul 1, 2024
6f909ae
Remove unnecessary Labels struct
dannykopping Jul 1, 2024
36698c5
Enable experiment as safe
dannykopping Jul 1, 2024
c5701a6
Correcting bad refactor
dannykopping Jul 1, 2024
9d4c312
Initialize Enqueuer on API startup
dannykopping Jul 1, 2024
9380d8e
Only start one notifier since all dispatches are concurrent anyway
dannykopping Jul 1, 2024
4b7214d
Fix docs
dannykopping Jul 1, 2024
6679ef1
Fix lint error
dannykopping Jul 1, 2024
337997d
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykopping Jul 2, 2024
ba5f7c6
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykopping Jul 3, 2024
0f29293
Review feedback
dannykopping Jul 3, 2024
7c6c486
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykopping Jul 4, 2024
c6e75c2
Fix lint failures
dannykopping Jul 4, 2024
aff9e6c
Review comments
dannykopping Jul 4, 2024
613e074
Avoid race by exposing number of pending updates
dannykopping Jul 4, 2024
faea7fc
Merge branch 'main' of github.com:/coder/coder into dk/system-notific…
dannykopping Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Split out enqueuer as separate responsibility, get rid of singleton
Signed-off-by: Danny Kopping <danny@coder.com>
  • Loading branch information
dannykopping committed Jun 28, 2024
commit 884fadf9469962645ae46e60b71f02e3905b2a6f
22 changes: 16 additions & 6 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
SSHConfigOptions: configSSHOptions,
},
AllowWorkspaceRenames: vals.AllowWorkspaceRenames.Value(),
NotificationsEnqueuer: notifications.NewNoopEnqueuer(), // Changed further down if notifications enabled.
}
if httpServers.TLSConfig != nil {
options.TLSCertificates = httpServers.TLSConfig.Certificates
Expand Down Expand Up @@ -976,20 +977,29 @@ func (r *RootCmd) Server(newAPI func(context.Context, *coderd.Options) (*coderd.
defer tracker.Close()

// Manage notifications.
var notificationsManager *notifications.Manager
var (
notificationsManager *notifications.Manager
)
if experiments.Enabled(codersdk.ExperimentNotifications) {
cfg := options.DeploymentValues.Notifications
nlog := logger.Named("notifications-manager")
notificationsManager, err = notifications.NewManager(cfg, options.Database, nlog, templateHelpers(options))

// The enqueuer is responsible for enqueueing notifications to the given store.
enqueuer, err := notifications.NewStoreEnqueuer(cfg, options.Database, templateHelpers(options), logger.Named("notifications.enqueuer"))
if err != nil {
return xerrors.Errorf("failed to instantiate notification store enqueuer: %w", err)
}
options.NotificationsEnqueuer = enqueuer

// The notification manager is responsible for:
// - creating notifiers and managing their lifecycles (notifiers are responsible for dequeueing/sending notifications)
// - keeping the store updated with status updates
notificationsManager, err = notifications.NewManager(cfg, options.Database, logger.Named("notifications.manager"))
if err != nil {
return xerrors.Errorf("failed to instantiate notification manager: %w", err)
}

// nolint:gocritic // TODO: create own role.
notificationsManager.Run(dbauthz.AsSystemRestricted(ctx), int(cfg.WorkerCount.Value()))
notifications.RegisterInstance(notificationsManager)
} else {
notifications.RegisterInstance(notifications.NewNoopManager())
}

// Wrap the server in middleware that redirects to the access URL if
Expand Down
4 changes: 4 additions & 0 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/metricscache"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/portsharing"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/provisionerdserver"
Expand Down Expand Up @@ -205,6 +206,8 @@ type Options struct {
DatabaseRolluper *dbrollup.Rolluper
// WorkspaceUsageTracker tracks workspace usage by the CLI.
WorkspaceUsageTracker *workspacestats.UsageTracker
// NotificationsEnqueuer handles enqueueing notifications for delivery by SMTP, webhook, etc.
NotificationsEnqueuer notifications.Enqueuer
}

// @title Coder API
Expand Down Expand Up @@ -1444,6 +1447,7 @@ func (api *API) CreateInMemoryTaggedProvisionerDaemon(dialCtx context.Context, n
OIDCConfig: api.OIDCConfig,
ExternalAuthConfigs: api.ExternalAuthConfigs,
},
api.NotificationsEnqueuer,
)
if err != nil {
return nil, err
Expand Down
128 changes: 128 additions & 0 deletions coderd/notifications/enqueuer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package notifications

import (
"context"
"encoding/json"
"text/template"

"github.com/google/uuid"
"golang.org/x/xerrors"

"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/notifications/render"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/codersdk"
)

type StoreEnqueuer struct {
store Store
log slog.Logger

// TODO: expand this to allow for each notification to have custom delivery methods, or multiple, or none.
// For example, Larry might want email notifications for "workspace deleted" notifications, but Harry wants
// Slack notifications, and Mary doesn't want any.
method database.NotificationMethod
// helpers holds a map of template funcs which are used when rendering templates. These need to be passed in because
// the template funcs will return values which are inappropriately encapsulated in this struct.
helpers template.FuncMap
}

// NewStoreEnqueuer creates an Enqueuer implementation which can persist notification messages in the store.
func NewStoreEnqueuer(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, log slog.Logger) (*StoreEnqueuer, error) {
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
}

return &StoreEnqueuer{
store: store,
log: log,
method: method,
helpers: helpers,
}, nil
}

// Enqueue queues a notification message for later delivery.
// Messages will be dequeued by a notifier later and dispatched.
func (s *StoreEnqueuer) Enqueue(ctx context.Context, userID, templateID uuid.UUID, labels map[string]string, createdBy string, targets ...uuid.UUID) (*uuid.UUID, error) {
payload, err := s.buildPayload(ctx, userID, templateID, labels)
if err != nil {
s.log.Warn(ctx, "failed to build payload", slog.F("template_id", templateID), slog.F("user_id", userID), slog.Error(err))
return nil, xerrors.Errorf("enqueue notification (payload build): %w", err)
}

input, err := json.Marshal(payload)
if err != nil {
return nil, xerrors.Errorf("failed encoding input labels: %w", err)
}

id := uuid.New()
msg, err := s.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
ID: id,
UserID: userID,
NotificationTemplateID: templateID,
Method: s.method,
Payload: input,
Targets: targets,
CreatedBy: createdBy,
})
if err != nil {
s.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
return nil, xerrors.Errorf("enqueue notification: %w", err)
}

s.log.Debug(ctx, "enqueued notification", slog.F("msg_id", msg.ID))
return &id, nil
}

// buildPayload creates the payload that the notification will for variable substitution and/or routing.
// The payload contains information about the recipient, the event that triggered the notification, and any subsequent
// actions which can be taken by the recipient.
func (s *StoreEnqueuer) buildPayload(ctx context.Context, userID uuid.UUID, templateID uuid.UUID, labels map[string]string) (*types.MessagePayload, error) {
metadata, err := s.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
UserID: userID,
NotificationTemplateID: templateID,
})
if err != nil {
return nil, xerrors.Errorf("new message metadata: %w", err)
}

// Execute any templates in actions.
out, err := render.GoTemplate(string(metadata.Actions), types.MessagePayload{}, s.helpers)
if err != nil {
return nil, xerrors.Errorf("render actions: %w", err)
}
metadata.Actions = []byte(out)

var actions []types.TemplateAction
if err = json.Unmarshal(metadata.Actions, &actions); err != nil {
return nil, xerrors.Errorf("new message metadata: parse template actions: %w", err)
}

return &types.MessagePayload{
Version: "1.0",

NotificationName: metadata.NotificationName,

UserID: metadata.UserID.String(),
UserEmail: metadata.UserEmail,
UserName: metadata.UserName,

Actions: actions,
Labels: labels,
}, nil
}

// NoopEnqueuer implements the Enqueuer interface but performs a noop.
type NoopEnqueuer struct{}

// NewNoopEnqueuer builds a NoopEnqueuer which is used to fulfill the contract for enqueuing notifications, if ExperimentNotifications is not set.
func NewNoopEnqueuer() *NoopEnqueuer {
return &NoopEnqueuer{}
}

func (*NoopEnqueuer) Enqueue(context.Context, uuid.UUID, uuid.UUID, map[string]string, string, ...uuid.UUID) (*uuid.UUID, error) {
// nolint:nilnil // irrelevant.
return nil, nil
}
100 changes: 6 additions & 94 deletions coderd/notifications/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,18 @@ package notifications

import (
"context"
"encoding/json"
"sync"
"text/template"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/notifications/render"
"github.com/coder/coder/v2/codersdk"
"cdr.dev/slog"

"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/coderd/notifications/types"

"cdr.dev/slog"
"github.com/coder/coder/v2/codersdk"
)

// Manager manages all notifications being enqueued and dispatched.
Expand All @@ -40,10 +35,6 @@ import (
// will need an alternative mechanism for handling backpressure.
type Manager struct {
cfg codersdk.NotificationsConfig
// TODO: expand this to allow for each notification to have custom delivery methods, or multiple, or none.
// For example, Larry might want email notifications for "workspace deleted" notifications, but Harry wants
// Slack notifications, and Mary doesn't want any.
method database.NotificationMethod

store Store
log slog.Logger
Expand All @@ -52,7 +43,6 @@ type Manager struct {
notifierMu sync.Mutex

handlers *HandlerRegistry
helpers template.FuncMap

stopOnce sync.Once
stop chan any
Expand All @@ -63,23 +53,16 @@ type Manager struct {
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL e 10000 tc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger, helpers map[string]any) (*Manager, error) {
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
return nil, xerrors.Errorf("given notification method %q is invalid", cfg.Method)
}

func NewManager(cfg codersdk.NotificationsConfig, store Store, log slog.Logger) (*Manager, error) {
return &Manager{
log: log,
cfg: cfg,
store: store,
method: method,
log: log,
cfg: cfg,
store: store,

stop: make(chan any),
done: make(chan any),

handlers: defaultHandlers(cfg, log),
helpers: helpers,
}, nil
}

Expand Down Expand Up @@ -200,77 +183,6 @@ func (m *Manager) loop(ctx context.Context, notifiers int) error {
return err
}

// Enqueue queues a notification message for later delivery.
// Messages will be dequeued by a notifier later and dispatched.
func (m *Manager) Enqueue(ctx context.Context, userID, templateID uuid.UUID, labels types.Labels, createdBy string, targets ...uuid.UUID) (*uuid.UUID, error) {
payload, err := m.buildPayload(ctx, userID, templateID, labels)
if err != nil {
m.log.Warn(ctx, "failed to build payload", slog.F("template_id", templateID), slog.F("user_id", userID), slog.Error(err))
return nil, xerrors.Errorf("enqueue notification (payload build): %w", err)
}

input, err := json.Marshal(payload)
if err != nil {
return nil, xerrors.Errorf("failed encoding input labels: %w", err)
}

id := uuid.New()
msg, err := m.store.EnqueueNotificationMessage(ctx, database.EnqueueNotificationMessageParams{
ID: id,
UserID: userID,
NotificationTemplateID: templateID,
Method: m.method,
Payload: input,
Targets: targets,
CreatedBy: createdBy,
})
if err != nil {
m.log.Warn(ctx, "failed to enqueue notification", slog.F("template_id", templateID), slog.F("input", input), slog.Error(err))
return nil, xerrors.Errorf("enqueue notification: %w", err)
}

m.log.Debug(ctx, "enqueued notification", slog.F("msg_id", msg.ID))
return &id, nil
}

// buildPayload creates the payload that the notification will for variable substitution and/or routing.
// The payload contains information about the recipient, the event that triggered the notification, and any subsequent
// actions which can be taken by the recipient.
func (m *Manager) buildPayload(ctx context.Context, userID uuid.UUID, templateID uuid.UUID, labels types.Labels) (*types.MessagePayload, error) {
metadata, err := m.store.FetchNewMessageMetadata(ctx, database.FetchNewMessageMetadataParams{
UserID: userID,
NotificationTemplateID: templateID,
})
if err != nil {
return nil, xerrors.Errorf("new message metadata: %w", err)
}

// Execute any templates in actions.
out, err := render.GoTemplate(string(metadata.Actions), types.MessagePayload{}, m.helpers)
if err != nil {
return nil, xerrors.Errorf("render actions: %w", err)
}
metadata.Actions = []byte(out)

var actions []types.TemplateAction
if err = json.Unmarshal(metadata.Actions, &actions); err != nil {
return nil, xerrors.Errorf("new message metadata: parse template actions: %w", err)
}

return &types.MessagePayload{
Version: "1.0",

NotificationName: metadata.NotificationName,

UserID: metadata.UserID.String(),
UserEmail: metadata.UserEmail,
UserName: metadata.UserName,

Actions: actions,
Labels: labels,
}, nil
}

// bulkUpdate updates messages in the store based on the given successful and failed message dispatch results.
func (m *Manager) bulkUpdate(ctx context.Context, success, failure <-chan dispatchResult) {
select {
Expand Down
Loading
0