8000 feat: Added ODPEventManager implementation by zashraf1985 · Pull Request #487 · optimizely/java-sdk · GitHub
[go: up one dir, main page]

Skip to content
8000

feat: Added ODPEventManager implementation #487

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 16, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
added ODPEventManager implementation
  • Loading branch information
zashraf1985 committed Sep 1, 2022
commit 089ff22ecfa01ca071eb4e83bc6abbc3bafb28c8
161 changes: 161 additions & 0 deletions core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java
8000
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/**
*
* Copyright 2022, Optimizely
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.optimizely.ab.odp;

import com.optimizely.ab.event.internal.BuildVersionInfo;
import com.optimizely.ab.event.internal.ClientEngineInfo;
import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

public class ODPEventManager {
private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class);
private static final int DEFAULT_BATCH_SIZE = 10;
private static final int DEFAULT_QUEUE_SIZE = 10000;
private static final int FLUSH_INTERVAL = 1000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the default interval to 10secs for all SDKs consistent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

private static final int MAX_RETRIES = 3;
private static final String EVENT_URL_PATH = "/v3/events";

private int queueSize = DEFAULT_QUEUE_SIZE;
private int batchSize = DEFAULT_BATCH_SIZE;

private Boolean isRunning = false;
private ODPConfig odpConfig;
private EventDispatcherThread eventDispatcherThread;

private final ODPApiManager apiManager;
private final List<ODPEvent> currentBatch = new ArrayList<>();
private final BlockingQueue<ODPEvent> eventQueue = new LinkedBlockingQueue<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread-safety for these - currentBatch and eventQueue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Moved currentBatch to the thread instance itself which runs in a loop which means there will be NO simultaneous access on this at all.
  2. eventQueue is an instance of LinkedBlockingQueue which itself is thread safe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment about thready-safety requirement will be helpful.

Copy link
Contributor Author
@zashraf1985 zashraf1985 Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager) {
this.apiManager = apiManager;
this.odpConfig = odpConfig;
}

public ODPEventManager(ODPConfig odpConfig, ODPApiManager apiManager, int batchSize, int queueSize) {
this(odpConfig, apiManager);
this.batchSize = batchSize;
this.queueSize = queueSize;
}

public void start() {
isRunning = true;
eventDispatcherThread = new EventDispatcherThread();
eventDispatcherThread.start();
}

public void updateSettings(ODPConfig odpConfig) {
this.odpConfig = odpConfig;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this replacement thread-safe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made odpConfig volatile to make it thread safe.

}

public void sendEvents(List<ODPEvent> events) {
for (ODPEvent event: events) {
sendEvent(event);
}
}

public void sendEvent(ODPEvent event) {
event.setData(augmentCommonData(event.getData()));
processEvent(event);
}

private Map<String, Object> augmentCommonData(Map<String, Object> sourceData) {
Map<String, Object> data = new HashMap<>();
data.put("idempotence_id", UUID.randomUUID().toString());
data.put("data_source_type", "sdk");
data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue());
data.put("data_source_version", BuildVersionInfo.getClientVersion());
data.putAll(sourceData);
return data;
}

private void processEvent(ODPEvent event) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider filtering out silently events when apiKey/host is null. I see you discards them when flushing below. They don't need to be queued and discarded if ODP is not integrated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

if (!isRunning) {
logger.warn("Failed to Process ODP Event. ODPEventManager is not running");
return;
}

if (eventQueue.size() >= queueSize) {
logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize);
return;
}

if (!eventQueue.offer(event)) {
logger.warn("Failed to Process ODP Event. Event Queue is not accepting any more events");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In what case, event is discarded here? It should be logger.error in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usual case would be when queue is at capacity but we are already checking that before this. This check is just there because offer can return false in case of an unexpected error. Changing log level to error

}
}

public void stop() {
logger.debug("Sending stop signal to ODP Event Dispatcher Thread");
eventDispatcherThread.signalStop();
}

private class EventDispatcherThread extends Thread {

private volatile boolean shouldStop = false;

@Override
public void run() {
while (true) {
try {
ODPEvent nextEvent = eventQueue.poll(FLUSH_INTERVAL, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use this fixed interval between events (instead of overrall interval between flush), the max gap between flushes can be increased up to (batchSize * FLUSH_INTERVAL). Is this what we have for logx event batching?
Some ODP events can be time-sensitive (relatively more than logx events), so it'll be good if we have more tight control of the extra delay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this whole thing. Made this timeout dynamic based on how much time elapsed since last flush.

if (nextEvent == null) {
// null means no new events received and flush interval is over, dispatch whatever is in the batch.
if (!currentBatch.isEmpty()) {
flush();
}
if (shouldStop) {
break;
}
continue;
}
if (currentBatch.size() >= batchSize) {
flush();
}
currentBatch.add(nextEvent);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

logger.warn("Exiting ODP Event Dispatcher Thread");
}

private void flush() {
if (odpConfig.isReady()) {
String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch);
String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH;
Integer statusCode;
int numAttempts = 0;
do {
statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload);
numAttempts ++;
} while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500));
} else {
logger.warn("ODPConfig not ready, discarding event batch");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be debug level? When ODP is not integrated, we do not want dump many warning messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
currentBatch.clear();
}

public void signalStop() {
shouldStop = true;
}
}
}
0