-
Notifications
You must be signed in to change notification settings - Fork 32
feat: Added ODPEventManager implementation #487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
089ff22
432db4d
6d3bb18
b7883eb
8caa93e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
/** | ||
* | ||
* Copyright 2022, Optimizely | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.optimizely.ab.odp; | ||
|
||
import com.optimizely.ab.event.internal.BuildVersionInfo; | ||
import com.optimizely.ab.event.internal.ClientEngineInfo; | ||
import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import java.util.*; | ||
import java.util.concurrent.*; | ||
|
||
public class ODPEventManager { | ||
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); | ||
private static final int DEFAULT_BATCH_SIZE = 10; | ||
private static final int DEFAULT_QUEUE_SIZE = 10000; | ||
private static final int DEFAULT_FLUSH_INTERVAL = 1000; | ||
private static final int MAX_RETRIES = 3; | ||
private static final String EVENT_URL_PATH = "/v3/events"; | ||
|
||
private final int queueSize; | ||
private final int batchSize; | ||
private final int flushInterval; | ||
|
||
private Boolean isRunning = false; | ||
|
||
// This needs to be volatile because it will be updated in the main thread and the event dispatcher thread | ||
// needs to see the change immediately. | ||
private volatile ODPConfig odpConfig; | ||
private EventDispatcherThread eventDispatcherThread; | ||
|
||
private final ODPApiManager apiManager; | ||
|
||
// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here | ||
// because `LinkedBlockingQueue` itself is thread safe. | ||
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment about thready-safety requirement will be helpful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { | ||
this(odpConfig, apiManager, null, null, null); | ||
} | ||
|
||
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) { | ||
this.odpConfig = odpConfig; | ||
this.apiManager = apiManager; | ||
this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE; | ||
this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE; | ||
this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL; | ||
} | ||
|
||
public void start() { | ||
isRunning = true; | ||
eventDispatcherThread = new EventDispatcherThread(); | ||
eventDispatcherThread.start(); | ||
} | ||
|
||
public void updateSettings(ODPConfig odpConfig) { | ||
this.odpConfig = odpConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this replacement thread-safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made |
||
} | ||
|
||
public void identifyUser(@Nullable String vuid, String userId) { | ||
Map<String, String> identifiers = new HashMap<>(); | ||
if (vuid != null) { | ||
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); | ||
} | ||
identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId); | ||
ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null); | ||
sendEvent(event); | ||
} | ||
|
||
public void sendEvent(ODPEvent event) { | ||
event.setData(augmentCommonData(event.getData())); | ||
processEvent(event); | ||
} | ||
|
||
private Map<String, Object> augmentCommonData(Map<String, Object> sourceData) { | ||
Map<String, Object> data = new HashMap<>(); | ||
data.put("idempotence_id", UUID.randomUUID().toString()); | ||
data.put("data_source_type", "sdk"); | ||
data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue()); | ||
data.put("data_source_version", BuildVersionInfo.getClientVersion()); | ||
data.putAll(sourceData); | ||
return data; | ||
} | ||
|
||
private void processEvent(ODPEvent event) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can consider filtering out silently events when apiKey/host is null. I see you discards them when flushing below. They don't need to be queued and discarded if ODP is not integrated. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! |
||
if (!isRunning) { | ||
logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); | ||
return; | ||
} | ||
|
||
if (!odpConfig.isReady()) { | ||
logger.debug("Unable to Process ODP Event. ODPConfig is not ready."); | ||
return; | ||
} | ||
|
||
if (eventQueue.size() >= queueSize) { | ||
logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize); | ||
return; | ||
} | ||
|
||
if (!eventQueue.offer(event)) { | ||
logger.error("Failed to Process ODP Event. Event Queue is not accepting any more events"); | ||
} | ||
} | ||
|
||
public void stop() { | ||
logger.debug("Sending stop signal to ODP Event Dispatcher Thread"); | ||
eventDispatcherThread.signalStop(); | ||
} | ||
|
||
private class EventDispatcherThread extends Thread { | ||
|
||
private volatile boolean shouldStop = false; | ||
|
||
private final List<ODPEvent> currentBatch = new ArrayList<>(); | ||
|
||
private long nextFlushTime = new Date().getTime(); | ||
|
||
@Override | ||
public void run() { | ||
while (true) { | ||
try { | ||
ODPEvent nextEvent; | ||
|
||
// If batch has events, set the timeout to remaining time for flush interval, | ||
// otherwise wait for the new event indefinitely | ||
if (currentBatch.size() > 0) { | ||
nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS); | ||
} else { | ||
nextEvent = eventQueue.poll(); | ||
} | ||
|
||
if (nextEvent == null) { | ||
// null means no new events received and flush interval is over, dispatch whatever is in the batch. | ||
if (!currentBatch.isEmpty()) { | ||
flush(); | ||
} | ||
if (shouldStop) { | ||
break; | ||
} | ||
continue; | ||
} | ||
|
||
if (currentBatch.size() == 0) { | ||
// Batch starting, create a new flush time | ||
nextFlushTime = new Date().getTime() + flushInterval; | ||
} | ||
|
||
currentBatch.add(nextEvent); | ||
|
||
if (currentBatch.size() >= batchSize) { | ||
flush(); | ||
} | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
|
||
logger.debug("Exiting ODP Event Dispatcher Thread."); | ||
} | ||
|
||
private void flush() { | ||
if (odpConfig.isReady()) { | ||
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); | ||
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; | ||
Integer statusCode; | ||
int numAttempts = 0; | ||
do { | ||
statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload); | ||
numAttempts ++; | ||
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); | ||
} else { | ||
logger.debug("ODPConfig not ready, discarding event batch"); | ||
} | ||
currentBatch.clear(); | ||
} | ||
|
||
public void signalStop() { | ||
shouldStop = true; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment why this should be volatile?