8000 feat: Added ODPEventManager implementation by zashraf1985 · Pull Request #487 · optimizely/java-sdk · GitHub
[go: up one dir, main page]

Skip to content

feat: Added ODPEventManager implementation #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Incorporated review feedback
  • Loading branch information
zashraf1985 committed Sep 15, 2022
commit b7883eb3d4b6ddf7dd7e1a473ecf8f559f5d24ed
4 changes: 3 additions & 1 deletion core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.optimizely.ab.odp;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Map;

Expand All @@ -24,7 +26,7 @@ public class ODPEvent {
private Map<String, String > identifiers;
private Map<String, Object> data;

public ODPEvent(String type, String action, Map<String, String> identifiers, Map<String, Object> data) {
public ODPEvent(@Nonnull String type, @Nonnull String action, @Nullable Map<String, String> identifiers, @Nullable Map<String, Object> data) {
this.type = type;
this.action = action;
this.identifiers = identifiers != null ? identifiers : Collections.emptyMap();
Expand Down
38 changes: 28 additions & 10 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.*;

Expand All @@ -38,17 +40,20 @@ public class ODPEventManager {
private final int flushInterval;

private Boolean isRunning = false;

// This needs to be volatile because it will be updated in the main thread and the event dispatcher thread
// needs to see the change immediately.
10000 private volatile ODPConfig odpConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment why this should be volatile?

private EventDispatcherThread eventDispatcherThread;

private final ODPApiManager apiManager;
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment about thready-safety requirement will be helpful.

Copy link
Contributor Author
@zashraf1985 zashraf1985 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) {
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, apiManager, null, null, null);
}

public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, Integer batchSize, Integer queueSize, Integer flushInterval) {
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) {
this.odpConfig = odpConfig;
this.apiManager = apiManager;
this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE;
Expand All @@ -66,9 +71,11 @@ public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this replacement thread-safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made odpConfig volatile to make it thread safe.

}

public void identifyUser(String vuid, String userId) {
public void identifyUser(@Nullable String vuid, String userId) {
Map<String, String> identifiers = new HashMap<>();
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid);
if (vuid != null) {
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid);
}
identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId);
ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null);
sendEvent(event);
Expand Down Expand Up @@ -96,7 +103,7 @@ private void processEvent(ODPEvent event) {
}

if (!odpConfig.isReady()) {
logger.debug("Unable to Process Event. ODPConfig is not ready.");
logger.debug("Unable to Process ODP Event. ODPConfig is not ready.");
return;
}

Expand All @@ -121,14 +128,22 @@ private class EventDispatcherThread extends Thread {

private final List<ODPEvent> currentBatch = new ArrayList<>();

private long lastFlushTime = new Date().getTime();
private long nextFlushTime = new Date().getTime();

@Override
public void run() {
while (true) {
try {
long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime));
ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS);
ODPEvent nextEvent;

// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
if (currentBatch.size() > 0) {
//long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime));
nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS);
} else {
nextEvent = eventQueue.poll();
}

if (nextEvent == null) {
// null means no new events received and flush interval is over, dispatch whatever is in the batch.
Expand All @@ -141,6 +156,11 @@ public void run() {
continue;
}

if (currentBatch.size() == 0) {
// Batch starting, create a new flush time
nextFlushTime = new Date().getTime() + flushInterval;
}

currentBatch.add(nextEvent);

if (currentBatch.size() >= batchSize) {
Expand All @@ -155,8 +175,6 @@ public void run() {
}

private void flush() {
lastFlushTime = new Date().getTime();

if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void logAndDiscardEventWhenODPConfigNotReady() {
eventManager.start();
ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap());
eventManager.sendEvent(event);
logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process Event. ODPConfig is not ready.");
logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process ODP Event. ODPConfig is not ready.");
}

@Test
Expand Down
0