-
Notifications
You must be signed in to change notification settings - Fork 32
feat: Added ODPEventManager implementation #487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
089ff22
432db4d
6d3bb18
b7883eb
8caa93e
File filter
Filter by extension
Conversations
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,8 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import javax.annotation.Nonnull; | ||
import javax.annotation.Nullable; | ||
import java.util.*; | ||
import java.util.concurrent.*; | ||
|
||
|
@@ -38,17 +40,20 @@ public class ODPEventManager { | |
private final int flushInterval; | ||
|
||
private Boolean isRunning = false; | ||
|
||
// This needs to be volatile because it will be updated in the main thread and the event dispatcher thread | ||
// needs to see the change immediately. | ||
10000 private volatile ODPConfig odpConfig; | ||
private EventDispatcherThread eventDispatcherThread; | ||
|
||
private final ODPApiManager apiManager; | ||
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comment about thready-safety requirement will be helpful. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) { | ||
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { | ||
this(odpConfig, apiManager, null, null, null); | ||
} | ||
|
||
public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, Integer batchSize, Integer queueSize, Integer flushInterval) { | ||
public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) { | ||
this.odpConfig = odpConfig; | ||
this.apiManager = apiManager; | ||
this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE; | ||
|
@@ -66,9 +71,11 @@ public void updateSettings(ODPConfig odpConfig) { | |
this.odpConfig = odpConfig; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this replacement thread-safe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made |
||
} | ||
|
||
public void identifyUser(String vuid, String userId) { | ||
public void identifyUser(@Nullable String vuid, String userId) { | ||
Map<String, String> identifiers = new HashMap<>(); | ||
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); | ||
if (vuid != null) { | ||
identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); | ||
} | ||
identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId); | ||
ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null); | ||
sendEvent(event); | ||
|
@@ -96,7 +103,7 @@ private void processEvent(ODPEvent event) { | |
} | ||
|
||
if (!odpConfig.isReady()) { | ||
logger.debug("Unable to Process Event. ODPConfig is not ready."); | ||
logger.debug("Unable to Process ODP Event. ODPConfig is not ready."); | ||
return; | ||
} | ||
|
||
|
@@ -121,14 +128,22 @@ private class EventDispatcherThread extends Thread { | |
|
||
private final List<ODPEvent> currentBatch = new ArrayList<>(); | ||
|
||
private long lastFlushTime = new Date().getTime(); | ||
private long nextFlushTime = new Date().getTime(); | ||
|
||
@Override | ||
public void run() { | ||
while (true) { | ||
try { | ||
long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); | ||
ODPEvent nextEvent = eventQueue.poll(nextFlushMillis, TimeUnit.MILLISECONDS); | ||
ODPEvent nextEvent; | ||
|
||
// If batch has events, set the timeout to remaining time for flush interval, | ||
// otherwise wait for the new event indefinitely | ||
if (currentBatch.size() > 0) { | ||
//long nextFlushMillis = Math.max(0, flushInterval - (new Date().getTime() - lastFlushTime)); | ||
nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS); | ||
} else { | ||
nextEvent = eventQueue.poll(); | ||
} | ||
|
||
if (nextEvent == null) { | ||
// null means no new events received and flush interval is over, dispatch whatever is in the batch. | ||
|
@@ -141,6 +156,11 @@ public void run() { | |
continue; | ||
} | ||
|
||
if (currentBatch.size() == 0) { | ||
// Batch starting, create a new flush time | ||
nextFlushTime = new Date().getTime() + flushInterval; | ||
} | ||
|
||
currentBatch.add(nextEvent); | ||
|
||
if (currentBatch.size() >= batchSize) { | ||
|
@@ -155,8 +175,6 @@ public void run() { | |
} | ||
|
||
private void flush() { | ||
lastFlushTime = new Date().getTime(); | ||
|
||
if (odpConfig.isReady()) { | ||
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); | ||
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a comment why this should be volatile?