From 336461d1c1a6e28a4c647160f6ca63c6ffd02658 Mon Sep 17 00:00:00 2001 From: Muhammad Bilal Javed Date: Thu, 19 Sep 2019 15:35:28 +0200 Subject: [PATCH 1/4] Temp changes for load testing decrease number of processors set delayRetry time passed --- driver/sql/projector_aggregate.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/driver/sql/projector_aggregate.go b/driver/sql/projector_aggregate.go index 0f79b95..db22559 100644 --- a/driver/sql/projector_aggregate.go +++ b/driver/sql/projector_aggregate.go @@ -60,8 +60,9 @@ func NewAggregateProjector( logger = logger.WithFields(func(e goengine.LoggerEntry) { e.String("projection", projection.Name()) }) - - processor, err := NewBackgroundProcessor(10, 32, logger, metrics, nil) + queueBuffer := 32 + notificationQueue := newNotificationQueue(queueBuffer, retryDelay, metrics) + processor, err := NewBackgroundProcessor(5, queueBuffer, logger, metrics, notificationQueue) if err != nil { return nil, err } From b3fedd195bfbaea0cbdb7c39a45c44809a3efe46 Mon Sep 17 00:00:00 2001 From: Muhammad Bilal Javed Date: Fri, 20 Sep 2019 10:23:31 +0200 Subject: [PATCH 2/4] Increase no of processors --- driver/sql/projector_aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/driver/sql/projector_aggregate.go b/driver/sql/projector_aggregate.go index db22559..c188243 100644 --- a/driver/sql/projector_aggregate.go +++ b/driver/sql/projector_aggregate.go @@ -62,7 +62,7 @@ func NewAggregateProjector( }) queueBuffer := 32 notificationQueue := newNotificationQueue(queueBuffer, retryDelay, metrics) - processor, err := NewBackgroundProcessor(5, queueBuffer, logger, metrics, notificationQueue) + processor, err := NewBackgroundProcessor(10, queueBuffer, logger, metrics, notificationQueue) if err != nil { return nil, err } From 243b95e57865cbd26ff93bc495a367bfcf9d09d9 Mon Sep 17 00:00:00 2001 From: Muhammad Bilal Javed Date: Fri, 20 Sep 2019 15:04:21 +0200 Subject: [PATCH 3/4] Pass no. of procesors in constructor --- driver/sql/projector_aggregate.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/driver/sql/projector_aggregate.go b/driver/sql/projector_aggregate.go index c188243..f1244e6 100644 --- a/driver/sql/projector_aggregate.go +++ b/driver/sql/projector_aggregate.go @@ -38,6 +38,7 @@ func NewAggregateProjector( logger goengine.Logger, metrics Metrics, retryDelay time.Duration, + noOfProcessors int, ) (*AggregateProjector, error) { switch { case db == nil: @@ -62,7 +63,7 @@ func NewAggregateProjector( }) queueBuffer := 32 notificationQueue := newNotificationQueue(queueBuffer, retryDelay, metrics) - processor, err := NewBackgroundProcessor(10, queueBuffer, logger, metrics, notificationQueue) + processor, err := NewBackgroundProcessor(noOfProcessors, queueBuffer, logger, metrics, notificationQueue) if err != nil { return nil, err } From 09e5e0b1349dce38cf2e77217098b3e4faf0f65a Mon Sep 17 00:00:00 2001 From: Muhammad Bilal Javed Date: Fri, 20 Sep 2019 15:06:40 +0200 Subject: [PATCH 4/4] Pass no of processors --- strategy/json/sql/postgres/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/strategy/json/sql/postgres/manager.go b/strategy/json/sql/postgres/manager.go index f77dfaf..c9ebc06 100644 --- a/strategy/json/sql/postgres/manager.go +++ b/strategy/json/sql/postgres/manager.go @@ -122,6 +122,7 @@ func (m *SingleStreamManager) NewAggregateProjector( projectionErrorHandler driverSQL.ProjectionErrorCallback, useLockedField bool, retryDelay time.Duration, + noOfProcessors int, ) (*driverSQL.AggregateProjector, error) { eventStore, err := m.NewEventStore() if err != nil { @@ -154,5 +155,6 @@ func (m *SingleStreamManager) NewAggregateProjector( m.logger, m.metrics, retryDelay, + noOfProcessors, ) }