diff --git a/driver/sql/projector_aggregate.go b/driver/sql/projector_aggregate.go index 0f79b95..f1244e6 100644 --- a/driver/sql/projector_aggregate.go +++ b/driver/sql/projector_aggregate.go @@ -38,6 +38,7 @@ func NewAggregateProjector( logger goengine.Logger, metrics Metrics, retryDelay time.Duration, + noOfProcessors int, ) (*AggregateProjector, error) { switch { case db == nil: @@ -60,8 +61,9 @@ func NewAggregateProjector( logger = logger.WithFields(func(e goengine.LoggerEntry) { e.String("projection", projection.Name()) }) - - processor, err := NewBackgroundProcessor(10, 32, logger, metrics, nil) + queueBuffer := 32 + notificationQueue := newNotificationQueue(queueBuffer, retryDelay, metrics) + processor, err := NewBackgroundProcessor(noOfProcessors, queueBuffer, logger, metrics, notificationQueue) if err != nil { return nil, err } diff --git a/strategy/json/sql/postgres/manager.go b/strategy/json/sql/postgres/manager.go index f77dfaf..c9ebc06 100644 --- a/strategy/json/sql/postgres/manager.go +++ b/strategy/json/sql/postgres/manager.go @@ -122,6 +122,7 @@ func (m *SingleStreamManager) NewAggregateProjector( projectionErrorHandler driverSQL.ProjectionErrorCallback, useLockedField bool, retryDelay time.Duration, + noOfProcessors int, ) (*driverSQL.AggregateProjector, error) { eventStore, err := m.NewEventStore() if err != nil { @@ -154,5 +155,6 @@ func (m *SingleStreamManager) NewAggregateProjector( m.logger, m.metrics, retryDelay, + noOfProcessors, ) }