From b25ec6fa1a3b393d61b37d942f927fecdd49a450 Mon Sep 17 00:00:00 2001
From: Owais Akbani
Date: Thu, 17 Oct 2019 17:30:46 +0500
Subject: [PATCH 1/6] refact: Remove mutex from batch event processor

---
 .rubocop.yml                                  |   3 +
 lib/optimizely/event/batch_event_processor.rb |  37 +-
 spec/event/batch_event_processor_spec.rb      | 505 +++++++++---------
 3 files changed, 264 insertions(+), 281 deletions(-)

diff --git a/.rubocop.yml b/.rubocop.yml
index f9ae7181..d05c3a81 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -47,3 +47,6 @@ Style/SignalException:
 
 Lint/RescueException:
   Enabled: false
+
+Layout/EndOfLine:
+  EnforcedStyle: lf
diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb
index 62f43045..99e7d0e7 100644
--- a/lib/optimizely/event/batch_event_processor.rb
+++ b/lib/optimizely/event/batch_event_processor.rb
@@ -58,8 +58,6 @@ def initialize(
                           DEFAULT_BATCH_INTERVAL
                         end
       @notification_center = notification_center
-      @mutex = Mutex.new
-      @received = ConditionVariable.new
       @current_batch = []
       @started = false
       start!
@@ -76,10 +74,7 @@ def start!
     end
 
     def flush
-      @mutex.synchronize do
-        @event_queue << FLUSH_SIGNAL
-        @received.signal
-      end
+      @event_queue << FLUSH_SIGNAL
     end
 
     def process(user_event)
@@ -90,28 +85,21 @@ def process(user_event)
         return
       end
 
-      @mutex.synchronize do
-        begin
-          @event_queue << user_event
-          @received.signal
-        rescue Exception
-          @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
-          return
-        end
+      begin
+        @event_queue << user_event
+      rescue Exception
+        @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
+        return
       end
     end
 
     def stop!
       return unless @started
 
-      @mutex.synchronize do
-        @event_queue << SHUTDOWN_SIGNAL
-        @received.signal
-      end
-
-      @started = false
       @logger.log(Logger::WARN, 'Stopping scheduler.')
-      @thread.exit
+      @event_queue << SHUTDOWN_SIGNAL
+      @thread.join
+      @started = false
     end
 
     private
@@ -126,12 +114,7 @@ def run
           flush_queue!
         end
 
-        item = nil
-
-        @mutex.synchronize do
-          @received.wait(@mutex, 0.05)
-          item = @event_queue.pop if @event_queue.length.positive?
-        end
+        item = @event_queue.pop if @event_queue.length.positive?
 
         if item.nil?
           sleep(0.05)
diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb
index 172d528a..b9a5043d 100644
--- a/spec/event/batch_event_processor_spec.rb
+++ b/spec/event/batch_event_processor_spec.rb
@@ -43,15 +43,6 @@
     allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
     @notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler)
     allow(@notification_center).to receive(:send_notifications)
-
-    @event_processor = Optimizely::BatchEventProcessor.new(
-      event_queue: @event_queue,
-      event_dispatcher: @event_dispatcher,
-      batch_size: MAX_BATCH_SIZE,
-      flush_interval: MAX_DURATION_MS,
-      logger: spy_logger,
-      notification_center: @notification_center
-    )
   end
 
   after(:example) do
@@ -59,271 +50,277 @@
     @event_queue.clear
   end
 
-  it 'should log waring when service is already started' do
-    @event_processor.start!
- expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once - end - - it 'return return empty event queue and dispatch log event when event is processed' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - @event_processor.process(conversion_event) - sleep 1.5 - - expect(@event_processor.event_queue.length).to eq(0) - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - end - - it 'should flush the current batch when deadline exceeded' do - user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - logger = spy('logger') - event_processor = Optimizely::BatchEventProcessor.new( - event_queue: @event_queue, - event_dispatcher: @event_dispatcher, - batch_size: MAX_BATCH_SIZE, - flush_interval: MAX_DURATION_MS * 3, - logger: logger - ) - sleep 0.025 - event_processor.process(user_event) - - sleep 1 - expect(event_processor.event_queue.length).to eq(0) - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - end - - it 'should flush the current batch when max batch size' do - allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) - expected_batch = [] - counter = 0 - until counter >= 11 - event['key'] = event['key'] + counter.to_s - user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - expected_batch << user_event - @event_processor.process(user_event) - counter += 1 - end - - sleep 1 + # it 'should log waring when service is already started' do + # @event_processor.start! 
+ # expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once + # end + + # it 'return return empty event queue and dispatch log event when event is processed' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + # @event_processor.process(conversion_event) + # sleep 1.5 + + # expect(@event_processor.event_queue.length).to eq(0) + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + # end + + # it 'should flush the current batch when deadline exceeded' do + # user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # logger = spy('logger') + # event_processor = Optimizely::BatchEventProcessor.new( + # event_queue: @event_queue, + # event_dispatcher: @event_dispatcher, + # batch_size: MAX_BATCH_SIZE, + # flush_interval: MAX_DURATION_MS * 3, + # logger: logger + # ) + # sleep 0.025 + # event_processor.process(user_event) + + # sleep 1 + # expect(event_processor.event_queue.length).to eq(0) + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + # end + + # it 'should flush the current batch when max batch size' do + # allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) + # expected_batch = [] + # counter = 0 + # until counter >= 11 + # event['key'] = event['key'] + counter.to_s + # user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # expected_batch << user_event + # @event_processor.process(user_event) + # counter += 1 + # end + + # sleep 1 + + # expected_batch.pop # Removes 11th element + # expect(@event_processor.current_batch.size).to be 10 + + # expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice + # expect(@event_dispatcher).to have_received(:dispatch_event).with( + # Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) + # ).twice + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once + # end + + # it 'should dispatch the event when flush is called' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + # event_processor = Optimizely::BatchEventProcessor.new( + # event_queue: @event_queue, + # event_dispatcher: @event_dispatcher, + # batch_size: MAX_BATCH_SIZE, + # flush_interval: MAX_DURATION_MS / 2, + # logger: spy_logger + # ) + + # event_processor.process(conversion_event) + # event_processor.flush + # sleep 1.5 + + # event_processor.process(conversion_event) + # event_processor.flush + # sleep 1.5 + + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice + + # expect(event_processor.event_queue.length).to eq(0) + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice + # end + + # it 'should flush on mismatch revision' do + # allow(project_config).to receive(:revision).and_return('1', '2') + # user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', 
nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) + + # expect(user_event1.event_context[:revision]).to eq('1') + # @event_processor.process(user_event1) + + # expect(user_event2.event_context[:revision]).to eq('2') + # @event_processor.process(user_event2) + + # sleep 0.25 + + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once + # expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + # end + + # it 'should flush on mismatch project id' do + # allow(project_config).to receive(:project_id).and_return('X', 'Y') + # user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) + + # expect(user_event1.event_context[:project_id]).to eq('X') + # @event_processor.process(user_event1) + + # expect(user_event2.event_context[:project_id]).to eq('Y') + # @event_processor.process(user_event2) + + # sleep 0.25 + + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once + # expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + # end + + # it 'should process and halt event when start or stop are called' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + # @event_processor.process(conversion_event) + # sleep 1.75 + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + + # @event_processor.stop! + # @event_processor.process(conversion_event) + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + # @event_processor.start! + # @event_processor.stop! + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(1).times + # end + + # it 'should not dispatch event when close is called during process' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + + # @event_processor.process(conversion_event) + # @event_processor.stop! + # expect(@event_dispatcher).not_to have_received(:dispatch_event) + # end + + # it 'should set default batch size when provided invalid' do + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 'test', logger: spy_logger) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: [], logger: spy_logger) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! 
+ # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 0, logger: spy_logger) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: -5, logger: spy_logger) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5.5, logger: spy_logger) + # expect(event_processor.batch_size).to eq(10) + # event_processor.stop! + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default batch_size: 10.').exactly(5).times + # end + + # it 'should set batch size when provided valid' do + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5) + # expect(event_processor.batch_size).to eq(5) + # event_processor.stop! + # end + + # it 'should set default flush interval when provided invalid' do + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) + # expect(event_processor.flush_interval).to eq(30_000) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) + # expect(event_processor.flush_interval).to eq(30_000) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) + # expect(event_processor.flush_interval).to eq(30_000) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) + # expect(event_processor.flush_interval).to eq(30_000) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) + # expect(event_processor.flush_interval).to eq(30_000) + # event_processor.stop! + # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times + # end + + # it 'should set flush interval when provided valid' do + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000) + # expect(event_processor.flush_interval).to eq(2000) + # event_processor.stop! + # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000.5) + # expect(event_processor.flush_interval).to eq(2000.5) + # event_processor.stop! 
+ # end + + # it 'should send log event notification when event is dispatched' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + # @event_processor.process(conversion_event) + # sleep 1.5 + + # expect(@notification_center).to have_received(:send_notifications).with( + # Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + # log_event + # ).once + + # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + # end + + # it 'should log an error when dispatch event raises timeout exception' do + # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + # allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) + + # timeout_error = Timeout::Error.new + # allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) + + # @event_processor.process(conversion_event) + # sleep 1.5 + + # expect(@notification_center).not_to have_received(:send_notifications) + + # expect(spy_logger).to have_received(:log).once.with( + # Logger::ERROR, + # "Error dispatching event: #{log_event} Timeout::Error." + # ) + # end - expected_batch.pop # Removes 11th element - expect(@event_processor.current_batch.size).to be 10 - - expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice - expect(@event_dispatcher).to have_received(:dispatch_event).with( - Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) - ).twice - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once - end + it 'should flush pending events when stop is called' do + experiment = project_config.get_experiment_from_key('test_experiment') + impression_event = Optimizely::UserEventFactory.create_impression_event(project_config, experiment, '111128', 'test_user', nil) + log_event = Optimizely::EventFactory.create_log_event(impression_event, spy_logger) - it 'should dispatch the event when flush is called' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args).and_return(log_event) - event_processor = Optimizely::BatchEventProcessor.new( + @event_processor = Optimizely::BatchEventProcessor.new( event_queue: @event_queue, event_dispatcher: @event_dispatcher, - batch_size: MAX_BATCH_SIZE, - flush_interval: MAX_DURATION_MS / 2, - logger: spy_logger - ) - - event_processor.process(conversion_event) - event_processor.flush - sleep 1.5 - - event_processor.process(conversion_event) - event_processor.flush - sleep 1.5 - - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice - - expect(event_processor.event_queue.length).to eq(0) - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice - end - - it 'should flush on mismatch revision' do - allow(project_config).to receive(:revision).and_return('1', '2') - user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - user_event2 = 
Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) - - expect(user_event1.event_context[:revision]).to eq('1') - @event_processor.process(user_event1) - - expect(user_event2.event_context[:revision]).to eq('2') - @event_processor.process(user_event2) - - sleep 0.25 - - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once - expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - end - - it 'should flush on mismatch project id' do - allow(project_config).to receive(:project_id).and_return('X', 'Y') - user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) - - expect(user_event1.event_context[:project_id]).to eq('X') - @event_processor.process(user_event1) - - expect(user_event2.event_context[:project_id]).to eq('Y') - @event_processor.process(user_event2) - - sleep 0.25 - - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once - expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - end - - it 'should process and halt event when start or stop are called' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - @event_processor.process(conversion_event) - sleep 1.75 - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - @event_processor.stop! - @event_processor.process(conversion_event) - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - @event_processor.start! - @event_processor.stop! - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(1).times - end - - it 'should not dispatch event when close is called during process' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - - @event_processor.process(conversion_event) - @event_processor.stop! - expect(@event_dispatcher).not_to have_received(:dispatch_event) - end - - it 'should set default batch size when provided invalid' do - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 'test', logger: spy_logger) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: [], logger: spy_logger) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! 
- event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 0, logger: spy_logger) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: -5, logger: spy_logger) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5.5, logger: spy_logger) - expect(event_processor.batch_size).to eq(10) - event_processor.stop! - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default batch_size: 10.').exactly(5).times - end - - it 'should set batch size when provided valid' do - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5) - expect(event_processor.batch_size).to eq(5) - event_processor.stop! - end - - it 'should set default flush interval when provided invalid' do - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) - expect(event_processor.flush_interval).to eq(30_000) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) - expect(event_processor.flush_interval).to eq(30_000) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) - expect(event_processor.flush_interval).to eq(30_000) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) - expect(event_processor.flush_interval).to eq(30_000) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) - expect(event_processor.flush_interval).to eq(30_000) - event_processor.stop! - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times - end - - it 'should set flush interval when provided valid' do - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000) - expect(event_processor.flush_interval).to eq(2000) - event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000.5) - expect(event_processor.flush_interval).to eq(2000.5) - event_processor.stop! 
- end - - it 'should send log event notification when event is dispatched' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - @event_processor.process(conversion_event) - sleep 1.5 - - expect(@notification_center).to have_received(:send_notifications).with( - Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - log_event - ).once - - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - end - - it 'should log an error when dispatch event raises timeout exception' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) - - timeout_error = Timeout::Error.new - allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) - - @event_processor.process(conversion_event) - sleep 1.5 - - expect(@notification_center).not_to have_received(:send_notifications) - - expect(spy_logger).to have_received(:log).once.with( - Logger::ERROR, - "Error dispatching event: #{log_event} Timeout::Error." + batch_size: 5, + flush_interval: 10_000, + logger: spy_logger, + notification_center: @notification_center ) - end - it 'should flush pending events when stop is called' do - allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) expected_batch = [] - counter = 0 - until counter >= 10 - event['key'] = event['key'] + counter.to_s + 4.times do user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) expected_batch << user_event @event_processor.process(user_event) - counter += 1 end - sleep 0.25 - - # max batch size not occurred and batch is not dispatched. - expect(@event_processor.current_batch.size).to be < 10 + # Wait until other thread has processed the event. + until @event_processor.current_batch.length != 4; end expect(@event_dispatcher).not_to have_received(:dispatch_event) - # Stop should flush the queue! @event_processor.stop! - sleep 0.75 expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Exiting processing loop. 
Attempting to flush pending events.')
+    expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!')
     expect(@event_dispatcher).to have_received(:dispatch_event).with(
       Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
     )
-
-    expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!')
   end
 end

From 1a3a7d1d83a219e1cfda0b9b98b4382630fb9bbc Mon Sep 17 00:00:00 2001
From: Owais Akbani
Date: Mon, 21 Oct 2019 12:30:43 +0500
Subject: [PATCH 2/6] fix http test 83

---
 spec/config_manager/http_project_config_manager_spec.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spec/config_manager/http_project_config_manager_spec.rb b/spec/config_manager/http_project_config_manager_spec.rb
index 5c02c4cd..6c7659ba 100644
--- a/spec/config_manager/http_project_config_manager_spec.rb
+++ b/spec/config_manager/http_project_config_manager_spec.rb
@@ -178,7 +178,7 @@
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Fetching datafile from https://cdn.optimizely.com/datafiles/QBw9gFM8oTn7ogY9ANCC1z.json').once
 
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received new datafile and updated config. ' \
-                                                                  'Old revision number: 42. New revision number: 81.').once
+                                                                  'Old revision number: 42. New revision number: 83.').once
 
     # Asserts that config has updated from URL.
     expect(http_project_config_manager.config.account_id).not_to eql(datafile_project_config.account_id)
@@ -234,7 +234,7 @@
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Fetching datafile from https://cdn.optimizely.com/datafiles/QBw9gFM8oTn7ogY9ANCC1z.json').at_least(2)
 
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received new datafile and updated config. ' \
-                                                                  'Old revision number: 42. New revision number: 81.').once
+                                                                  'Old revision number: 42. New revision number: 83.').once
 
     expect(http_project_config_manager.config.account_id).not_to eql(datafile_project_config.account_id)
   end

From 3e49f9882bd717e3ebc1cc31eb8d808a9a875dc7 Mon Sep 17 00:00:00 2001
From: Owais Akbani
Date: Mon, 21 Oct 2019 17:40:07 +0500
Subject: [PATCH 3/6] Refactoring Done

---
 lib/optimizely/event/batch_event_processor.rb |  24 +-
 spec/event/batch_event_processor_spec.rb      | 543 ++++++++++--------
 2 files changed, 301 insertions(+), 266 deletions(-)

diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb
index 99e7d0e7..e9e214a8 100644
--- a/lib/optimizely/event/batch_event_processor.rb
+++ b/lib/optimizely/event/batch_event_processor.rb
@@ -20,7 +20,7 @@ module Optimizely
   class BatchEventProcessor < EventProcessor
     # BatchEventProcessor is a batched implementation of the Interface EventProcessor.
-    # Events passed to the BatchEventProcessor are immediately added to a EventQueue.
+    # Events passed to the BatchEventProcessor are immediately added to an EventQueue.
     # The BatchEventProcessor maintains a single consumer thread that pulls events off of
     # the BlockingQueue and buffers them for either a configured batch size or for a
     # maximum duration before the resulting LogEvent is sent to the NotificationCenter.
@@ -30,6 +30,7 @@ class BatchEventProcessor < EventProcessor
     DEFAULT_BATCH_SIZE = 10
     DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
     DEFAULT_QUEUE_CAPACITY = 1000
+    DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds
 
     FLUSH_SIGNAL = 'FLUSH_SIGNAL'
     SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -69,6 +70,7 @@ def start!
       return
     end
     @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
+    @logger.log(Logger::INFO, 'Starting scheduler.')
     @thread = Thread.new { run }
     @started = true
   end
@@ -86,7 +88,7 @@ def process(user_event)
     end
 
     begin
-      @event_queue << user_event
+      @event_queue.push(user_event, true)
     rescue Exception
       @logger.log(Logger::WARN, 'Payload not accepted by the queue.')
       return
@@ -96,9 +98,9 @@ def stop!
     return unless @started
 
-    @logger.log(Logger::WARN, 'Stopping scheduler.')
+    @logger.log(Logger::INFO, 'Stopping scheduler.')
     @event_queue << SHUTDOWN_SIGNAL
-    @thread.join
+    @thread.join(DEFAULT_TIMEOUT_INTERVAL)
     @started = false
   end
 
   private
@@ -106,13 +108,7 @@ def stop!
 
   def run
     loop do
-      if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
-        @logger.log(
-          Logger::DEBUG,
-          'Deadline exceeded flushing current batch.'
-        )
-        flush_queue!
-      end
+      flush_queue! if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
 
       item = @event_queue.pop if @event_queue.length.positive?
@@ -122,7 +118,7 @@ def run
       end
 
       if item == SHUTDOWN_SIGNAL
-        @logger.log(Logger::INFO, 'Received shutdown signal.')
+        @logger.log(Logger::DEBUG, 'Received shutdown signal.')
         break
       end
@@ -135,7 +131,7 @@ def run
       add_to_batch(item) if item.is_a? Optimizely::UserEvent
     end
   rescue SignalException
-    @logger.log(Logger::INFO, 'Interrupted while processing buffer.')
+    @logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
   rescue Exception => e
     @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
   ensure
@@ -171,7 +167,7 @@ def add_to_batch(user_event)
     # Reset the deadline if starting a new batch.
     @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
 
-    @logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
+    @logger.log(Logger::DEBUG, "Adding user events: #{user_event} to batch.")
     @current_batch << user_event
     return unless @current_batch.length >= @batch_size
diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb
index b9a5043d..aa8e4e96 100644
--- a/spec/event/batch_event_processor_spec.rb
+++ b/spec/event/batch_event_processor_spec.rb
@@ -25,17 +25,11 @@
 require 'optimizely/helpers/validator'
 require 'optimizely/logger'
 describe Optimizely::BatchEventProcessor do
-  WebMock.allow_net_connect!
   let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON }
   let(:error_handler) { Optimizely::NoOpErrorHandler.new }
   let(:spy_logger) { spy('logger') }
   let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) }
   let(:event) { project_config.get_event_from_key('test_event') }
-  let(:log_url) { 'https://logx.optimizely.com/v1/events' }
-  let(:post_headers) { {'Content-Type' => 'application/json'} }
-
-  MAX_BATCH_SIZE = 10
-  MAX_DURATION_MS = 1000
 
   before(:example) do
     @event_queue = SizedQueue.new(100)
@@ -46,264 +40,273 @@
   end
 
   after(:example) do
-    @event_processor.stop!
-    @event_queue.clear
+    @event_processor.stop! if @event_processor.instance_of? Optimizely::BatchEventProcessor
   end
 
-  # it 'should log waring when service is already started' do
-  #   @event_processor.start!
- # expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once - # end - - # it 'return return empty event queue and dispatch log event when event is processed' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - # @event_processor.process(conversion_event) - # sleep 1.5 - - # expect(@event_processor.event_queue.length).to eq(0) - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - # end - - # it 'should flush the current batch when deadline exceeded' do - # user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # logger = spy('logger') - # event_processor = Optimizely::BatchEventProcessor.new( - # event_queue: @event_queue, - # event_dispatcher: @event_dispatcher, - # batch_size: MAX_BATCH_SIZE, - # flush_interval: MAX_DURATION_MS * 3, - # logger: logger - # ) - # sleep 0.025 - # event_processor.process(user_event) - - # sleep 1 - # expect(event_processor.event_queue.length).to eq(0) - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - # end - - # it 'should flush the current batch when max batch size' do - # allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) - # expected_batch = [] - # counter = 0 - # until counter >= 11 - # event['key'] = event['key'] + counter.to_s - # user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # expected_batch << user_event - # @event_processor.process(user_event) - # counter += 1 - # end - - # sleep 1 - - # expected_batch.pop # Removes 11th element - # expect(@event_processor.current_batch.size).to be 10 - - # expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice - # expect(@event_dispatcher).to have_received(:dispatch_event).with( - # Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) - # ).twice - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once - # end - - # it 'should dispatch the event when flush is called' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - # event_processor = Optimizely::BatchEventProcessor.new( - # event_queue: @event_queue, - # event_dispatcher: @event_dispatcher, - # batch_size: MAX_BATCH_SIZE, - # flush_interval: MAX_DURATION_MS / 2, - # logger: spy_logger - # ) - - # event_processor.process(conversion_event) - # event_processor.flush - # sleep 1.5 - - # event_processor.process(conversion_event) - # event_processor.flush - # sleep 1.5 - - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice - - # expect(event_processor.event_queue.length).to eq(0) - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice - # end - - # it 'should flush on mismatch revision' do - # allow(project_config).to receive(:revision).and_return('1', '2') - # user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', 
nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) - - # expect(user_event1.event_context[:revision]).to eq('1') - # @event_processor.process(user_event1) - - # expect(user_event2.event_context[:revision]).to eq('2') - # @event_processor.process(user_event2) - - # sleep 0.25 - - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once - # expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - # end - - # it 'should flush on mismatch project id' do - # allow(project_config).to receive(:project_id).and_return('X', 'Y') - # user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) - - # expect(user_event1.event_context[:project_id]).to eq('X') - # @event_processor.process(user_event1) - - # expect(user_event2.event_context[:project_id]).to eq('Y') - # @event_processor.process(user_event2) - - # sleep 0.25 - - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once - # expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - # end - - # it 'should process and halt event when start or stop are called' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - # @event_processor.process(conversion_event) - # sleep 1.75 - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - # @event_processor.stop! - # @event_processor.process(conversion_event) - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - # @event_processor.start! - # @event_processor.stop! - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(1).times - # end - - # it 'should not dispatch event when close is called during process' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - - # @event_processor.process(conversion_event) - # @event_processor.stop! - # expect(@event_dispatcher).not_to have_received(:dispatch_event) - # end - - # it 'should set default batch size when provided invalid' do - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 'test', logger: spy_logger) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: [], logger: spy_logger) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! 
- # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 0, logger: spy_logger) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: -5, logger: spy_logger) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5.5, logger: spy_logger) - # expect(event_processor.batch_size).to eq(10) - # event_processor.stop! - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default batch_size: 10.').exactly(5).times - # end - - # it 'should set batch size when provided valid' do - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5) - # expect(event_processor.batch_size).to eq(5) - # event_processor.stop! - # end - - # it 'should set default flush interval when provided invalid' do - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) - # expect(event_processor.flush_interval).to eq(30_000) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) - # expect(event_processor.flush_interval).to eq(30_000) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) - # expect(event_processor.flush_interval).to eq(30_000) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) - # expect(event_processor.flush_interval).to eq(30_000) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) - # expect(event_processor.flush_interval).to eq(30_000) - # event_processor.stop! - # expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times - # end - - # it 'should set flush interval when provided valid' do - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000) - # expect(event_processor.flush_interval).to eq(2000) - # event_processor.stop! - # event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000.5) - # expect(event_processor.flush_interval).to eq(2000.5) - # event_processor.stop! 
- # end - - # it 'should send log event notification when event is dispatched' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - # @event_processor.process(conversion_event) - # sleep 1.5 - - # expect(@notification_center).to have_received(:send_notifications).with( - # Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - # log_event - # ).once - - # expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - # end - - # it 'should log an error when dispatch event raises timeout exception' do - # conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - # log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - # allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) - - # timeout_error = Timeout::Error.new - # allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) - - # @event_processor.process(conversion_event) - # sleep 1.5 - - # expect(@notification_center).not_to have_received(:send_notifications) - - # expect(spy_logger).to have_received(:log).once.with( - # Logger::ERROR, - # "Error dispatching event: #{log_event} Timeout::Error." - # ) - # end + it 'should log waring when service is already started' do + @event_processor = Optimizely::BatchEventProcessor.new(logger: spy_logger) + @event_processor.start! + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once + end - it 'should flush pending events when stop is called' do - experiment = project_config.get_experiment_from_key('test_experiment') - impression_event = Optimizely::UserEventFactory.create_impression_event(project_config, experiment, '111128', 'test_user', nil) - log_event = Optimizely::EventFactory.create_log_event(impression_event, spy_logger) + it 'should flush the current batch when flush deadline exceeded' do + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + flush_interval: 100, + logger: spy_logger, + notification_center: @notification_center + ) + + @event_processor.process(conversion_event) + # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. 
+ sleep 0.3 + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + expect(@notification_center).to have_received(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ).once + end + + it 'should flush the current batch when max batch size met' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + batch_size: 11, + flush_interval: 100_000, + logger: spy_logger + ) + + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(user_event, spy_logger) allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args).and_return(log_event) + expected_batch = [] + 11.times do + expected_batch << user_event + @event_processor.process(user_event) + end + + # Wait until other thread has processed the event. + until @event_processor.event_queue.empty?; end + until @event_processor.current_batch.empty?; end + + expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once + expect(@event_dispatcher).to have_received(:dispatch_event).with( + Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) + ).once + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once + end + + it 'should dispatch the event when flush is called' do + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + @event_processor = Optimizely::BatchEventProcessor.new( event_queue: @event_queue, event_dispatcher: @event_dispatcher, - batch_size: 5, - flush_interval: 10_000, + flush_interval: 100_000, + logger: spy_logger + ) + + @event_processor.process(conversion_event) + @event_processor.flush + + @event_processor.process(conversion_event) + @event_processor.flush + + # Wait until other thread has processed the event. + until @event_processor.event_queue.empty?; end + until @event_processor.current_batch.empty?; end + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice + expect(@event_processor.event_queue.length).to eq(0) + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice + end + + it 'should flush on mismatch revision' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, logger: spy_logger, notification_center: @notification_center ) + allow(project_config).to receive(:revision).and_return('1', '2') + user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) + + expect(user_event1.event_context[:revision]).to eq('1') + @event_processor.process(user_event1) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end + + expect(user_event2.event_context[:revision]).to eq('2') + @event_processor.process(user_event2) + @event_processor.process(user_event2) + # Wait until other thread has processed the event. 
+ while @event_processor.current_batch.length != 2; end + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once + expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + end + + it 'should flush on mismatch project id' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + + allow(project_config).to receive(:project_id).and_return('X', 'Y') + user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(user_event1, spy_logger) + + expect(user_event1.event_context[:project_id]).to eq('X') + @event_processor.process(user_event1) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end + + expect(user_event2.event_context[:project_id]).to eq('Y') + @event_processor.process(user_event2) + @event_processor.process(user_event2) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 2; end + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once + expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + end + + it 'should set default batch size when provided invalid' do + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 'test', logger: spy_logger) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: [], logger: spy_logger) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 0, logger: spy_logger) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: -5, logger: spy_logger) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5.5, logger: spy_logger) + expect(event_processor.batch_size).to eq(10) + event_processor.stop! + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default batch_size: 10.').exactly(5).times + end + + it 'should set batch size when provided valid' do + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, batch_size: 5) + expect(event_processor.batch_size).to eq(5) + event_processor.stop! 
+ end + + it 'should set default flush interval when provided invalid' do + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) + expect(event_processor.flush_interval).to eq(30_000) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 'test', logger: spy_logger) + expect(event_processor.flush_interval).to eq(30_000) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: [], logger: spy_logger) + expect(event_processor.flush_interval).to eq(30_000) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0, logger: spy_logger) + expect(event_processor.flush_interval).to eq(30_000) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: -5, logger: spy_logger) + expect(event_processor.flush_interval).to eq(30_000) + event_processor.stop! + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Setting to default flush_interval: 30000 ms.').exactly(4).times + end + + it 'should set flush interval when provided valid' do + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000) + expect(event_processor.flush_interval).to eq(2000) + event_processor.stop! + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0.5) + expect(event_processor.flush_interval).to eq(0.5) + event_processor.stop! + end + + it 'should send log event notification when event is dispatched' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + + @event_processor.process(conversion_event) + + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end + @event_processor.flush + # Wait until other thread has processed the event. + until @event_processor.current_batch.empty?; end + + expect(@notification_center).to have_received(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ).once + + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + end + + it 'should log an error when dispatch event raises timeout exception' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) + + timeout_error = Timeout::Error.new + allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) + + @event_processor.process(conversion_event) + # Wait until other thread has processed the event. 
+ while @event_processor.current_batch.length != 1; end + @event_processor.flush + # Wait until other thread has processed the event. + until @event_processor.current_batch.empty?; end + + expect(@notification_center).not_to have_received(:send_notifications) + expect(spy_logger).to have_received(:log).once.with( + Logger::ERROR, + "Error dispatching event: #{log_event} Timeout::Error." + ) + end + + it 'should flush pending events when stop is called' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + batch_size: 5, + flush_interval: 10_000, + logger: spy_logger + ) + + experiment = project_config.get_experiment_from_key('test_experiment') + impression_event = Optimizely::UserEventFactory.create_impression_event(project_config, experiment, '111128', 'test_user', nil) + log_event = Optimizely::EventFactory.create_log_event(impression_event, spy_logger) + + allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args).and_return(log_event) + expected_batch = [] 4.times do user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) @@ -312,7 +315,7 @@ end # Wait until other thread has processed the event. - until @event_processor.current_batch.length != 4; end + while @event_processor.current_batch.length != 4; end expect(@event_dispatcher).not_to have_received(:dispatch_event) @event_processor.stop! @@ -323,4 +326,40 @@ Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) ) end + + it 'should log a warning when Queue gets full' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_queue: SizedQueue.new(10), + event_dispatcher: @event_dispatcher, + batch_size: 100, + flush_interval: 100_000, + logger: spy_logger + ) + + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + 11.times do + @event_processor.process(user_event) + end + + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 10; end + expect(@event_dispatcher).not_to have_received(:dispatch_event) + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue.').once + end + + it 'should not process and log when Executor is not running' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + batch_size: 100, + flush_interval: 100_000, + logger: spy_logger + ) + + @event_processor.stop! + + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + @event_processor.process(user_event) + expect(@event_processor.event_queue.length).to eq(0) + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Executor shutdown, not accepting tasks.').once + end end From a7ecb716891381148546897aadb6152ef41fabaf Mon Sep 17 00:00:00 2001 From: Owais Akbani Date: Mon, 21 Oct 2019 19:46:26 +0500 Subject: [PATCH 4/6] nit --- lib/optimizely/event/batch_event_processor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index e9e214a8..b817e6d0 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -167,7 +167,7 @@ def add_to_batch(user_event) # Reset the deadline if starting a new batch. 
       @flushing_interval_deadline = (Helpers::DateTimeUtils.create_timestamp + @flush_interval) if @current_batch.empty?
 
-      @logger.log(Logger::DEBUG, "Adding user events: #{user_event} to batch.")
+      @logger.log(Logger::DEBUG, "Adding user event: #{user_event} to batch.")
       @current_batch << user_event
       return unless @current_batch.length >= @batch_size

From 2cd808b2d627d7784054e1b3998ced056f3f217f Mon Sep 17 00:00:00 2001
From: Owais Akbani
Date: Tue, 22 Oct 2019 11:53:55 +0500
Subject: [PATCH 5/6] Revert revision number due to merge

---
 spec/config_manager/http_project_config_manager_spec.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spec/config_manager/http_project_config_manager_spec.rb b/spec/config_manager/http_project_config_manager_spec.rb
index 677ec340..e82b5869 100644
--- a/spec/config_manager/http_project_config_manager_spec.rb
+++ b/spec/config_manager/http_project_config_manager_spec.rb
@@ -225,7 +225,7 @@
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Fetching datafile from https://cdn.optimizely.com/datafiles/valid_sdk_key.json').once
 
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received new datafile and updated config. ' \
-                                                                  'Old revision number: 42. New revision number: 83.').once
+                                                                  'Old revision number: 42. New revision number: 81.').once
 
     # Asserts that config has updated from URL.
     expect(@http_project_config_manager.config.account_id).not_to eql(datafile_project_config.account_id)
@@ -277,7 +277,7 @@
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Fetching datafile from https://cdn.optimizely.com/datafiles/valid_sdk_key.json').at_least(2)
 
     expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received new datafile and updated config. ' \
-                                                                  'Old revision number: 42. New revision number: 83.').once
+                                                                  'Old revision number: 42. New revision number: 81.').once
 
     expect(@http_project_config_manager.config.account_id).not_to eql(datafile_project_config.account_id)
   end

From 6f4467895686d2e0d92920ca60c7949e0f108c9d Mon Sep 17 00:00:00 2001
From: Owais Akbani
Date: Tue, 29 Oct 2019 12:14:16 +0500
Subject: [PATCH 6/6] Added log

---
 lib/optimizely/event/batch_event_processor.rb | 7 ++++++-
 spec/event/batch_event_processor_spec.rb      | 3 ++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb
index b817e6d0..975531b8 100644
--- a/lib/optimizely/event/batch_event_processor.rb
+++ b/lib/optimizely/event/batch_event_processor.rb
@@ -147,6 +147,11 @@ def flush_queue!
       log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
       begin
+        @logger.log(
+          Logger::INFO,
+          'Flushing Queue.'
+        )
+
         @event_dispatcher.dispatch_event(log_event)
         @notification_center&.send_notifications(
           NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
@@ -171,7 +176,7 @@ def add_to_batch(user_event)
       @current_batch << user_event
       return unless @current_batch.length >= @batch_size
 
-      @logger.log(Logger::DEBUG, 'Flushing on max batch size!')
+      @logger.log(Logger::DEBUG, 'Flushing on max batch size.')
       flush_queue!
   end

diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb
index aa8e4e96..8af18a5d 100644
--- a/spec/event/batch_event_processor_spec.rb
+++ b/spec/event/batch_event_processor_spec.rb
@@ -69,6 +69,7 @@
       Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
       log_event
     ).once
+    expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Flushing Queue.').once
   end
 
   it 'should flush the current batch when max batch size met' do
@@ -98,7 +99,7 @@
     expect(@event_dispatcher).to have_received(:dispatch_event).with(
       Optimizely::EventFactory.create_log_event(expected_batch, spy_logger)
     ).once
-    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once
+    expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size.').once
   end
 
   it 'should dispatch the event when flush is called' do
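
Reviewer note, illustrative only and not part of the patch series: after this refactor, process, flush and stop! simply push items or signals onto the SizedQueue and the single consumer thread does the batching, so a minimal lifecycle sketch looks like the code below. It assumes the SDK's Optimizely::EventDispatcher and Optimizely::SimpleLogger are available, and user_event stands in for an event built via Optimizely::UserEventFactory as in the specs above.

  require 'optimizely'

  # Constructed with the same keyword arguments the specs above exercise.
  processor = Optimizely::BatchEventProcessor.new(
    event_dispatcher: Optimizely::EventDispatcher.new,
    batch_size: 10,         # DEFAULT_BATCH_SIZE
    flush_interval: 30_000, # DEFAULT_BATCH_INTERVAL, in milliseconds
    logger: Optimizely::SimpleLogger.new
  )

  # user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
  processor.process(user_event) # enqueues the event; the consumer thread batches it
  processor.flush               # pushes FLUSH_SIGNAL onto the queue
  processor.stop!               # pushes SHUTDOWN_SIGNAL, joins the thread and flushes pending events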