-
Notifications
You must be signed in to change notification settings - Fork 28
refact: Batch Event Processor and Unit Tests #215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b25ec6f
34820c1
1a3a7d1
3e49f98
a7ecb71
c38d76e
2cd808b
6f44678
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
module Optimizely | ||
class BatchEventProcessor < EventProcessor | ||
# BatchEventProcessor is a batched implementation of the Interface EventProcessor. | ||
# Events passed to the BatchEventProcessor are immediately added to a EventQueue. | ||
# Events passed to the BatchEventProcessor are immediately added to an EventQueue. | ||
# The BatchEventProcessor maintains a single consumer thread that pulls events off of | ||
# the BlockingQueue and buffers them for either a configured batch size or for a | ||
# maximum duration before the resulting LogEvent is sent to the NotificationCenter. | ||
|
@@ -30,6 +30,7 @@ class BatchEventProcessor < EventProcessor | |
DEFAULT_BATCH_SIZE = 10 | ||
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds | ||
DEFAULT_QUEUE_CAPACITY = 1000 | ||
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds | ||
|
||
FLUSH_SIGNAL = 'FLUSH_SIGNAL' | ||
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL' | ||
|
@@ -58,8 +59,6 @@ def initialize( | |
DEFAULT_BATCH_INTERVAL | ||
end | ||
@notification_center = notification_center | ||
@mutex = Mutex.new | ||
@received = ConditionVariable.new | ||
@current_batch = [] | ||
@started = false | ||
start! | ||
|
@@ -71,15 +70,13 @@ def start! | |
return | ||
end | ||
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval | ||
@logger.log(Logger::INFO, 'Starting scheduler.') | ||
@thread = Thread.new { run } | ||
@started = true | ||
end | ||
|
||
def flush | ||
@mutex.synchronize do | ||
@event_queue << FLUSH_SIGNAL | ||
@received.signal | ||
end | ||
@event_queue << FLUSH_SIGNAL | ||
end | ||
|
||
def process(user_event) | ||
|
@@ -90,56 +87,38 @@ def process(user_event) | |
return | ||
end | ||
|
||
@mutex.synchronize do | ||
begin | ||
@event_queue << user_event | ||
@received.signal | ||
rescue Exception | ||
@logger.log(Logger::WARN, 'Payload not accepted by the queue.') | ||
return | ||
end | ||
begin | ||
@event_queue.push(user_event, true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Setting 2nd arg to true so that the main thread doesn't block and an exception is raised if the Queue is full. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you please make sure in other SDKs, we are also getting queue size, but here we are just setting default queue size. Can you please add attribute to support that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are already allowing user to pass it's own queue. Like we do in spec file, send our own SizedQueue with different size. |
||
rescue Exception | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not be rescuing Exception. That will prevent the ruby process from safely exiting on |
||
@logger.log(Logger::WARN, 'Payload not accepted by the queue.') | ||
return | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This return redundant. |
||
end | ||
end | ||
|
||
def stop! | ||
return unless @started | ||
|
||
@mutex.synchronize do | ||
@event_queue << SHUTDOWN_SIGNAL | ||
@received.signal | ||
end | ||
|
||
@logger.log(Logger::INFO, 'Stopping scheduler.') | ||
@event_queue << SHUTDOWN_SIGNAL | ||
@thread.join(DEFAULT_TIMEOUT_INTERVAL) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @thread.exit would kill the inner thread and we may miss out events in the buffer. .join would instead wait for the thread to complete up to the given timeout. |
||
@started = false | ||
@logger.log(Logger::WARN, 'Stopping scheduler.') | ||
@thread.exit | ||
end | ||
|
||
private | ||
|
||
def run | ||
loop do | ||
if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline | ||
@logger.log( | ||
Logger::DEBUG, | ||
'Deadline exceeded flushing current batch.' | ||
) | ||
flush_queue! | ||
end | ||
|
||
item = nil | ||
flush_queue! if Helpers::DateTimeUtils.create_timestamp > @flushing_interval_deadline | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you pleae add logs in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
@mutex.synchronize do | ||
@received.wait(@mutex, 0.05) | ||
item = @event_queue.pop if @event_queue.length.positive? | ||
end | ||
item = @event_queue.pop if @event_queue.length.positive? | ||
|
||
if item.nil? | ||
sleep(0.05) | ||
next | ||
end | ||
|
||
if item == SHUTDOWN_SIGNAL | ||
@logger.log(Logger::INFO, 'Received shutdown signal.') | ||
@logger.log(Logger::DEBUG, 'Received shutdown signal.') | ||
break | ||
end | ||
|
||
|
@@ -152,7 +131,7 @@ def run | |
add_to_batch(item) if item.is_a? Optimizely::UserEvent | ||
end | ||
rescue 6D40 SignalException | ||
@logger.log(Logger::INFO, 'Interrupted while processing buffer.') | ||
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.') | ||
rescue Exception => e | ||
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") | ||
ensure | ||
|
@@ -168,6 +147,11 @@ def flush_queue! | |
|
||
log_event = Optimizely::EventFactory.create_log_event(@current_batch, @logger) | ||
begin | ||
@logger.log( | ||
Logger::INFO, | ||
'Flushing Queue.' | ||
) | ||
|
||
@event_dispatcher.dispatch_event(log_event) | ||
@notification_center&.send_notifications( | ||
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], | ||
|
@@ -192,7 +176,7 @@ def add_to_batch(user_event) | |
@current_batch << user_event | ||
return unless @current_batch.length >= @batch_size | ||
|
||
@logger.log(Logger::DEBUG, 'Flushing on max batch size!') | ||
@logger.log(Logger::DEBUG, 'Flushing on max batch size.') | ||
flush_queue! | ||
end | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The begin/end wrapper is unnecessary.