8000 feat: add lifecycle hooks to the operator framework · jeesmon/java-operator-sdk@d7871f0 · GitHub
[go: up one dir, main page]

Skip to content

Commit d7871f0

Browse files
committed
feat: add lifecycle hooks to the operator framework
1 parent 83b36c5 commit d7871f0

File tree

8 files changed

+125
-41
lines changed

8 files changed

+125
-41
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
1616
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1717
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
18+
import java.io.Closeable;
19+
import java.io.IOException;
20+
import java.util.ArrayList;
1821
import java.util.Arrays;
22+
import java.util.List;
1923
import org.slf4j.Logger;
2024
import org.slf4j.LoggerFactory;
2125

2226
@SuppressWarnings("rawtypes")
23-
public class Operator {
27+
public class Operator implements AutoCloseable {
2428

2529
private static final Logger log = LoggerFactory.getLogger(Operator.class);
2630
private final KubernetesClient k8sClient;
2731
private final ConfigurationService configurationService;
2832
private final ObjectMapper objectMapper;
33+
private final List<Closeable> closeables;
2934

3035
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) {
3136
this(k8sClient, configurationService, new ObjectMapper());
@@ -38,6 +43,7 @@ public Operator(
3843
this.k8sClient = k8sClient;
3944
this.configurationService = configurationService;
4045
this.objectMapper = objectMapper;
46+
this.closeables = new ArrayList<>();
4147
}
4248

4349
/**
@@ -64,6 +70,21 @@ public void start() {
6470
}
6571
}
6672

73+
/** Stop the operator. */
74+
@Override
75+
public void close() {
76+
log.info("Operator {} is shutting down...", configurationService.getVersion().getSdkVersion());
77+
78+
for (Closeable closeable : this.closeables) {
79+
try {
80+
log.debug("closing {}", closeable);
81+
closeable.close();
82+
} catch (IOException e) {
83+
log.warn("Error closing {}", closeable, e);
84+
}
85+
}
86+
}
87+
6788
/**
6889
* Registers the specified controller with this operator.
6990
*
@@ -160,10 +181,15 @@ public <R extends CustomResource> void register(
160181
customResourceCache,
161182
watchAllNamespaces,
162183
targetNamespaces,
163-
defaultEventHandler,
164184
configuration.isGenerationAware(),
165-
finalizer);
166-
eventSourceManager.registerCustomResourceEventSource(customResourceEventSource);
185+
finalizer,
186+
resClass);
187+
188+
closeables.add(customResourceEventSource);
189+
closeables.add(eventSourceManager);
190+
191+
customResourceEventSource.setEventHandler(defaultEventHandler);
192+
customResourceEventSource.start();
167193

168194
log.info(
169195
"Registered Controller: '{}' for CRD: '{}' for namespace(s): {}",
@@ -178,18 +204,14 @@ private CustomResourceEventSource createCustomResourceEventSource(
178204
CustomResourceCache customResourceCache,
179205
boolean watchAllNamespaces,
180206
String[] targetNamespaces,
181-
DefaultEventHandler defaultEventHandler,
182207
boolean generationAware,
183-
String finalizer) {
184-
CustomResourceEventSource customResourceEventSource =
185-
watchAllNamespaces
186-
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
187-
customResourceCache, client, generationAware, finalizer)
188-
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
189-
customResourceCache, client, targetNamespaces, generationAware, finalizer);
190-
191-
customResourceEventSource.setEventHandler(defaultEventHandler);
192-
193-
return customResourceEventSource;
208+
String finalizer,
209+
Class<?> resClass) {
210+
211+
return watchAllNamespaces
212+
? CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
213+
customResourceCache, client, generationAware, finalizer, resClass)
214+
: CustomResourceEventSource.customResourceEventSourceForTargetNamespaces(
215+
customResourceCache, client, targetNamespaces, generationAware, finalizer, resClass);
194216
}
195217
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 12 additions & 0 deletions
D5B
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class DefaultEventHandler implements EventHandler {
3737
private final EventDispatcher eventDispatcher;
3838
private final Retry retry;
3939
private final Map<String, RetryExecution> retryState = new HashMap<>();
40+
private final String controllerName;
4041
private DefaultEventSourceManager eventSourceManager;
4142

4243
private final ReentrantLock lock = new ReentrantLock();
@@ -50,6 +51,7 @@ public DefaultEventHandler(
5051
this.customResourceCache = customResourceCache;
5152
this.eventDispatcher = eventDispatcher;
5253
this.retry = retry;
54+
this.controllerName = relatedControllerName;
5355
eventBuffer = new EventBuffer();
5456
executor =
5557
new ScheduledThreadPoolExecutor(
@@ -70,6 +72,16 @@ public DefaultEventHandler(
7072
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
7173
}
7274

75+
@Override
76+
public void close() {
77+
if (eventSourceManager != null) {
78+
log.debug("Closing EventSourceManager {} -> {}", controllerName, eventSourceManager);
79+
eventSourceManager.close();
80+
}
81+
82+
executor.shutdownNow();
83+
}
84+
7385
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
7486
this.eventSourceManager = eventSourceManager;
7587
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import io.javaoperatorsdk.operator.processing.DefaultEventHandler;
4-
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
54
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
65
import java.util.Collections;
76
import java.util.Map;
@@ -30,9 +29,23 @@ public DefaultEventSourceManager(DefaultEventHandler defaultEventHandler, boolea
3029
}
3130
}
3231

33-
public void registerCustomResourceEventSource(
34-
CustomResourceEventSource customResourceEventSource) {
35-
customResourceEventSource.addedToEventManager();
32+
@Override
33+
public void close() {
34+
try {
35+
lock.lock();
36+
for (var entry : eventSources.entrySet()) {
37+
try {
38+
log.debug("Closing {} -> {}", entry.getKey(), entry.getValue());
39+
entry.getValue().close();
40+
} catch (Exception e) {
41+
log.warn("Error closing {} -> {}", entry.getKey(), entry.getValue(), e);
42+
}
43+
}
44+
45+
eventSources.clear();
46+
} finally {
47+
lock.unlock();
48+
}
3649
}
3750

3851
@Override
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventHandler {
3+
import java.io.Closeable;
4+
5+
public interface EventHandler extends Closeable {
46

57
void handleEvent(Event event);
8+
9+
@Override
10+
default void close() {}
611
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
public interface EventSource extends AutoCloseable {
3+
import java.io.Closeable;
4+
5+
public interface EventSource extends Closeable {
46

57
/**
68
* This method is invoked when this {@link EventSource} instance is properly registered to a

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.io.Closeable;
34
import java.util.Map;
45
import java.util.Optional;
56

6-
public interface EventSourceManager {
7+
public interface EventSourceManager extends Closeable {
78

89
/**
910
* Add the {@link EventSource} identified by the given <code>name</code> to the event manager.
@@ -29,4 +30,7 @@ Optional<EventSource> deRegisterCustomResourceFromEventSource(
2930
String name, String customResourceUid);
3031

3132
Map<String, EventSource> getRegisteredEventSources();
33+
34+
@Override
35+
default void close() {}
3236
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
55

66
import io.fabric8.kubernetes.client.CustomResource;
7+
import io.fabric8.kubernetes.client.Watch;
78
import io.fabric8.kubernetes.client.Watcher;
89
import io.fabric8.kubernetes.client.WatcherException;
910
import io.fabric8.kubernetes.client.dsl.MixedOperation;
1011
import io.fabric8.kubernetes.client.dsl.internal.CustomResourceOperationsImpl;
1112
import io.javaoperatorsdk.operator.processing.CustomResourceCache;
1213
import io.javaoperatorsdk.operator.processing.KubernetesResourceUtils;
1314
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
15+
import java.util.ArrayList;
16+
import java.util.List;
1417
import java.util.Map;
1518
import java.util.concurrent.ConcurrentHashMap;
1619
import org.slf4j.Logger;
@@ -23,62 +26,84 @@ public class CustomResourceEventSource extends AbstractEventSource
2326
private static final Logger log = LoggerFactory.getLogger(CustomResourceEventSource.class);
2427

2528
private final CustomResourceCache resourceCache;
26-
private MixedOperation client;
29+
private final MixedOperation client;
2730
private final String[] targetNamespaces;
2831
private final boolean generationAware;
2932
private final String resourceFinalizer;
3033
private final Map<String, Long> lastGenerationProcessedSuccessfully = new ConcurrentHashMap<>();
34+
private final List<Watch> watches;
35+
private final String resClass;
3136

3237
public static CustomResourceEventSource customResourceEventSourceForAllNamespaces(
3338
CustomResourceCache customResourceCache,
3439
MixedOperation client,
3540
boolean generationAware,
36-
String resourceFinalizer) {
41+
String resourceFinalizer,
42+
Class<?> resClass) {
3743
return new CustomResourceEventSource(
38-
customResourceCache, client, null, generationAware, resourceFinalizer);
44+
customResourceCache, client, null, generationAware, resourceFinalizer, resClass);
3945
}
4046

4147
public static CustomResourceEventSource customResourceEventSourceForTargetNamespaces(
4248
CustomResourceCache customResourceCache,
4349
MixedOperation client,
4450
String[] namespaces,
4551
boolean generationAware,
46-
String resourceFinalizer) {
52+
String resourceFinalizer,
53+
Class<?> resClass) {
4754
return new CustomResourceEventSource(
48-
customResourceCache, client, namespaces, generationAware, resourceFinalizer);
55+
customResourceCache, client, namespaces, generationAware, resourceFinalizer, resClass);
4956
}
5057

5158
private CustomResourceEventSource(
5259
CustomResourceCache customResourceCache,
5360
MixedOperation client,
5461
String[] targetNamespaces,
5562
boolean generationAware,
56-
String resourceFinalizer) {
63+
String resourceFinalizer,
64+
Class<?> resClass) {
5765
this.resourceCache = customResourceCache;
5866
this.client = client;
5967
this.targetNamespaces = targetNamespaces;
6068
this.generationAware = generationAware;
6169
this.resourceFinalizer = resourceFinalizer;
70+
this.watches = new ArrayList<>();
71+
this.resClass = resClass.getName();
6272
}
6373

6474
private boolean isWatchAllNamespaces() {
6575
return targetNamespaces == null;
6676
}
6777

68-
public void addedToEventManager() {
69-
registerWatch();
70-
}
71-
72-
private void registerWatch() {
78+
@Override
79+
public void start() {
7380
CustomResourceOperationsImpl crClient = (CustomResourceOperationsImpl) client;
7481
if (isWatchAllNamespaces()) {
75-
crClient.inAnyNamespace().watch(this);
82+
var w = crClient.inAnyNamespace().watch(this);
83+
watches.add(w);
84+
log.debug("Registered controller {} -> {} for any namespace", resClass, w);
7685
} else if (targetNamespaces.length == 0) {
77-
client.watch(this);
86+
var w = client.watch(this);
87+
watches.add(w);
88+
log.debug(
89+
"Registered controller {} -> {} for namespace {}", resClass, w, crClient.getNamespace());
7890
} else {
7991
for (String targetNamespace : targetNamespaces) {
80-
crClient.inNamespace(targetNamespace).watch(this);
81-
log.debug("Registered controller for namespace: {}", targetNamespace);
92+
var w = crClient.inNamespace(targetNamespace).watch(this);
93+
watches.add(w);
94+
log.debug("Registered controller {} -> {} for namespace: {}", resClass, w, targetNamespace);
95+
}
96+
}
97+
}
98+
99+
@Override
100+
public void close() {
101+
for (Watch watch : this.watches) {
102+
try {
103+
log.debug("Closing watch {} -> {}", resClass, watch);
104+
watch.close();
105+
} catch (Exception e) {
106+
log.warn("Error closing watcher {} -> {}", resClass, watch, e);
82107
}
83108
}
84109
}
@@ -155,7 +180,8 @@ public void onClose(WatcherException e) {
155180
if (e.isHttpGone()) {
156181
log.warn("Received error for watch, will try to reconnect.", e);
157182
try {
158-
registerWatch();
183+
close();
184+
start();
159185
} catch (Throwable ex) {
160186
log.error("Unexpected error happened with watch reconnect. Will exit.", e);
161187
System.exit(1);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ class CustomResourceEventSourceTest {
2525

2626
private CustomResourceEventSource customResourceEventSource =
2727
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
28-
customResourceCache, mixedOperation, true, FINALIZER);
28+
customResourceCache, mixedOperation, true, FINALIZER, TestCustomResource.class);
2929

3030
@BeforeEach
3131
public void setup() {
@@ -73,7 +73,7 @@ public void normalExecutionIfGenerationChanges() {
7373
public void handlesAllEventIfNotGenerationAware() {
7474
customResourceEventSource =
7575
CustomResourceEventSource.customResourceEventSourceForAllNamespaces(
76-
customResourceCache, mixedOperation, false, FINALIZER);
76+
customResourceCache, mixedOperation, false, FINALIZER, TestCustomResource.class);
7777
setup();
7878

7979
TestCustomResource customResource1 = TestUtils.testCustomResource();

0 commit comments

Comments
 (0)
0