8000 feat: Added ODPManager implementation by zashraf1985 · Pull Request #489 · optimizely/java-sdk · GitHub
[go: up one dir, main page]

Skip to content

feat: Added ODPManager implementation #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added force flush on when settings update.
  • Loading branch information
zashraf1985 committed Oct 17, 2022
commit ad5c86adeb6e4d602e36eada29629e29212c888c
4 changes: 4 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public synchronized void setAllSegments(List<String> allSegments) {
public Boolean equals(ODPConfig toCompare) {
return getApiHost().equals(toCompare.getApiHost()) && getApiKey().equals(toCompare.getApiKey()) && getAllSegments().equals(toCompare.allSegments);
}

public ODPConfig getClone() {
return new ODPConfig(apiKey, apiHost, allSegments);
}
}
33 changes: 28 additions & 5 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ODPEventManager {

// The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here
// because `LinkedBlockingQueue` itself is thread safe.
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();

public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) {
this(odpConfig, apiManager, null, null, null);
Expand All @@ -71,7 +71,10 @@ public void start() {
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
if (!this.odpConfig.equals(odpConfig)) {
eventQueue.offer(new FlushEvent(this.odpConfig));
this.odpConfig = odpConfig;
}
}

public void identifyUser(@Nullable String vuid, String userId) {
Expand Down Expand Up @@ -137,7 +140,7 @@ private class EventDispatcherThread extends Thread {
public void run() {
while (true) {
try {
ODPEvent nextEvent;
Object nextEvent;

// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
Expand All @@ -158,12 +161,17 @@ public void run() {
continue;
}

if (nextEvent instanceof FlushEvent) {
flush(((FlushEvent) nextEvent).getOdpConfig());
continue;
}

if (currentBatch.size() == 0) {
// Batch starting, create a new flush time
nextFlushTime = new Date().getTime() + flushInterval;
}

currentBatch.add(nextEvent);
currentBatch.add((ODPEvent) nextEvent);

if (currentBatch.size() >= batchSize) {
flush();
Expand All @@ -176,7 +184,7 @@ public void run() {
logger.debug("Exiting ODP Event Dispatcher Thread.");
}

private void flush() {
private void flush(ODPConfig odpConfig) {
if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Expand All @@ -192,8 +200,23 @@ private void flush() {
currentBatch.clear();
}

private void flush() {
flush(odpConfig);
}

public void signalStop() {
shouldStop = true;
}
}

private static class FlushEvent {
private final ODPConfig odpConfig;
public FlushEvent(ODPConfig odpConfig) {
this.odpConfig = odpConfig.getClone();
}

public ODPConfig getOdpConfig() {
return odpConfig;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,16 @@ public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException {
Thread.sleep(500);
Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());
eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com"));
Thread.sleep(1500);

// Should immediately Flush current batch with old ODP config when settings are changed
Thread.sleep(100);
Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any());

// New events should use new config
for (int i = 0; i < 10; i++) {
eventManager.sendEvent(getEvent(i));
}
Thread.sleep(100);
Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any());
}

Expand Down
0