-
Notifications
You must be signed in to change notification settings - Fork 32
feat: Added ODPEventManager implementation #487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
089ff22
432db4d
6d3bb18
b7883eb
8caa93e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/** | ||
* | ||
* Copyright 2022, Optimizely | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.optimizely.ab.odp; | ||
|
||
import com.optimizely.ab.event.internal.BuildVersionInfo; | ||
import com.optimizely.ab.event.internal.ClientEngineInfo; | ||
import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.*; | ||
import java.util.concurrent.*; | ||
|
||
public class ODPEventManager { | ||
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); | ||
private static final int DEFAULT_BATCH_SIZE = 10; | ||
private static final int DEFAULT_QUEUE_SIZE = 10000; | ||
private static final int FLUSH_INTERVAL = 1000; | ||
private static final int MAX_RETRIES = 3; | ||
private static final String EVENT_URL_PATH = "/v3/events"; | ||
|
||
private int queueSize = DEFAULT_QUEUE_SIZE; | ||
private int batchSize = DEFAULT_BATCH_SIZE; | ||
|
||
private Boolean isRunning = false; | ||
private ODPConfig odpConfig; | ||
private EventDispatcherThread eventDispatcherThread; | ||
|
||
private final ODPApiManager apiManager; | ||
private final List<ODPEvent> currentBatch = new ArrayList<>(); | ||
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thread-safety for these - currentBatch and eventQueue? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment about thready-safety requirement will be helpful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { | ||
this.apiManager = apiManager; | ||
this.odpConfig = odpConfig; | ||
} | ||
|
||
public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, int batchSize, int queueSize) { | ||
this(odpConfig, apiManager); | ||
this.batchSize = batchSize; | ||
this.queueSize = queueSize; | ||
} | ||
|
||
public void start() { | ||
isRunning = true; | ||
eventDispatcherThread = new EventDispatcherThread(); | ||
eventDispatcherThread.start(); | ||
} | ||
|
||
public void updateSettings(ODPConfig odpConfig) { | ||
this.odpConfig = odpConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this replacement thread-safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made |
||
} | ||
|
||
public void sendEvents(List<ODPEvent> events) { | ||
for (ODPEvent event: events) { | ||
sendEvent(event); | ||
} | ||
} | ||
|
||
public void sendEvent(ODPEvent event) { | ||
event.setData(augmentCommonData(event.getData())); | ||
processEvent(event); | ||
} | ||
|
||
private Map<String, Object> augmentCommonData(Map<String, Object> sourceData) { | ||
Map<String, Object> data = new HashMap<>(); | ||
data.put("idempotence_id", UUID.randomUUID().toString()); | ||
data.put("data_source_type", "sdk"); | ||
data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue()); | ||
data.put("data_source_version", BuildVersionInfo.getClientVersion()); | ||
data.putAll(sourceData); | ||
return data; | ||
} | ||
|
||
private void processEvent(ODPEvent event) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can consider filtering out silently events when apiKey/host is null. I see you discards them when flushing below. They don't need to be queued and discarded if ODP is not integrated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! |
||
if (!isRunning) { | ||
logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); | ||
return; | ||
} | ||
|
||
if (eventQueue.size() >= queueSize) { | ||
logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize); | ||
return; | ||
} | ||
|
||
if (!eventQueue.offer(event)) { | ||
logger.warn("Failed to Process ODP Event. Event Queue is not accepting any more events"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what case, event is discarded here? It should be logger.error in this case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The usual case would be when queue is at capacity but we are already checking that before this. This check is just there because |
||
} | ||
} | ||
|
||
public void stop() { | ||
logger.debug("Sending stop signal to ODP Event Dispatcher Thread"); | ||
eventDispatcherThread.signalStop(); | ||
} | ||
|
||
private class EventDispatcherThread extends Thread { | ||
|
||
private volatile boolean shouldStop = false; | ||
|
||
@Override | ||
public void run() { | ||
while (true) { | ||
try { | ||
ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we use this fixed interval between events (instead of overrall interval between flush), the max gap between flushes can be increased up to (batchSize * FLUSH_INTERVAL). Is this what we have for logx event batching? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed this whole thing. Made this timeout dynamic based on how much time elapsed since last flush. |
||
if (nextEvent == null) { | ||
// null means no new events received and flush interval is over, dispatch whatever is in the batch. | ||
if (!currentBatch.isEmpty()) { | ||
flush(); | ||
} | ||
if (shouldStop) { | ||
break; | ||
} | ||
continue; | ||
} | ||
if (currentBatch.size() >= batchSize) { | ||
flush(); | ||
} | ||
currentBatch.add(nextEvent); | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
logger.warn("Exiting ODP Event Dispatcher Thread"); | ||
} | ||
|
||
private void flush() { | ||
if (odpConfig.isReady()) { | ||
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); | ||
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; | ||
Integer statusCode; | ||
int numAttempts = 0; | ||
do { | ||
statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload); | ||
numAttempts ++; | ||
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); | ||
} else { | ||
logger.warn("ODPConfig not ready, discarding event batch"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be debug level? When ODP is not integrated, we do not want dump many warning messages. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
currentBatch.clear(); | ||
} | ||
|
||
public void signalStop() { | ||
shouldStop = true; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make the default interval to 10secs for all SDKs consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!