8000 feat: Added ODPEventManager implementation by zashraf1985 · Pull Request #487 · optimizely/java-sdk · GitHub
[go: up one dir, main page]

Skip to content

feat: Added ODPEventManager implementation #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.optimizely.ab.odp;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;

public class ODPEvent {
Expand All @@ -23,11 +26,11 @@ public class ODPEvent {
private Map<String, String > identifiers;
private Map<String, Object> data;

public ODPEvent(String type, String action, Map<String, String> identifiers, Map<String, Object> data) {
public ODPEvent(@Nonnull String type, @Nonnull String action, @Nullable Map<String, String> identifiers, @Nullable Map<String, Object> data) {
this.type = type;
this.action = action;
this.identifiers = identifiers;
this.data = data;
this.identifiers = identifiers != null ? identifiers : Collections.emptyMap();
this.data = data != null ? data : Collections.emptyMap();
}

public String getType() {
Expand Down
199 changes: 199 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import com.optimizely.ab.event.internal.BuildVersionInfo;
import com.optimizely.ab.event.internal.ClientEngineInfo;
import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.*;

public class ODPEventManager {
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class);
private static final int DEFAULT_BATCH_SIZE = 10;
private static final int DEFAULT_QUEUE_SIZE = 10000;
private static final int DEFAULT_FLUSH_INTERVAL = 1000;
private static final int MAX_RETRIES = 3;
private static final String EVENT_URL_PATH = "/v3/events";

private final int queueSize;
private final int batchSize;
private final int flushInterval;

private Boolean isRunning = false;

// This needs to be volatile because it will be updated in the main thread and the event dispatcher thread
// needs to see the change immediately.
private volatile ODPConfig odpConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why this should be volatile?

private EventDispatcherThread eventDispatcherThread;

private final ODPApiManager apiManager;

// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here
// because `LinkedBlockingQueue` itself is thread safe.
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment about thready-safety requirement will be helpful.

Copy link
Contributor Author
@zashraf1985 zashraf1985 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, apiManager, null, null, null);
}

public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) {
this.odpConfig = odpConfig;
this.apiManager = apiManager;
this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE;
this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE;
this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL;
}

public void start() {
isRunning = true;
eventDispatcherThread = new EventDispatcherThread();
eventDispatcherThread.start();
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this replacement thread-safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made odpConfig volatile to make it thread safe.

}

public void identifyUser(@Nullable String vuid, String userId) {
Map<String, String> identifiers = new HashMap<>();
if (vuid != null) {
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid);
}
identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId);
ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null);
sendEvent(event);
}

public void sendEvent(ODPEvent event) {
event.setData(augmentCommonData(event.getData()));
processEvent(event);
}

private Map<String, Object> augmentCommonData(Map<String, Object> sourceData) {
Map<String, Object> data = new HashMap<>();
data.put("idempotence_id", UUID.randomUUID().toString());
data.put("data_source_type", "sdk");
data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue());
data.put("data_source_version", BuildVersionInfo.getClientVersion());
data.putAll(sourceData);
return data;
}

private void processEvent(ODPEvent event) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider filtering out silently events when apiKey/host is null. I see you discards them when flushing below. They don't need to be queued and discarded if ODP is not integrated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if (!isRunning) {
logger.warn("Failed to Process ODP Event. ODPEventManager is not running");
return;
}

if (!odpConfig.isReady()) {
logger.debug("Unable to Process ODP Event. ODPConfig is not ready.");
return;
}

if (eventQueue.size() >= queueSize) {
logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize);
return;
}

if (!eventQueue.offer(event)) {
logger.error("Failed to Process ODP Event. Event Queue is not accepting any more events");
}
}

public void stop() {
logger.debug("Sending stop signal to ODP Event Dispatcher Thread");
eventDispatcherThread.signalStop();
}

private class EventDispatcherThread extends Thread {

private volatile boolean shouldStop = false;

private final List<ODPEvent> currentBatch = new ArrayList<>();

private long nextFlushTime = new Date().getTime();

@Override
public void run() {
while (true) {
try {
ODPEvent nextEvent;

// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
if (currentBatch.size() > 0) {
nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS);
} else {
nextEvent = eventQueue.poll();
}

if (nextEvent == null) {
// null means no new events received and flush interval is over, dispatch whatever is in the batch.
if (!currentBatch.isEmpty()) {
flush();
}
if (shouldStop) {
break;
}
continue;
}

if (currentBatch.size() == 0) {
// Batch starting, create a new flush time
nextFlushTime = new Date().getTime() + flushInterval;
}

currentBatch.add(nextEvent);

if (currentBatch.size() >= batchSize) {
flush();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

logger.debug("Exiting ODP Event Dispatcher Thread.");
}

private void flush() {
if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Integer statusCode;
int numAttempts = 0;
do {
statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload);
numAttempts ++;
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500));
} else {
logger.debug("ODPConfig not ready, discarding event batch");
}
currentBatch.clear();
}

public void signalStop() {
shouldStop = true;
}
}
}
Loading
0