Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/content/en/docs/documentation/eventing.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ rare corner cases. Returning an empty set means that the mapper considered the s
resource event as irrelevant and the SDK will thus not trigger a reconciliation of the primary
resource in that situation.

On an update event, the SDK calls `toPrimaryResourceIDs` for **both the old and the new version**
of the secondary resource. This way it can reconcile not only the primaries that the secondary
currently maps to, but also those it previously mapped to and no longer does. So when a reference
changes — including when only a subset of the referenced primaries changes — both the newly
referenced and the dropped primaries are reconciled, and a dropped primary can revert to its
default state. Because the mapper can be invoked for an older version of a resource, keep your
implementation a pure function of the resource passed to it.

Adding a `SecondaryToPrimaryMapper` is typically sufficient when there is a one-to-many relationship
between primary and secondary resources. The secondary resources can be mapped to its primary
owner, and this is enough information to also get these secondary resources from the `Context`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@
*/
@FunctionalInterface
public interface SecondaryToPrimaryMapper<R> {

/**
* @param resource - secondary
* @return set of primary resource IDs
* Maps a secondary resource to the set of primary resources that should be reconciled in
* response.
*
* @param resource the secondary resource for which an event was received
* @return set of primary resource IDs to enqueue for reconciliation; an empty set means the event
* is irrelevant and no reconciliation is triggered. On update events, this method is invoked
* for both the old and the new versions of the resource.
*/
Set<ResourceID> toPrimaryResourceIDs(R resource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,42 @@ public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper<R> secondaryToPri
}

@Override
public synchronized void onAddOrUpdate(R resource) {
public synchronized Set<ResourceID> onAddOrUpdate(R resource, R oldResource) {

Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);

var secondaryId = ResourceID.fromResource(resource);

primaryResources.forEach(
primaryResource -> {
var resourceSet =
index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet());
resourceSet.add(ResourceID.fromResource(resource));
resourceSet.add(secondaryId);
});

if (oldResource != null) {
var obsoletePrimaries =
new HashSet<>(secondaryToPrimaryMapper.toPrimaryResourceIDs(oldResource));
if (!primaryResources.containsAll(obsoletePrimaries)) {
var result = new HashSet<>(primaryResources);
obsoletePrimaries.removeAll(primaryResources);
obsoletePrimaries.forEach(
p ->
index.computeIfPresent(
p,
(id, currentSet) -> {
currentSet.remove(secondaryId);
return currentSet.isEmpty() ? null : currentSet;
}));
result.addAll(obsoletePrimaries);
return result;
}
}
return primaryResources;
}

@Override
public synchronized void onDelete(R resource) {
public synchronized Set<ResourceID> onDelete(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
Expand All @@ -58,6 +82,7 @@ public synchronized void onDelete(R resource) {
}
}
});
return primaryResources;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -127,10 +128,10 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
if (resultEvent.isEmpty()) {
return;
}
primaryToSecondaryIndex.onDelete(resource);
var primaryIds = primaryToSecondaryIndex.onDelete(resource);
if (eventAcceptedByFilter(
ResourceAction.DELETED, resource, null, deletedFinalStateUnknown)) {
propagateEvent(resource);
propagateEvent(resource, null, primaryIds);
}
});
}
Expand All @@ -144,11 +145,12 @@ protected void handleEvent(
// onAdd/onUpdate/onDelete watch paths. The index is updated for DELETED regardless of the
// filter outcome — the resource is really gone, so leaving a tombstone in the index would
// make getSecondaryResources keep returning a stale entry.
Set<ResourceID> primaryIds = null;
if (action == ResourceAction.DELETED) {
log.debug(
"handleEvent: removing from primaryToSecondaryIndex. id={}",
ResourceID.fromResource(resource));
primaryToSecondaryIndex.onDelete(resource);
primaryIds = primaryToSecondaryIndex.onDelete(resource);
}
Comment thread
csviri marked this conversation as resolved.
if (!eventAcceptedByFilter(action, resource, oldResource, deletedFinalStateUnknown)) {
if (log.isDebugEnabled()) {
Expand All @@ -166,7 +168,7 @@ protected void handleEvent(
action,
resource.getMetadata().getResourceVersion());
}
propagateEvent(resource);
propagateEvent(resource, oldResource, primaryIds);
}

@Override
Expand All @@ -177,12 +179,12 @@ public synchronized void start() {
super.start();
// this makes sure that on first reconciliation all resources are
// present on the index
manager().list().forEach(primaryToSecondaryIndex::onAddOrUpdate);
manager().list().forEach(r -> primaryToSecondaryIndex.onAddOrUpdate(r, null));
}

@SuppressWarnings("unchecked")
private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R oldObject) {
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var primaryIds = primaryToSecondaryIndex.onAddOrUpdate(newObject, oldObject);
var resourceID = ResourceID.fromResource(newObject);

var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
Comment thread
csviri marked this conversation as resolved.
Expand All @@ -194,15 +196,22 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
"Propagating event for {}, resource with same version not result of a our update.",
action);
var event = resultEvent.get();
propagateEvent((R) event.getResource().orElseThrow());
propagateEvent((R) event.getResource().orElseThrow(), oldObject, primaryIds);
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

protected void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
protected void propagateEvent(R resource, R oldResource, Set<ResourceID> primaryResourceIdSet) {
if (primaryResourceIdSet == null) {
primaryResourceIdSet = new HashSet<>();
primaryResourceIdSet.addAll(
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(resource));
if (oldResource != null) {
Comment on lines +206 to +210

@csviri csviri Jun 18, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is relevant will take a look

primaryResourceIdSet.addAll(
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(oldResource));
}
}
if (primaryResourceIdSet.isEmpty()) {
return;
}
Expand Down Expand Up @@ -249,16 +258,16 @@ public Set<R> getSecondaryResources(P primary) {
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
handleRecentCreateOrUpdate(resource);
handleRecentCreateOrUpdate(resource, previousVersionOfResource);
}

@Override
public void handleRecentResourceCreate(ResourceID resourceID, R resource) {
handleRecentCreateOrUpdate(resource);
handleRecentCreateOrUpdate(resource, null);
}

private void handleRecentCreateOrUpdate(R newResource) {
primaryToSecondaryIndex.onAddOrUpdate(newResource);
private void handleRecentCreateOrUpdate(R newResource, R previousVersion) {
primaryToSecondaryIndex.onAddOrUpdate(newResource, previousVersion);
temporaryResourceCache.putResource(newResource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ public static <T extends HasMetadata> NOOPPrimaryToSecondaryIndex<T> getInstance
private NOOPPrimaryToSecondaryIndex() {}

@Override
public void onAddOrUpdate(R resource) {
// empty method because of noop implementation
public Set<ResourceID> onAddOrUpdate(R resource, R oldResource) {
return null;

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very ugly, might be better to remove the NOOP index

}

@Override
public void onDelete(R resource) {
public Set<ResourceID> onDelete(R resource) {
// empty method because of noop implementation
return null;
}
Comment on lines 35 to 44

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THis is relevant, but will need a refactoring to remove, will get rid of it in a subsequent PR


@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

public interface PrimaryToSecondaryIndex<R extends HasMetadata> {

void onAddOrUpdate(R resource);
Set<ResourceID> onAddOrUpdate(R resource, R oldResource);

void onDelete(R resource);
Set<ResourceID> onDelete(R resource);

Set<ResourceID> getSecondaryResources(ResourceID primary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -655,11 +655,13 @@ void handleEventUpdatesIndexWhenDeletePropagatesFromTempCache() throws Exception
// and getSecondaryResources keeps returning a tombstone.
var indexMock = injectIndexMock();
var resource = testDeployment();
// onDelete now returns the primaries to reconcile; propagateEvent uses that set directly
when(indexMock.onDelete(resource)).thenReturn(Set.of(ResourceID.fromResource(resource)));

informerEventSource.handleEvent(ResourceAction.DELETED, resource, null, false);

verify(indexMock, times(1)).onDelete(resource);
verify(indexMock, never()).onAddOrUpdate(any());
verify(indexMock, never()).onAddOrUpdate(any(), any());
verify(eventHandlerMock, times(1)).handleEvent(any());
}

Expand All @@ -673,7 +675,7 @@ void handleEventDoesNotTouchIndexForNonDeleteAction() throws Exception {
ResourceAction.UPDATED, testDeployment(), testDeployment(), null);

verify(indexMock, never()).onDelete(any());
verify(indexMock, never()).onAddOrUpdate(any());
verify(indexMock, never()).onAddOrUpdate(any(), any());
verify(eventHandlerMock, times(1)).handleEvent(any());
}

Expand Down Expand Up @@ -745,14 +747,17 @@ private void assertNoEventProduced() {
await()
.pollDelay(Duration.ofMillis(70))
.timeout(Duration.ofMillis(150))
.untilAsserted(() -> verify(informerEventSource, never()).propagateEvent(any()));
.untilAsserted(
() -> verify(informerEventSource, never()).propagateEvent(any(), any(), any()));
}

private void expectPropagateEvent(Deployment newResourceVersion) {
await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(
() -> verify(informerEventSource, times(1)).propagateEvent(newResourceVersion));
() ->
verify(informerEventSource, times(1))
.propagateEvent(eq(newResourceVersion), any(), any()));
}

private void expectHandleUpdateEvent(int newResourceVersion, int oldResourceVersion) {
Expand Down
Loading
Loading