From 66cad0a10a5218b69b9d3717a2723688bad0e31c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= <a_meszaros@apple.com>
Date: Fri, 11 Apr 2025 14:03:57 +0200
Subject: [PATCH 1/2] feat: support delete event reconiliation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The idea is that reconcile method will be called on resource delete event.
What would allow to clear inMemory caches without using a finalizer (thus Cleaner interface)

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
 .../api/config/ControllerConfiguration.java   |  4 ++
 .../operator/api/reconciler/Context.java      |  2 +
 .../reconciler/ControllerConfiguration.java   |  6 ++
 .../api/reconciler/DefaultContext.java        | 10 +++-
 .../processing/event/EventProcessor.java      | 56 ++++++++++++++-----
 .../processing/event/ExecutionScope.java      | 12 +++-
 .../event/ReconciliationDispatcher.java       |  6 +-
 .../processing/event/ResourceState.java       | 23 +++++++-
 .../event/ResourceStateManager.java           | 12 ++--
 9 files changed, 105 insertions(+), 26 deletions(-)

diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
index 2c18fa55d3..c4854a1422 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java
@@ -92,4 +92,8 @@ default String fieldManager() {
   }
 
   <C> C getConfigurationFor(DependentResourceSpec<?, P, C> spec);
+
+  default boolean reconcileOnPrimaryDelete() {
+    return false;
+  }
 }
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
index e5fbaad68e..2996c813ee 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
@@ -55,6 +55,8 @@ default <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
   @SuppressWarnings("unused")
   IndexedResourceCache<P> getPrimaryCache();
 
+  boolean isPrimaryDeleted();
+
   /**
    * Determines whether a new reconciliation will be triggered right after the current
    * reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
index d407ed0fc6..30ac9ffc4b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java
@@ -77,4 +77,10 @@ MaxReconciliationInterval maxReconciliationInterval() default
    * @return the name used as field manager for SSA operations
    */
   String fieldManager() default CONTROLLER_NAME_AS_FIELD_MANAGER;
+
+  /**
+   * Will trigger reconciliation on delete event of the primary resource. Can be set to true only if
+   * the reconciler does not implement {@link Cleaner} interface.
+   */
+  boolean reconcileOnPrimaryDelete() default false;
 }
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
index b5ea66f8bc..6363e6892e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
@@ -24,12 +24,15 @@ public class DefaultContext<P extends HasMetadata> implements Context<P> {
   private final ControllerConfiguration<P> controllerConfiguration;
   private final DefaultManagedWorkflowAndDependentResourceContext<P>
       defaultManagedDependentResourceContext;
+  private final boolean isPrimaryDeleted;
 
-  public DefaultContext(RetryInfo retryInfo, Controller<P> controller, P primaryResource) {
+  public DefaultContext(
+      RetryInfo retryInfo, Controller<P> controller, P primaryResource, boolean isPrimaryDeleted) {
     this.retryInfo = retryInfo;
     this.controller = controller;
     this.primaryResource = primaryResource;
     this.controllerConfiguration = controller.getConfiguration();
+    this.isPrimaryDeleted = isPrimaryDeleted;
     this.defaultManagedDependentResourceContext =
         new DefaultManagedWorkflowAndDependentResourceContext<>(controller, primaryResource, this);
   }
@@ -49,6 +52,11 @@ public IndexedResourceCache<P> getPrimaryCache() {
     return controller.getEventSourceManager().getControllerEventSource();
   }
 
+  @Override
+  public boolean isPrimaryDeleted() {
+    return isPrimaryDeleted;
+  }
+
   @Override
   public boolean isNextReconciliationImminent() {
     return controller
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
index f9af175053..7883b4607f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
@@ -43,7 +43,7 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
   private final Cache<P> cache;
   private final EventSourceManager<P> eventSourceManager;
   private final RateLimiter<? extends RateLimitState> rateLimiter;
-  private final ResourceStateManager resourceStateManager = new ResourceStateManager();
+  private final ResourceStateManager<P> resourceStateManager = new ResourceStateManager<>();
   private final Map<String, Object> metricsMetadata;
   private ExecutorService executor;
 
@@ -121,19 +121,24 @@ public synchronized void handleEvent(Event event) {
     }
   }
 
-  private void handleMarkedEventForResource(ResourceState state) {
-    if (state.deleteEventPresent()) {
+  private void handleMarkedEventForResource(ResourceState<P> state) {
+    if (skipDeleteEventProcessing(state)) {
       cleanupForDeletedEvent(state.getId());
-    } else if (!state.processedMarkForDeletionPresent()) {
+    } else if (!state.processedMarkForDeletionPresent()
+        && !state.deleteEventReconciliationSubmitted()) {
       submitReconciliationExecution(state);
     }
   }
 
-  private void submitReconciliationExecution(ResourceState state) {
+  private boolean skipDeleteEventProcessing(ResourceState<P> state) {
+    return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete();
+  }
+
+  private void submitReconciliationExecution(ResourceState<P> state) {
     try {
       boolean controllerUnderExecution = isControllerUnderExecution(state);
       final var resourceID = state.getId();
-      Optional<P> maybeLatest = cache.get(resourceID);
+      Optional<P> maybeLatest = getCachedResource(resourceID, state);
       maybeLatest.ifPresent(MDCUtils::addResourceInfo);
       if (!controllerUnderExecution && maybeLatest.isPresent()) {
         var rateLimit = state.getRateLimit();
@@ -148,8 +153,15 @@ private void submitReconciliationExecution(ResourceState state) {
         }
         state.setUnderProcessing(true);
         final var latest = maybeLatest.get();
-        ExecutionScope<P> executionScope = new ExecutionScope<>(state.getRetry());
-        state.unMarkEventReceived();
+        ExecutionScope<P> executionScope =
+            new ExecutionScope<>(state.getRetry(), state.deleteEventPresent());
+
+        if (state.deleteEventPresent()) {
+          state.markDeleteEventReconciliationSubmitted();
+        } else {
+          state.unMarkEventReceived();
+        }
+
         metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
         log.debug("Executing events for custom resource. Scope: {}", executionScope);
         executor.execute(new ReconcilerExecutor(resourceID, executionScope));
@@ -164,7 +176,7 @@ private void submitReconciliationExecution(ResourceState state) {
           // there can be multiple reasons why the primary resource is not present, one is that the
           // informer is currently disconnected from k8s api server, but will eventually receive the
           // resource. Other is that simply there is no primary resource present for an event, this
-          // might indicate issue with the implementation, but could happen also naturally, thus
+          // might indicate an issue with the implementation, but could happen also naturally, thus
           // this is not necessarily a problem.
           log.debug("no primary resource found in cache with resource id: {}", resourceID);
         }
@@ -174,12 +186,25 @@ private void submitReconciliationExecution(ResourceState state) {
     }
   }
 
-  private void handleEventMarking(Event event, ResourceState state) {
+  private Optional<P> getCachedResource(ResourceID resourceID, ResourceState<P> state) {
+    var resource = cache.get(resourceID);
+    if (resource.isPresent()) {
+      return resource;
+    }
+    if (controllerConfiguration.reconcileOnPrimaryDelete() && state.deleteEventPresent()) {
+      return Optional.of(state.getDeletedResource());
+    }
+    return Optional.empty();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void handleEventMarking(Event event, ResourceState<P> state) {
     final var relatedCustomResourceID = event.getRelatedCustomResourceID();
     if (event instanceof ResourceEvent resourceEvent) {
       if (resourceEvent.getAction() == ResourceAction.DELETED) {
         log.debug("Marking delete event received for: {}", relatedCustomResourceID);
-        state.markDeleteEventReceived();
+        // todo check can there be delete event without resource?
+        state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow());
       } else {
         if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
           log.debug(
@@ -196,6 +221,7 @@ private void handleEventMarking(Event event, ResourceState state) {
         // event as below.
         markEventReceived(state);
       }
+      // todo this if is weird
     } else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
       markEventReceived(state);
     } else if (log.isDebugEnabled()) {
@@ -207,7 +233,7 @@ private void handleEventMarking(Event event, ResourceState state) {
     }
   }
 
-  private void markEventReceived(ResourceState state) {
+  private void markEventReceived(ResourceState<P> state) {
     log.debug("Marking event received for: {}", state.getId());
     state.markEventReceived();
   }
@@ -251,7 +277,7 @@ synchronized void eventProcessingFinished(
     }
     cleanupOnSuccessfulExecution(executionScope);
     metrics.finishedReconciliation(executionScope.getResource(), metricsMetadata);
-    if (state.deleteEventPresent()) {
+    if (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted()) {
       cleanupForDeletedEvent(executionScope.getResourceID());
     } else if (postExecutionControl.isFinalizerRemoved()) {
       state.markProcessedMarkForDeletion();
@@ -383,7 +409,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<P> executionScope) {
     retryEventSource().cancelOnceSchedule(executionScope.getResourceID());
   }
 
-  private ResourceState getOrInitRetryExecution(ExecutionScope<P> executionScope) {
+  private ResourceState<P> getOrInitRetryExecution(ExecutionScope<P> executionScope) {
     final var state = resourceStateManager.getOrCreate(executionScope.getResourceID());
     RetryExecution retryExecution = state.getRetry();
     if (retryExecution == null) {
@@ -399,7 +425,7 @@ private void cleanupForDeletedEvent(ResourceID resourceID) {
     metrics.cleanupDoneFor(resourceID, metricsMetadata);
   }
 
-  private boolean isControllerUnderExecution(ResourceState state) {
+  private boolean isControllerUnderExecution(ResourceState<P> state) {
     return state.isUnderProcessing();
   }
 
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java
index 90899a6e1a..faac3a8deb 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExecutionScope.java
@@ -8,9 +8,11 @@ class ExecutionScope<R extends HasMetadata> {
   // the latest custom resource from cache
   private R resource;
   private final RetryInfo retryInfo;
+  private boolean isPrimaryDeleted = false;
 
-  ExecutionScope(RetryInfo retryInfo) {
+  ExecutionScope(RetryInfo retryInfo, boolean isPrimaryDeleted) {
     this.retryInfo = retryInfo;
+    this.isPrimaryDeleted = isPrimaryDeleted;
   }
 
   public ExecutionScope<R> setResource(R resource) {
@@ -42,4 +44,12 @@ public String toString() {
   public RetryInfo getRetryInfo() {
     return retryInfo;
   }
+
+  public void setPrimaryDeleted(boolean primaryDeleted) {
+    isPrimaryDeleted = primaryDeleted;
+  }
+
+  public boolean isPrimaryDeleted() {
+    return isPrimaryDeleted;
+  }
 }
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
index ee861982b1..08b2e608aa 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java
@@ -90,7 +90,11 @@ private PostExecutionControl<P> handleDispatch(ExecutionScope<P> executionScope)
     }
 
     Context<P> context =
-        new DefaultContext<>(executionScope.getRetryInfo(), controller, resourceForExecution);
+        new DefaultContext<>(
+            executionScope.getRetryInfo(),
+            controller,
+            resourceForExecution,
+            executionScope.isPrimaryDeleted());
     if (markedForDeletion) {
       return handleCleanup(resourceForExecution, originalResource, context);
     } else {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java
index 5d4e74d681..b963f94161 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java
@@ -1,9 +1,10 @@
 package io.javaoperatorsdk.operator.processing.event;
 
+import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
 import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
 
-class ResourceState {
+class ResourceState<P extends HasMetadata> {
 
   /**
    * Manages the state of received events. Basically there can be only three distinct states
@@ -21,6 +22,7 @@ private enum EventingState {
     PROCESSED_MARK_FOR_DELETION,
     /** Delete event present, from this point other events are not relevant */
     DELETE_EVENT_PRESENT,
+    DELETE_EVENT_RECONCILIATION_SUBMITTED
   }
 
   private final ResourceID id;
@@ -30,6 +32,8 @@ private enum EventingState {
   private EventingState eventing;
   private RateLimitState rateLimit;
 
+  private P deletedResource;
+
   public ResourceState(ResourceID id) {
     this.id = id;
     eventing = EventingState.NO_EVENT_PRESENT;
@@ -63,8 +67,13 @@ public void setUnderProcessing(boolean underProcessing) {
     this.underProcessing = underProcessing;
   }
 
-  public void markDeleteEventReceived() {
+  public void markDeleteEventReceived(P deletedResource) {
     eventing = EventingState.DELETE_EVENT_PRESENT;
+    this.deletedResource = deletedResource;
+  }
+
+  public void markDeleteEventReconciliationSubmitted() {
+    this.eventing = EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
   }
 
   public boolean deleteEventPresent() {
@@ -75,8 +84,16 @@ public boolean processedMarkForDeletionPresent() {
     return eventing == EventingState.PROCESSED_MARK_FOR_DELETION;
   }
 
+  public boolean deleteEventReconciliationSubmitted() {
+    return eventing == EventingState.DELETE_EVENT_RECONCILIATION_SUBMITTED;
+  }
+
+  public P getDeletedResource() {
+    return deletedResource;
+  }
+
   public void markEventReceived() {
-    if (deleteEventPresent()) {
+    if (deleteEventPresent() || deleteEventReconciliationSubmitted()) {
       throw new IllegalStateException("Cannot receive event after a delete event received");
     }
     eventing = EventingState.EVENT_PRESENT;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java
index 6932e1ca5e..87051f028e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java
@@ -5,17 +5,19 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-class ResourceStateManager {
+import io.fabric8.kubernetes.api.model.HasMetadata;
+
+class ResourceStateManager<P extends HasMetadata> {
   // maybe we should have a way for users to specify a hint on the amount of CRs their reconciler
   // will process to avoid under- or over-sizing the state maps and avoid too many resizing that
   // take time and memory?
-  private final Map<ResourceID, ResourceState> states = new ConcurrentHashMap<>(100);
+  private final Map<ResourceID, ResourceState<P>> states = new ConcurrentHashMap<>(100);
 
-  public ResourceState getOrCreate(ResourceID resourceID) {
+  public ResourceState<P> getOrCreate(ResourceID resourceID) {
     return states.computeIfAbsent(resourceID, ResourceState::new);
   }
 
-  public ResourceState remove(ResourceID resourceID) {
+  public ResourceState<P> remove(ResourceID resourceID) {
     return states.remove(resourceID);
   }
 
@@ -23,7 +25,7 @@ public boolean contains(ResourceID resourceID) {
     return states.containsKey(resourceID);
   }
 
-  public List<ResourceState> resourcesWithEventPresent() {
+  public List<ResourceState<P>> resourcesWithEventPresent() {
     return states.values().stream()
         .filter(state -> !state.noEventPresent())
         .collect(Collectors.toList());

From 44b12457d68010f2c5c86d2a5b19deeabcd1aa6a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= <a_meszaros@apple.com>
Date: Fri, 11 Apr 2025 14:59:56 +0200
Subject: [PATCH 2/2] wip
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Attila Mészáros <a_meszaros@apple.com>
---
 .../processing/event/EventProcessor.java      | 47 +++++++++++++------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
index 7883b4607f..360be6f20f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java
@@ -47,6 +47,7 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
   private final Map<String, Object> metricsMetadata;
   private ExecutorService executor;
 
+  // todo handle/test case when there is finalizer but not ours
   public EventProcessor(
       EventSourceManager<P> eventSourceManager, ConfigurationService configurationService) {
     this(
@@ -122,7 +123,7 @@ public synchronized void handleEvent(Event event) {
   }
 
   private void handleMarkedEventForResource(ResourceState<P> state) {
-    if (skipDeleteEventProcessing(state)) {
+    if (doCleanupForDeleteEvent(state)) {
       cleanupForDeletedEvent(state.getId());
     } else if (!state.processedMarkForDeletionPresent()
         && !state.deleteEventReconciliationSubmitted()) {
@@ -130,7 +131,7 @@ private void handleMarkedEventForResource(ResourceState<P> state) {
     }
   }
 
-  private boolean skipDeleteEventProcessing(ResourceState<P> state) {
+  private boolean doCleanupForDeleteEvent(ResourceState<P> state) {
     return state.deleteEventPresent() && !controllerConfiguration.reconcileOnPrimaryDelete();
   }
 
@@ -146,6 +147,7 @@ private void submitReconciliationExecution(ResourceState<P> state) {
           rateLimit = rateLimiter.initState();
           state.setRateLimit(rateLimit);
         }
+        // todo rate limit handling
         var rateLimiterPermission = rateLimiter.isLimited(rateLimit);
         if (rateLimiterPermission.isPresent()) {
           handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
@@ -158,13 +160,16 @@ private void submitReconciliationExecution(ResourceState<P> state) {
 
         if (state.deleteEventPresent()) {
           state.markDeleteEventReconciliationSubmitted();
-        } else {
+        } else if (!state.deleteEventReconciliationSubmitted()) { // if there is a retry
           state.unMarkEventReceived();
         }
-
         metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
         log.debug("Executing events for custom resource. Scope: {}", executionScope);
-        executor.execute(new ReconcilerExecutor(resourceID, executionScope));
+        executor.execute(
+            new ReconcilerExecutor(
+                resourceID,
+                executionScope,
+                state.deleteEventReconciliationSubmitted() ? latest : null));
       } else {
         log.debug(
             "Skipping executing controller for resource id: {}. Controller in execution: {}. Latest"
@@ -191,7 +196,8 @@ private Optional<P> getCachedResource(ResourceID resourceID, ResourceState<P> st
     if (resource.isPresent()) {
       return resource;
     }
-    if (controllerConfiguration.reconcileOnPrimaryDelete() && state.deleteEventPresent()) {
+    if (controllerConfiguration.reconcileOnPrimaryDelete()
+        && (state.deleteEventPresent() || state.deleteEventReconciliationSubmitted())) {
       return Optional.of(state.getDeletedResource());
     }
     return Optional.empty();
@@ -206,10 +212,14 @@ private void handleEventMarking(Event event, ResourceState<P> state) {
         // todo check can there be delete event without resource?
         state.markDeleteEventReceived((P) resourceEvent.getResource().orElseThrow());
       } else {
-        if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) {
+        if (state.deleteEventReconciliationSubmitted()
+            || (state.processedMarkForDeletionPresent()
+                && isResourceMarkedForDeletion(resourceEvent))) {
           log.debug(
-              "Skipping mark of event received, since already processed mark for deletion and"
-                  + " resource marked for deletion: {}",
+              "Skipping mark of event received, delete event reconciliation submitted ({}), or"
+                  + " marked for deletion but already processed mark. resource marked for deletion:"
+                  + " {}",
+              state.deleteEventReconciliationSubmitted(),
               relatedCustomResourceID);
           return;
         }
@@ -221,15 +231,17 @@ private void handleEventMarking(Event event, ResourceState<P> state) {
         // event as below.
         markEventReceived(state);
       }
-      // todo this if is weird
-    } else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) {
+    } else if (!state.deleteEventPresent()
+        && !state.processedMarkForDeletionPresent()
+        && !state.deleteEventReconciliationSubmitted()) {
       markEventReceived(state);
     } else if (log.isDebugEnabled()) {
       log.debug(
           "Skipped marking event as received. Delete event present: {}, processed mark for"
-              + " deletion: {}",
+              + " deletion: {}, delete event reconciliation submitted: {}",
           state.deleteEventPresent(),
-          state.processedMarkForDeletionPresent());
+          state.processedMarkForDeletionPresent(),
+          state.deleteEventReconciliationSubmitted());
     }
   }
 
@@ -469,10 +481,13 @@ private void handleAlreadyMarkedEvents() {
   private class ReconcilerExecutor implements Runnable {
     private final ExecutionScope<P> executionScope;
     private final ResourceID resourceID;
+    private final P deleteEventResource;
 
-    private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionScope) {
+    private ReconcilerExecutor(
+        ResourceID resourceID, ExecutionScope<P> executionScope, P deleteEventResource) {
       this.executionScope = executionScope;
       this.resourceID = resourceID;
+      this.deleteEventResource = deleteEventResource;
     }
 
     @Override
@@ -488,7 +503,9 @@ public void run() {
       final var thread = Thread.currentThread();
       final var name = thread.getName();
       try {
-        var actualResource = cache.get(resourceID);
+
+        var actualResource =
+            deleteEventResource != null ? Optional.of(deleteEventResource) : cache.get(resourceID);
         if (actualResource.isEmpty()) {
           log.debug("Skipping execution; primary resource missing from cache: {}", resourceID);
           return;