From 089ff22ecfa01ca071eb4e83bc6abbc3bafb28c8 Mon Sep 17 00:00:00 2001 From: zashraf1985 Date: Wed, 31 Aug 2022 23:20:51 -0700 Subject: [PATCH 1/5] added ODPEventManager implementation --- .../optimizely/ab/odp/ODPEventManager.java | 161 ++++++++++++++++++ 1 file changed, 161 insertions(+) create mode 100644 core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java new file mode 100644 index 000000000..371b6c9a7 --- /dev/null +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -0,0 +1,161 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import com.optimizely.ab.event.internal.BuildVersionInfo; +import com.optimizely.ab.event.internal.ClientEngineInfo; +import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; + +public class ODPEventManager { + private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); + private static final int DEFAULT_BATCH_SIZE = 10; + private static final int DEFAULT_QUEUE_SIZE = 10000; + private static final int FLUSH_INTERVAL = 1000; + private static final int MAX_RETRIES = 3; + private static final String EVENT_URL_PATH = "/v3/events"; + + private int queueSize = DEFAULT_QUEUE_SIZE; + private int batchSize = DEFAULT_BATCH_SIZE; + + private Boolean isRunning = false; + private ODPConfig odpConfig; + private EventDispatcherThread eventDispatcherThread; + + private final ODPApiManager apiManager; + private final List currentBatch = new ArrayList<>(); + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); + + public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { + this.apiManager = apiManager; + this.odpConfig = odpConfig; + } + + public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, int batchSize, int queueSize) { + this(odpConfig, apiManager); + this.batchSize = batchSize; + this.queueSize = queueSize; + } + + public void start() { + isRunning = true; + eventDispatcherThread = new EventDispatcherThread(); + eventDispatcherThread.start(); + } + + public void updateSettings(ODPConfig odpConfig) { + this.odpConfig = odpConfig; + } + + public void sendEvents(List events) { + for (ODPEvent event: events) { + sendEvent(event); + } + } + + public void sendEvent(ODPEvent event) { + event.setData(augmentCommonData(event.getData())); + processEvent(event); + } + + private Map augmentCommonData(Map sourceData) { + Map data = new HashMap<>(); + data.put("idempotence_id", UUID.randomUUID().toString()); + data.put("data_source_type", "sdk"); + data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue()); + data.put("data_source_version", BuildVersionInfo.getClientVersion()); + data.putAll(sourceData); + return data; + } + + private void processEvent(ODPEvent event) { + if (!isRunning) { + logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); + return; + } + + if (eventQueue.size() >= queueSize) { + logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize); + return; + } + + if (!eventQueue.offer(event)) { + logger.warn("Failed to Process ODP Event. Event Queue is not accepting any more events"); + } + } + + public void stop() { + logger.debug("Sending stop signal to ODP Event Dispatcher Thread"); + eventDispatcherThread.signalStop(); + } + + private class EventDispatcherThread extends Thread { + + private volatile boolean shouldStop = false; + + @Override + public void run() { + while (true) { + try { + ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); + if (nextEvent == null) { + // null means no new events received and flush interval is over, dispatch whatever is in the batch. + if (!currentBatch.isEmpty()) { + flush(); + } + if (shouldStop) { + break; + } + continue; + } + if (currentBatch.size() >= batchSize) { + flush(); + } + currentBatch.add(nextEvent); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + logger.warn("Exiting ODP Event Dispatcher Thread"); + } + + private void flush() { + if (odpConfig.isReady()) { + String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); + String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; + Integer statusCode; + int numAttempts = 0; + do { + statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload); + numAttempts ++; + } while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); + } else { + logger.warn("ODPConfig not ready, discarding event batch"); + } + currentBatch.clear(); + } + + public void signalStop() { + shouldStop = true; + } + } +} From 432db4dceba1f03f3caa4cf02d89c390ac4ea99c Mon Sep 17 00:00:00 2001 From: zashraf1985 Date: Mon, 12 Sep 2022 18:40:54 -0700 Subject: [PATCH 2/5] incorporated review feedback. Unit tests are still pending. --- .../optimizely/ab/odp/ODPEventManager.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index 371b6c9a7..7940e045f 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -37,11 +37,10 @@ public class ODPEventManager { private int batchSize = DEFAULT_BATCH_SIZE; private Boolean isRunning = false; - private ODPConfig odpConfig; + private volatile ODPConfig odpConfig; private EventDispatcherThread eventDispatcherThread; private final ODPApiManager apiManager; - private final List currentBatch = new ArrayList<>(); private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { @@ -87,6 +86,11 @@ private Map augmentCommonData(Map sourceData) { } private void processEvent(ODPEvent event) { + if (!odpConfig.isReady()) { + logger.debug("Unable to Process Event. ODPConfig is not ready."); + return; + } + if (!isRunning) { logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); return; @@ -98,7 +102,7 @@ private void processEvent(ODPEvent event) { } if (!eventQueue.offer(event)) { - logger.warn("Failed to Process ODP Event. Event Queue is not accepting any more events"); + logger.error("Failed to Process ODP Event. Event Queue is not accepting any more events"); } } @@ -111,11 +115,17 @@ private class EventDispatcherThread extends Thread { private volatile boolean shouldStop = false; + private final List currentBatch = new ArrayList<>(); + + private long lastFlushTime = new Date().getTime(); + @Override public void run() { while (true) { try { - ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS); + long nextFlushMillis = Math.max(0, FLUSH_INTERVAL - (new Date().getTime() - lastFlushTime)); + ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS); + if (nextEvent == null) { // null means no new events received and flush interval is over, dispatch whatever is in the batch. if (!currentBatch.isEmpty()) { @@ -126,10 +136,12 @@ public void run() { } continue; } + + currentBatch.add(nextEvent); + if (currentBatch.size() >= batchSize) { flush(); } - currentBatch.add(nextEvent); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -139,6 +151,8 @@ public void run() { } private void flush() { + lastFlushTime = new Date().getTime(); + if (odpConfig.isReady()) { String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; @@ -149,7 +163,7 @@ private void flush() { numAttempts ++; } while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); } else { - logger.warn("ODPConfig not ready, discarding event batch"); + logger.debug("ODPConfig not ready, discarding event batch"); } currentBatch.clear(); } From 6d3bb1881c2df8f28506550cdb0d090725fa3faa Mon Sep 17 00:00:00 2001 From: zashraf1985 Date: Tue, 13 Sep 2022 18:58:52 -0700 Subject: [PATCH 3/5] Added unit tests --- .../java/com/optimizely/ab/odp/ODPEvent.java | 5 +- .../optimizely/ab/odp/ODPEventManager.java | 42 ++-- .../ab/odp/ODPEventManagerTest.java | 234 ++++++++++++++++++ 3 files changed, 260 insertions(+), 21 deletions(-) create mode 100644 core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java index 34bd340b6..514495568 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java @@ -15,6 +15,7 @@ */ package com.optimizely.ab.odp; +import java.util.Collections; import java.util.Map; public class ODPEvent { @@ -26,8 +27,8 @@ public class ODPEvent { public ODPEvent(String type, String action, Map identifiers, Map data) { this.type = type; this.action = action; - this.identifiers = identifiers; - this.data = data; + this.identifiers = identifiers != null ? identifiers : Collections.emptyMap(); + this.data = data != null ? data : Collections.emptyMap(); } public String getType() { diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index 7940e045f..0c2cbd42c 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -29,12 +29,13 @@ public class ODPEventManager { private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); private static final int DEFAULT_BATCH_SIZE = 10; private static final int DEFAULT_QUEUE_SIZE = 10000; - private static final int FLUSH_INTERVAL = 1000; + private static final int DEFAULT_FLUSH_INTERVAL = 1000; private static final int MAX_RETRIES = 3; private static final String EVENT_URL_PATH = "/v3/events"; - private int queueSize = DEFAULT_QUEUE_SIZE; - private int batchSize = DEFAULT_BATCH_SIZE; + private final int queueSize; + private final int batchSize; + private final int flushInterval; private Boolean isRunning = false; private volatile ODPConfig odpConfig; @@ -44,14 +45,15 @@ public class ODPEventManager { private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { - this.apiManager = apiManager; - this.odpConfig = odpConfig; + this(odpConfig, apiManager, null, null, null); } - public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, int batchSize, int queueSize) { - this(odpConfig, apiManager); - this.batchSize = batchSize; - this.queueSize = queueSize; + public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, Integer batchSize, Integer queueSize, Integer flushInterval) { + this.odpConfig = odpConfig; + this.apiManager = apiManager; + this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE; + this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE; + this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL; } public void start() { @@ -64,10 +66,12 @@ public void updateSettings(ODPConfig odpConfig) { this.odpConfig = odpConfig; } - public void sendEvents(List events) { - for (ODPEvent event: events) { - sendEvent(event); - } + public void identifyUser(String vuid, String userId) { + Map identifiers = new HashMap<>(); + identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); + identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId); + ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null); + sendEvent(event); } public void sendEvent(ODPEvent event) { @@ -86,13 +90,13 @@ private Map augmentCommonData(Map sourceData) { } private void processEvent(ODPEvent event) { - if (!odpConfig.isReady()) { - logger.debug("Unable to Process Event. ODPConfig is not ready."); + if (!isRunning) { + logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); return; } - if (!isRunning) { - logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); + if (!odpConfig.isReady()) { + logger.debug("Unable to Process Event. ODPConfig is not ready."); return; } @@ -123,7 +127,7 @@ private class EventDispatcherThread extends Thread { public void run() { while (true) { try { - long nextFlushMillis = Math.max(0, FLUSH_INTERVAL - (new Date().getTime() - lastFlushTime)); + long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS); if (nextEvent == null) { @@ -147,7 +151,7 @@ public void run() { } } - logger.warn("Exiting ODP Event Dispatcher Thread"); + logger.debug("Exiting ODP Event Dispatcher Thread."); } private void flush() { diff --git a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java new file mode 100644 index 000000000..9eb951e64 --- /dev/null +++ b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java @@ -0,0 +1,234 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import ch.qos.logback.classic.Level; +import com.optimizely.ab.internal.LogbackVerifier; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +@RunWith(MockitoJUnitRunner.class) +public class ODPEventManagerTest { + + @Rule + public LogbackVerifier logbackVerifier = new LogbackVerifier(); + + @Mock + ODPApiManager mockApiManager; + + @Captor + ArgumentCaptor payloadCaptor; + + @Before + public void setup() { + mockApiManager = mock(ODPApiManager.class); + } + + @Test + public void logAndDiscardEventWhenEventManagerIsNotRunning() { + ODPConfig odpConfig = new ODPConfig("key", "host", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); + eventManager.sendEvent(event); + logbackVerifier.expectMessage(Level.WARN, "Failed to Process ODP Event. ODPEventManager is not running"); + } + + @Test + public void logAndDiscardEventWhenODPConfigNotReady() { + ODPConfig odpConfig = new ODPConfig(null, null, null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); + eventManager.sendEvent(event); + logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process Event. ODPConfig is not ready."); + } + + @Test + public void dispatchEventsInCorrectNumberOfBatches() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void dispatchEventsWithCorrectPayload() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + int batchSize = 2; + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager, batchSize, null, null); + eventManager.start(); + for (int i = 0; i < 6; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), payloadCaptor.capture()); + List payloads = payloadCaptor.getAllValues(); + + for (int i = 0; i < payloads.size(); i++) { + JSONArray events = new JSONArray(payloads.get(i)); + assertEquals(batchSize, events.length()); + for (int j = 0; j < events.length(); j++) { + int id = (batchSize * i) + j; + JSONObject event = events.getJSONObject(j); + assertEquals("test-type-" + id , event.getString("type")); + assertEquals("test-action-" + id , event.getString("action")); + assertEquals("value1-" + id, event.getJSONObject("identifiers").getString("identifier1")); + assertEquals("value2-" + id, event.getJSONObject("identifiers").getString("identifier2")); + assertEquals("data-value1-" + id, event.getJSONObject("data").getString("data1")); + assertEquals(id, event.getJSONObject("data").getInt("data2")); + assertEquals("sdk", event.getJSONObject("data").getString("data_source_type")); + } + } + } + + @Test + public void dispatchEventsWithCorrectFlushInterval() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + + // Last batch is incomplete so it needs almost a second to flush. + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void retryFailedEvents() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(500); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + + // Should be called thrice for each batch + Mockito.verify(mockApiManager, times(6)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + + // Last batch is incomplete so it needs almost a second to flush. + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(9)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void shouldFlushAllScheduledEventsBeforeStopping() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + eventManager.stop(); + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + logbackVerifier.expectMessage(Level.DEBUG, "Exiting ODP Event Dispatcher Thread."); + } + + @Test + public void prepareCorrectPayloadForIdentifyUser() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + int batchSize = 2; + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager, batchSize, null, null); + eventManager.start(); + for (int i = 0; i < 2; i++) { + eventManager.identifyUser("the-vuid-" + i, "the-fs-user-id-" + i); + } + + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(1)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), payloadCaptor.capture()); + + String payload = payloadCaptor.getValue(); + JSONArray events = new JSONArray(payload); + assertEquals(batchSize, events.length()); + for (int i = 0; i < events.length(); i++) { + JSONObject event = events.getJSONObject(i); + assertEquals("fullstack", event.getString("type")); + assertEquals("client_initialized", event.getString("action")); + assertEquals("the-vuid-" + i, event.getJSONObject("identifiers").getString("vuid")); + assertEquals("the-fs-user-id-" + i, event.getJSONObject("identifiers").getString("fs_user_id")); + assertEquals("sdk", event.getJSONObject("data").getString("data_source_type")); + } + } + + @Test + public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com")); + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any()); + } + + private ODPEvent getEvent(int id) { + Map identifiers = new HashMap<>(); + identifiers.put("identifier1", "value1-" + id); + identifiers.put("identifier2", "value2-" + id); + + Map data = new HashMap<>(); + data.put("data1", "data-value1-" + id); + data.put("data2", id); + + return new ODPEvent("test-type-" + id , "test-action-" + id, identifiers, data); + } +} From b7883eb3d4b6ddf7dd7e1a473ecf8f559f5d24ed Mon Sep 17 00:00:00 2001 From: zashraf1985 Date: Wed, 14 Sep 2022 22:54:29 -0700 Subject: [PATCH 4/5] Incorporated review feedback --- .../java/com/optimizely/ab/odp/ODPEvent.java | 4 +- .../optimizely/ab/odp/ODPEventManager.java | 38 ++++++++++++++----- .../ab/odp/ODPEventManagerTest.java | 2 +- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java index 514495568..903bcf663 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java @@ -15,6 +15,8 @@ */ package com.optimizely.ab.odp; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Map; @@ -24,7 +26,7 @@ public class ODPEvent { private Map identifiers; private Map data; - public ODPEvent(String type, String action, Map identifiers, Map data) { + public ODPEvent(@Nonnull String type, @Nonnull String action, @Nullable Map identifiers, @Nullable Map data) { this.type = type; this.action = action; this.identifiers = identifiers != null ? identifiers : Collections.emptyMap(); diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index 0c2cbd42c..e2eb3ecb1 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -22,6 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.*; import java.util.concurrent.*; @@ -38,17 +40,20 @@ public class ODPEventManager { private final int flushInterval; private Boolean isRunning = false; + + // This needs to be volatile because it will be updated in the main thread and the event dispatcher thread + // needs to see the change immediately. private volatile ODPConfig odpConfig; private EventDispatcherThread eventDispatcherThread; private final ODPApiManager apiManager; private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); - public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { + public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { this(odpConfig, apiManager, null, null, null); } - public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, Integer batchSize, Integer queueSize, Integer flushInterval) { + public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) { this.odpConfig = odpConfig; this.apiManager = apiManager; this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE; @@ -66,9 +71,11 @@ public void updateSettings(ODPConfig odpConfig) { this.odpConfig = odpConfig; } - public void identifyUser(String vuid, String userId) { + public void identifyUser(@Nullable String vuid, String userId) { Map identifiers = new HashMap<>(); - identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); + if (vuid != null) { + identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); + } identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId); ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null); sendEvent(event); @@ -96,7 +103,7 @@ private void processEvent(ODPEvent event) { } if (!odpConfig.isReady()) { - logger.debug("Unable to Process Event. ODPConfig is not ready."); + logger.debug("Unable to Process ODP Event. ODPConfig is not ready."); return; } @@ -121,14 +128,22 @@ private class EventDispatcherThread extends Thread { private final List currentBatch = new ArrayList<>(); - private long lastFlushTime = new Date().getTime(); + private long nextFlushTime = new Date().getTime(); @Override public void run() { while (true) { try { - long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); - ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS); + ODPEvent nextEvent; + + // If batch has events, set the timeout to remaining time for flush interval, + // otherwise wait for the new event indefinitely + if (currentBatch.size() > 0) { + //long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); + nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS); + } else { + nextEvent = eventQueue.poll(); + } if (nextEvent == null) { // null means no new events received and flush interval is over, dispatch whatever is in the batch. @@ -141,6 +156,11 @@ public void run() { continue; } + if (currentBatch.size() == 0) { + // Batch starting, create a new flush time + nextFlushTime = new Date().getTime() + flushInterval; + } + currentBatch.add(nextEvent); if (currentBatch.size() >= batchSize) { @@ -155,8 +175,6 @@ public void run() { } private void flush() { - lastFlushTime = new Date().getTime(); - if (odpConfig.isReady()) { String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; diff --git a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java index 9eb951e64..7be51e415 100644 --- a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java +++ b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java @@ -72,7 +72,7 @@ public void logAndDiscardEventWhenODPConfigNotReady() { eventManager.start(); ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); eventManager.sendEvent(event); - logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process Event. ODPConfig is not ready."); + logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process ODP Event. ODPConfig is not ready."); } @Test From 8caa93e870c2718fe18034a941de429961b416cd Mon Sep 17 00:00:00 2001 From: zashraf1985 Date: Wed, 14 Sep 2022 22:58:13 -0700 Subject: [PATCH 5/5] Added a comment on an instance variable --- .../src/main/java/com/optimizely/ab/odp/ODPEventManager.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java index e2eb3ecb1..7cc601f29 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -47,6 +47,9 @@ public class ODPEventManager { private EventDispatcherThread eventDispatcherThread; private final ODPApiManager apiManager; + + // The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here + // because `LinkedBlockingQueue` itself is thread safe. private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { @@ -139,7 +142,6 @@ public void run() { // If batch has events, set the timeout to remaining time for flush interval, // otherwise wait for the new event indefinitely if (currentBatch.size() > 0) { - //long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS); } else { nextEvent = eventQueue.poll();