diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 62f43045..975531b8 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -20,7 +20,7 @@ module Optimizely class BatchEventProcessor < EventProcessor # BatchEventProcessor is a batched implementation of the Interface EventProcessor. - # Events passed to the BatchEventProcessor are immediately added to a EventQueue. + # Events passed to the BatchEventProcessor are immediately added to an EventQueue. # The BatchEventProcessor maintains a single consumer thread that pulls events off of # the BlockingQueue and buffers them for either a configured batch size or for a # maximum duration before the resulting LogEvent is sent to the NotificationCenter. @@ -30,6 +30,7 @@ class BatchEventProcessor < EventProcessor DEFAULT_BATCH_SIZE = 10 DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds DEFAULT_QUEUE_CAPACITY = 1000 + DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds FLUSH_SIGNAL = 'FLUSH_SIGNAL' SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL' @@ -58,8 +59,6 @@ def initialize( DEFAULT_BATCH_INTERVAL end @notification_center = notification_center - @mutex = Mutex.new - @received = ConditionVariable.new @current_batch = [] @started = false start! @@ -71,15 +70,13 @@ def start! return end @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval + @logger.log(Logger::INFO, 'Starting scheduler.') @thread = Thread.new { run } @started = true end def flush - @mutex.synchronize do - @event_queue << FLUSH_SIGNAL - @received.signal - end + @event_queue << FLUSH_SIGNAL end def process(user_event) @@ -90,48 +87,30 @@ def process(user_event) return end - @mutex.synchronize do - begin - @event_queue << user_event - @received.signal - rescue Exception - @logger.log(Logger::WARN, 'Payload not accepted by the queue.') - return - end + begin + @event_queue.push(user_event, true) + rescue Exception + @logger.log(Logger::WARN, 'Payload not accepted by the queue.') + return end end def stop! return unless @started - @mutex.synchronize do - @event_queue << SHUTDOWN_SIGNAL - @received.signal - end - + @logger.log(Logger::INFO, 'Stopping scheduler.') + @event_queue << SHUTDOWN_SIGNAL + @thread.join(DEFAULT_TIMEOUT_INTERVAL) @started = false - @logger.log(Logger::WARN, 'Stopping scheduler.') - @thread.exit end private def run loop do - if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline - @logger.log( - Logger::DEBUG, - 'Deadline exceeded flushing current batch.' - ) - flush_queue! - end - - item = nil + flush_queue! if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline - @mutex.synchronize do - @received.wait(@mutex, 0.05) - item = @event_queue.pop if @event_queue.length.positive? - end + item = @event_queue.pop if @event_queue.length.positive? if item.nil? sleep(0.05) @@ -139,7 +118,7 @@ def run end if item == SHUTDOWN_SIGNAL - @logger.log(Logger::INFO, 'Received shutdown signal.') + @logger.log(Logger::DEBUG, 'Received shutdown signal.') break end @@ -152,7 +131,7 @@ def run add_to_batch(item) if item.is_a? Optimizely::UserEvent end rescue SignalException - @logger.log(Logger::INFO, 'Interrupted while processing buffer.') + @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') rescue Exception => e @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") ensure @@ -168,6 +147,11 @@ def flush_queue! log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) begin + @logger.log( + Logger::INFO, + 'Flushing Queue.' + ) + @event_dispatcher.dispatch_event(log_event) @notification_center&.send_notifications( NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], @@ -192,7 +176,7 @@ def add_to_batch(user_event) @current_batch << user_event return unless @current_batch.length >= @batch_size - @logger.log(Logger::DEBUG, 'Flushing on max batch size!') + @logger.log(Logger::DEBUG, 'Flushing on max batch size.') flush_queue! end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 172d528a..8af18a5d 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -25,17 +25,11 @@ require 'optimizely/helpers/validator' require 'optimizely/logger' describe Optimizely::BatchEventProcessor do - WebMock.allow_net_connect! let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON } let(:error_handler) { Optimizely::NoOpErrorHandler.new } let(:spy_logger) { spy('logger') } let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) } let(:event) { project_config.get_event_from_key('test_event') } - let(:log_url) { 'https://logx.optimizely.com/v1/events' } - let(:post_headers) { {'Content-Type' => 'application/json'} } - - MAX_BATCH_SIZE = 10 - MAX_DURATION_MS = 1000 before(:example) do @event_queue = SizedQueue.new(100) @@ -43,107 +37,104 @@ allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) allow(@notification_center).to receive(:send_notifications) - - @event_processor = Optimizely::BatchEventProcessor.new( - event_queue: @event_queue, - event_dispatcher: @event_dispatcher, - batch_size: MAX_BATCH_SIZE, - flush_interval: MAX_DURATION_MS, - logger: spy_logger, - notification_center: @notification_center - ) end after(:example) do - @event_processor.stop! - @event_queue.clear + @event_processor.stop! if @event_processor.instance_of? Optimizely::BatchEventProcessor end it 'should log waring when service is already started' do + @event_processor = Optimizely::BatchEventProcessor.new(logger: spy_logger) @event_processor.start! expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once end - it 'return return empty event queue and dispatch log event when event is processed' do + it 'should flush the current batch when flush deadline exceeded' do conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + flush_interval: 100, + logger: spy_logger, + notification_center: @notification_center + ) + @event_processor.process(conversion_event) - sleep 1.5 + # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. + sleep 0.3 - expect(@event_processor.event_queue.length).to eq(0) expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once + expect(@notification_center).to have_received(:send_notifications).with( + Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ).once + expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Flushing Queue.').once end - it 'should flush the current batch when deadline exceeded' do - user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - logger = spy('logger') - event_processor = Optimizely::BatchEventProcessor.new( - event_queue: @event_queue, + it 'should flush the current batch when max batch size met' do + @event_processor = Optimizely::BatchEventProcessor.new( event_dispatcher: @event_dispatcher, - batch_size: MAX_BATCH_SIZE, - flush_interval: MAX_DURATION_MS * 3, - logger: logger + batch_size: 11, + flush_interval: 100_000, + logger: spy_logger ) - sleep 0.025 - event_processor.process(user_event) - sleep 1 - expect(event_processor.event_queue.length).to eq(0) - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - end + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + log_event = Optimizely::EventFactory.create_log_event(user_event, spy_logger) + + allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args).and_return(log_event) - it 'should flush the current batch when max batch size' do - allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) expected_batch = [] - counter = 0 - until counter >= 11 - event['key'] = event['key'] + counter.to_s - user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + 11.times do expected_batch << user_event @event_processor.process(user_event) - counter += 1 end - sleep 1 - - expected_batch.pop # Removes 11th element - expect(@event_processor.current_batch.size).to be 10 + # Wait until other thread has processed the event. + until @event_processor.event_queue.empty?; end + until @event_processor.current_batch.empty?; end - expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).twice + expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once expect(@event_dispatcher).to have_received(:dispatch_event).with( Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) - ).twice - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!').once + ).once + expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size.').once end it 'should dispatch the event when flush is called' do conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - event_processor = Optimizely::BatchEventProcessor.new( + @event_processor = Optimizely::BatchEventProcessor.new( event_queue: @event_queue, event_dispatcher: @event_dispatcher, - batch_size: MAX_BATCH_SIZE, - flush_interval: MAX_DURATION_MS / 2, + flush_interval: 100_000, logger: spy_logger ) - event_processor.process(conversion_event) - event_processor.flush - sleep 1.5 + @event_processor.process(conversion_event) + @event_processor.flush - event_processor.process(conversion_event) - event_processor.flush - sleep 1.5 + @event_processor.process(conversion_event) + @event_processor.flush - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice + # Wait until other thread has processed the event. + until @event_processor.event_queue.empty?; end + until @event_processor.current_batch.empty?; end - expect(event_processor.event_queue.length).to eq(0) + expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice + expect(@event_processor.event_queue.length).to eq(0) expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Received flush signal.').twice end it 'should flush on mismatch revision' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + allow(project_config).to receive(:revision).and_return('1', '2') user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) @@ -151,19 +142,27 @@ expect(user_event1.event_context[:revision]).to eq('1') @event_processor.process(user_event1) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end expect(user_event2.event_context[:revision]).to eq('2') @event_processor.process(user_event2) - - sleep 0.25 + @event_processor.process(user_event2) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 2; end expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') end it 'should flush on mismatch project id' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + allow(project_config).to receive(:project_id).and_return('X', 'Y') user_event1 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) user_event2 = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) @@ -171,42 +170,20 @@ expect(user_event1.event_context[:project_id]).to eq('X') @event_processor.process(user_event1) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end expect(user_event2.event_context[:project_id]).to eq('Y') @event_processor.process(user_event2) - - sleep 0.25 + @event_processor.process(user_event2) + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 2; end expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.') end - it 'should process and halt event when start or stop are called' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) - - @event_processor.process(conversion_event) - sleep 1.75 - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - - @event_processor.stop! - @event_processor.process(conversion_event) - expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once - @event_processor.start! - @event_processor.stop! - expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Deadline exceeded flushing current batch.').at_least(1).times - end - - it 'should not dispatch event when close is called during process' do - conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - - @event_processor.process(conversion_event) - @event_processor.stop! - expect(@event_dispatcher).not_to have_received(:dispatch_event) - end - it 'should set default batch size when provided invalid' do event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher) expect(event_processor.batch_size).to eq(10) @@ -258,17 +235,28 @@ event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000) expect(event_processor.flush_interval).to eq(2000) event_processor.stop! - event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 2000.5) - expect(event_processor.flush_interval).to eq(2000.5) + event_processor = Optimizely::BatchEventProcessor.new(event_dispatcher: @event_dispatcher, flush_interval: 0.5) + expect(event_processor.flush_interval).to eq(0.5) event_processor.stop! end it 'should send log event notification when event is dispatched' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) @event_processor.process(conversion_event) - sleep 1.5 + + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end + @event_processor.flush + # Wait until other thread has processed the event. + until @event_processor.current_batch.empty?; end expect(@notification_center).to have_received(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], @@ -279,6 +267,12 @@ end it 'should log an error when dispatch event raises timeout exception' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + logger: spy_logger, + notification_center: @notification_center + ) + conversion_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) log_event = Optimizely::EventFactory.create_log_event(conversion_event, spy_logger) allow(Optimizely::EventFactory).to receive(:create_log_event).and_return(log_event) @@ -287,10 +281,13 @@ allow(@event_dispatcher).to receive(:dispatch_event).and_raise(timeout_error) @event_processor.process(conversion_event) - sleep 1.5 + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 1; end + @event_processor.flush + # Wait until other thread has processed the event. + until @event_processor.current_batch.empty?; end expect(@notification_center).not_to have_received(:send_notifications) - expect(spy_logger).to have_received(:log).once.with( Logger::ERROR, "Error dispatching event: #{log_event} Timeout::Error." @@ -298,32 +295,72 @@ end it 'should flush pending events when stop is called' do - allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args) + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + batch_size: 5, + flush_interval: 10_000, + logger: spy_logger + ) + + experiment = project_config.get_experiment_from_key('test_experiment') + impression_event = Optimizely::UserEventFactory.create_impression_event(project_config, experiment, '111128', 'test_user', nil) + log_event = Optimizely::EventFactory.create_log_event(impression_event, spy_logger) + + allow(Optimizely::EventFactory).to receive(:create_log_event).with(any_args).and_return(log_event) + expected_batch = [] - counter = 0 - until counter >= 10 - event['key'] = event['key'] + counter.to_s + 4.times do user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) expected_batch << user_event @event_processor.process(user_event) - counter += 1 end - sleep 0.25 - - # max batch size not occurred and batch is not dispatched. - expect(@event_processor.current_batch.size).to be < 10 + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 4; end expect(@event_dispatcher).not_to have_received(:dispatch_event) - # Stop should flush the queue! @event_processor.stop! - sleep 0.75 expect(spy_logger).to have_received(:log).with(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.') + expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!') expect(@event_dispatcher).to have_received(:dispatch_event).with( Optimizely::EventFactory.create_log_event(expected_batch, spy_logger) ) + end - expect(spy_logger).not_to have_received(:log).with(Logger::DEBUG, 'Flushing on max batch size!') + it 'should log a warning when Queue gets full' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_queue: SizedQueue.new(10), + event_dispatcher: @event_dispatcher, + batch_size: 100, + flush_interval: 100_000, + logger: spy_logger + ) + + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + 11.times do + @event_processor.process(user_event) + end + + # Wait until other thread has processed the event. + while @event_processor.current_batch.length != 10; end + expect(@event_dispatcher).not_to have_received(:dispatch_event) + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue.').once + end + + it 'should not process and log when Executor is not running' do + @event_processor = Optimizely::BatchEventProcessor.new( + event_dispatcher: @event_dispatcher, + batch_size: 100, + flush_interval: 100_000, + logger: spy_logger + ) + + @event_processor.stop! + + user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) + @event_processor.process(user_event) + expect(@event_processor.event_queue.length).to eq(0) + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Executor shutdown, not accepting tasks.').once end end