refact: Batch Event Processor and Unit Tests by oakbani · Pull Request #215 · optimizely/ruby-sdk · GitHub
refact: Batch Event Processor and Unit Tests #215

Merged · 8 commits · Oct 29, 2019
60 changes: 22 additions & 38 deletions lib/optimizely/event/batch_event_processor.rb
@@ -20,7 +20,7 @@
module Optimizely
class BatchEventProcessor < EventProcessor
# BatchEventProcessor is a batched implementation of the Interface EventProcessor.
# Events passed to the BatchEventProcessor are immediately added to a EventQueue.
# Events passed to the BatchEventProcessor are immediately added to an EventQueue.
# The BatchEventProcessor maintains a single consumer thread that pulls events off of
# the BlockingQueue and buffers them for either a configured batch size or for a
# maximum duration before the resulting LogEvent is sent to the NotificationCenter.
@@ -30,6 +30,7 @@ class BatchEventProcessor < EventProcessor
DEFAULT_BATCH_SIZE = 10
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
DEFAULT_QUEUE_CAPACITY = 1000
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds

FLUSH_SIGNAL = 'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -58,8 +59,6 @@ def initialize(
DEFAULT_BATCH_INTERVAL
end
@notification_center = notification_center
@mutex = Mutex.new
@received = ConditionVariable.new
@current_batch = []
@started = false
start!
@@ -71,15 +70,13 @@ def start!
return
end
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@logger.log(Logger::INFO, 'Starting scheduler.')
@thread = Thread.new { run }
@started = true
end

def flush
@mutex.synchronize do
@event_queue << FLUSH_SIGNAL
@received.signal
end
@event_queue << FLUSH_SIGNAL
end

def process(user_event)
@@ -90,56 +87,38 @@ def process(user_event)
return
end

@mutex.synchronize do
begin
@event_queue << user_event
@received.signal
rescue Exception
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
return
end
begin
Contributor

The begin/end wrapper is unnecessary.
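Not part of the PR, but a minimal sketch of the Ruby idiom the reviewer is pointing at: a method body can carry its own rescue clause, so the explicit begin/end wrapper around it can be dropped. The `enqueue` helper below is made up purely for illustration.

```ruby
# Hypothetical helper, only to show a method-level rescue without begin/end.
def enqueue(queue, event)
  queue.push(event, true) # non-blocking push; raises ThreadError when full
rescue ThreadError
  puts 'Payload not accepted by the queue.'
end

enqueue(SizedQueue.new(1), :event_a) # accepted
```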

@event_queue.push(user_event, true)
Contributor Author

Setting the 2nd arg to true so that the main thread doesn't block, and an exception is raised if the queue is full.
https://ruby-doc.org/core-2.3.0_preview1/SizedQueue.html#method-i-push
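As a standalone illustration (not SDK code) of that non-blocking behavior: with `non_block` set to true, `SizedQueue#push` raises ThreadError instead of blocking once the queue is at capacity.

```ruby
queue = SizedQueue.new(2)
queue.push(:event_1, true)
queue.push(:event_2, true)

begin
  queue.push(:event_3, true) # queue is full, so this raises ThreadError
rescue ThreadError
  puts 'Payload not accepted by the queue.'
end
```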

Contributor

Can you please check the other SDKs? They also accept a queue size, but here we are only setting a default queue size. Can you please add an attribute to support that?

Contributor Author

We already allow the user to pass their own queue. As we do in the spec file, you can send your own SizedQueue with a different size.
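A hypothetical sketch of what passing your own queue might look like. The keyword names below are assumptions based on this diff and the specs, not a confirmed constructor signature.

```ruby
# Assumed constructor keywords, shown only for illustration.
custom_queue = SizedQueue.new(500) # capacity different from DEFAULT_QUEUE_CAPACITY

processor = Optimizely::BatchEventProcessor.new(
  event_queue: custom_queue,
  event_dispatcher: Optimizely::EventDispatcher.new,
  logger: Optimizely::SimpleLogger.new
)
```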

rescue Exception
Contributor

This should not be rescuing Exception. That will prevent the Ruby process from safely exiting, since it also catches SystemExit and other base Exception subclasses. (It should only rescue StandardError, which is the default behavior when no exception class is provided.)
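A minimal, self-contained illustration (not from the PR) of why that matters: Kernel#exit raises SystemExit, which inherits from Exception but not StandardError, so a bare rescue lets it propagate while `rescue Exception` swallows it.

```ruby
def swallow_everything
  yield
rescue Exception => e # also catches SystemExit, SignalException, ...
  puts "swallowed #{e.class}"
end

def swallow_standard_errors
  yield
rescue => e # equivalent to `rescue StandardError => e`
  puts "rescued #{e.class}"
end

swallow_everything { exit }      # prints "swallowed SystemExit"; the process keeps running
swallow_standard_errors { exit } # SystemExit propagates and the process exits
```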

@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
return
Contributor

This return is redundant.

end
end

def stop!
return unless @started

@mutex.synchronize do
@event_queue << SHUTDOWN_SIGNAL
@received.signal
end

@logger.log(Logger::INFO, 'Stopping scheduler.')
@event_queue << SHUTDOWN_SIGNAL
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
Contributor Author

@thread.exit would kill the inner thread and we might miss events still in the buffer. .join instead waits for the thread to complete, up to the given timeout.
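A standalone sketch (not SDK code) of that difference: Thread#join with a limit gives the worker a chance to drain its buffer, and returns nil if the limit expires before the thread finishes.

```ruby
worker = Thread.new do
  3.times do |i|
    sleep 0.1
    puts "flushed batch #{i}"
  end
end

# Returns the thread if it finished within 5 seconds, or nil on timeout.
if worker.join(5).nil?
  puts 'Worker did not finish within 5 seconds.'
end
```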

@started = false
@logger.log(Logger::WARN, 'Stopping scheduler.')
@thread.exit
end

private

def run
loop do
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
@logger.log(
Logger::DEBUG,
'Deadline exceeded flushing current batch.'
)
flush_queue!
end

item = nil
flush_queue! if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline
Contributor

Can you please add logs in flush_queue while flushing?

Contributor Author

Done
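For context on the deadline check in the run loop above, here is a rough standalone sketch of interval-based flushing. The millisecond-timestamp helper is inlined since Helpers::DateTimeUtils is not shown in this diff, and the interval is shortened so the sketch actually flushes (the SDK default is 30_000 ms).

```ruby
flush_interval_ms = 100
now_ms = -> { (Time.now.to_f * 1_000).to_i }
deadline = now_ms.call + flush_interval_ms

10.times do
  if now_ms.call > deadline
    puts 'Deadline exceeded flushing current batch.'
    deadline = now_ms.call + flush_interval_ms
  end
  sleep 0.05
end
```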


@mutex.synchronize do
@received.wait(@mutex, 0.05)
item = @event_queue.pop if @event_queue.length.positive?
end
item = @event_queue.pop if @event_queue.length.positive?

if item.nil?
sleep(0.05)
next
end

if item == SHUTDOWN_SIGNAL
@logger.log(Logger::INFO, 'Received shutdown signal.')
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
break
end

@@ -152,7 +131,7 @@ def run
add_to_batch(item) if item.is_a? Optimizely::UserEvent
end
rescue SignalException
@logger.log(Logger::INFO, 'Interrupted while processing buffer.')
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
rescue Exception => e
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
ensure
@@ -168,6 +147,11 @@ def flush_queue!

log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger)
begin
@logger.log(
Logger::INFO,
'Flushing Queue.'
)

@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
@@ -192,7 +176,7 @@ def add_to_batch(user_event)
@current_batch << user_event
return unless @current_batch.length >= @batch_size

@logger.log(Logger::DEBUG, 'Flushing on max batch size!')
@logger.log(Logger::DEBUG, 'Flushing on max batch size.')
flush_queue!
end
