Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,27 @@
Class<? extends OnAddFilter> onAddFilter() default OnAddFilter.class;

/**
* Optional {@link OnUpdateFilter} to filter update events sent to the associated informer
* Optional {@link OnUpdateFilter} to filter update events sent to the associated informer.
* Combined with JOSDK's internal filters using AND logic — the event is only accepted when both
* this filter and JOSDK's internal filters accept it.
*
* @return the {@link OnUpdateFilter} filter implementation to use, defaulting to the interface
* itself if no value is set
*/
Class<? extends OnUpdateFilter> onUpdateFilter() default OnUpdateFilter.class;

/**
* Optional {@link OnUpdateFilter} combined with JOSDK's internal filters using OR logic — the
* event is accepted when either this filter or JOSDK's internal filters accept it. Use this to
* expand the set of events that trigger reconciliation beyond what JOSDK's internal filters (e.g.
* generation-aware filtering) would normally allow, for instance to also reconcile on specific
* status field updates.
Comment on lines +93 to +97
*
* @return the {@link OnUpdateFilter} filter implementation to use, defaulting to the interface
* itself if no value is set
*/
Class<? extends OnUpdateFilter> onUpdateFilterOr() default OnUpdateFilter.class;

/**
* Optional {@link OnDeleteFilter} to filter delete events sent to the associated informer
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class InformerConfiguration<R extends HasMetadata> {
private String labelSelector;
private OnAddFilter<? super R> onAddFilter;
private OnUpdateFilter<? super R> onUpdateFilter;
private OnUpdateFilter<? super R> onUpdateFilterOr;
private OnDeleteFilter<? super R> onDeleteFilter;
private GenericFilter<? super R> genericFilter;
private ItemStore<R> itemStore;
Expand All @@ -64,6 +65,7 @@ protected InformerConfiguration(
String labelSelector,
OnAddFilter<? super R> onAddFilter,
OnUpdateFilter<? super R> onUpdateFilter,
OnUpdateFilter<? super R> onUpdateFilterOr,
OnDeleteFilter<? super R> onDeleteFilter,
GenericFilter<? super R> genericFilter,
ItemStore<R> itemStore,
Expand All @@ -79,6 +81,7 @@ protected InformerConfiguration(
this.labelSelector = labelSelector;
this.onAddFilter = onAddFilter;
this.onUpdateFilter = onUpdateFilter;
this.onUpdateFilterOr = onUpdateFilterOr;
this.onDeleteFilter = onDeleteFilter;
this.genericFilter = genericFilter;
this.itemStore = itemStore;
Expand Down Expand Up @@ -115,6 +118,7 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
original.labelSelector,
original.onAddFilter,
original.onUpdateFilter,
original.onUpdateFilterOr,
original.onDeleteFilter,
original.genericFilter,
original.itemStore,
Expand Down Expand Up @@ -259,6 +263,10 @@ public OnUpdateFilter<? super R> getOnUpdateFilter() {
return onUpdateFilter;
}

public OnUpdateFilter<? super R> getOnUpdateFilterOr() {
return onUpdateFilterOr;
}

public OnDeleteFilter<? super R> getOnDeleteFilter() {
return onDeleteFilter;
}
Expand Down Expand Up @@ -359,6 +367,9 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
withOnUpdateFilter(
Utils.instantiate(informerConfig.onUpdateFilter(), OnUpdateFilter.class, context));

withOnUpdateFilterOr(
Utils.instantiate(informerConfig.onUpdateFilterOr(), OnUpdateFilter.class, context));

withOnDeleteFilter(
Utils.instantiate(informerConfig.onDeleteFilter(), OnDeleteFilter.class, context));

Expand Down Expand Up @@ -456,6 +467,11 @@ public Builder withOnUpdateFilter(OnUpdateFilter<? super R> onUpdateFilter) {
return this;
}

public Builder withOnUpdateFilterOr(OnUpdateFilter<? super R> onUpdateFilterOr) {
InformerConfiguration.this.onUpdateFilterOr = onUpdateFilterOr;
return this;
}

public Builder withOnDeleteFilter(OnDeleteFilter<? super R> onDeleteFilter) {
InformerConfiguration.this.onDeleteFilter = onDeleteFilter;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ public Builder<R> withOnUpdateFilter(OnUpdateFilter<? super R> onUpdateFilter) {
return this;
}

public Builder<R> withOnUpdateFilterOr(OnUpdateFilter<? super R> onUpdateFilterOr) {
config.withOnUpdateFilterOr(onUpdateFilterOr);
return this;
}

public Builder<R> withOnDeleteFilter(OnDeleteFilter<? super R> onDeleteFilter) {
config.withOnDeleteFilter(onDeleteFilter);
return this;
Expand Down Expand Up @@ -311,6 +316,7 @@ public void updateFrom(InformerConfiguration<R> informerConfig) {
.withItemStore(informerConfig.getItemStore())
.withOnAddFilter(informerConfig.getOnAddFilter())
.withOnUpdateFilter(informerConfig.getOnUpdateFilter())
.withOnUpdateFilterOr(informerConfig.getOnUpdateFilterOr())
.withOnDeleteFilter(informerConfig.getOnDeleteFilter())
.withGenericFilter(informerConfig.getGenericFilter())
.withInformerListLimit(informerConfig.getInformerListLimit())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ public ControllerEventSource(Controller<T> controller) {
// by default the on add should be processed in all cases regarding internal filters
final var informerConfig = config.getInformerConfig();
Optional.ofNullable(informerConfig.getOnAddFilter()).ifPresent(this::setOnAddFilter);
Optional.ofNullable(informerConfig.getOnUpdateFilter())

var effectiveUpdateFilter =
Optional.ofNullable(informerConfig.getOnUpdateFilter())
.map(filter -> filter.and(internalOnUpdateFilter))
.orElse(internalOnUpdateFilter);
Optional.ofNullable(informerConfig.getOnUpdateFilterOr())
.ifPresentOrElse(
filter -> setOnUpdateFilter(filter.and(internalOnUpdateFilter)),
() -> setOnUpdateFilter(internalOnUpdateFilter));
orFilter -> setOnUpdateFilter(effectiveUpdateFilter.or(orFilter)),
() -> setOnUpdateFilter(effectiveUpdateFilter));

Optional.ofNullable(informerConfig.getGenericFilter()).ifPresent(this::setGenericFilter);
setControllerConfiguration(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,48 @@ void callsBroadcastsOnResourceEvents() {
eq(ResourceAction.UPDATED), eq(customResource1), eq(customResource1));
}

@Test
void orFilterTriggersEventWhenInternalFilterWouldReject() {
TestCustomResource cr = TestUtils.testCustomResource();
cr.getMetadata().setFinalizers(List.of(FINALIZER));
cr.getMetadata().setGeneration(1L);

// Internal generation-aware filter would reject same-generation update,
// but the OR filter accepts it unconditionally.
OnUpdateFilter<TestCustomResource> orFilter = (newRes, oldRes) -> true;
source = new ControllerEventSource<>(new TestController(null, null, orFilter, null));
setUpSource(source, true, controllerConfig);

source.handleEvent(ResourceAction.UPDATED, cr, cr, null);

verify(eventHandler, times(1)).handleEvent(any());
}

@Test
void orFilterDoesNotOverrideAndFilter() {
TestCustomResource cr = TestUtils.testCustomResource();
cr.getMetadata().setFinalizers(List.of(FINALIZER));
cr.getMetadata().setGeneration(1L);

// AND filter rejects, OR filter also rejects → event must be dropped.
OnUpdateFilter<TestCustomResource> andFilter = (newRes, oldRes) -> false;
OnUpdateFilter<TestCustomResource> orFilter = (newRes, oldRes) -> false;
source = new ControllerEventSource<>(new TestController(null, andFilter, orFilter, null));
setUpSource(source, true, controllerConfig);

source.handleEvent(ResourceAction.UPDATED, cr, cr, null);

verify(eventHandler, never()).handleEvent(any());
}

@Test
void filtersOutEventsOnAddAndUpdate() {
TestCustomResource cr = TestUtils.testCustomResource();

OnAddFilter<TestCustomResource> onAddFilter = (res) -> false;
OnUpdateFilter<TestCustomResource> onUpdatePredicate = (res, res2) -> false;
source = new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null));
source =
new ControllerEventSource<>(new TestController(onAddFilter, onUpdatePredicate, null, null));
setUpSource(source, true, controllerConfig);

source.handleEvent(ResourceAction.ADDED, cr, null, null);
Expand All @@ -159,7 +194,7 @@ void filtersOutEventsOnAddAndUpdate() {
void genericFilterFiltersOutAddUpdateAndDeleteEvents() {
TestCustomResource cr = TestUtils.testCustomResource();

source = new ControllerEventSource<>(new TestController(null, null, res -> false));
source = new ControllerEventSource<>(new TestController(null, null, null, res -> false));
setUpSource(source, true, controllerConfig);

source.handleEvent(ResourceAction.ADDED, cr, null, null);
Expand All @@ -174,7 +209,7 @@ void ownUpdateEchoIsFilteredOutByEventFilter() throws InterruptedException {
// End-to-end smoke for the event-filter wiring on the controller path: an event for our
// own write must not propagate. Detail-level filter scenarios are covered in
// EventingDetailTest / EventFilterSupportTest.
source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
source = spy(new ControllerEventSource<>(new TestController(null, null, null, null)));
setUpSource(source, true, controllerConfig);
doReturn(Optional.empty()).when(source).get(any());

Expand All @@ -189,7 +224,7 @@ void ownUpdateEchoIsFilteredOutByEventFilter() throws InterruptedException {
@Test
void foreignUpdateDuringFilteringPropagatesAsUpdate() {
// An external event during the filter window must surface (not be filtered as own).
source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
source = spy(new ControllerEventSource<>(new TestController(null, null, null, null)));
setUpSource(source, true, controllerConfig);

var latch = sendForEventFilteringUpdate(2);
Expand All @@ -203,7 +238,7 @@ void foreignUpdateDuringFilteringPropagatesAsUpdate() {
void deleteEventDuringFilteringPropagatesAsDelete() {
// A DELETE arriving during the filter window must surface — the resource has gone,
// so the filter must not silence it just because our own write is still tracking RVs.
source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
source = spy(new ControllerEventSource<>(new TestController(null, null, null, null)));
setUpSource(source, true, controllerConfig);

var latch = sendForEventFilteringUpdate(2);
Expand All @@ -223,7 +258,7 @@ void deleteEventDuringFilteringPropagatesAsDelete() {
void multipleForeignEventsDuringFilteringMergeIntoSingleEvent() {
// Several external events during one filter window collapse into a single
// synthesized event spanning prev → latest seen.
source = spy(new ControllerEventSource<>(new TestController(null, null, null)));
source = spy(new ControllerEventSource<>(new TestController(null, null, null, null)));
setUpSource(source, true, controllerConfig);

var latch = sendForEventFilteringUpdate(2);
Expand Down Expand Up @@ -266,17 +301,18 @@ private static class TestController extends Controller<TestCustomResource> {
public TestController(
OnAddFilter<TestCustomResource> onAddFilter,
OnUpdateFilter<TestCustomResource> onUpdateFilter,
OnUpdateFilter<TestCustomResource> onUpdateFilterOr,
GenericFilter<TestCustomResource> genericFilter) {
super(
reconciler,
new TestConfiguration(true, onAddFilter, onUpdateFilter, genericFilter),
new TestConfiguration(true, onAddFilter, onUpdateFilter, onUpdateFilterOr, genericFilter),
MockKubernetesClient.client(TestCustomResource.class));
}

public TestController(boolean generationAware) {
super(
reconciler,
new TestConfiguration(generationAware, null, null, null),
new TestConfiguration(generationAware, null, null, null, null),
MockKubernetesClient.client(TestCustomResource.class));
}

Expand All @@ -298,6 +334,7 @@ public TestConfiguration(
boolean generationAware,
OnAddFilter<TestCustomResource> onAddFilter,
OnUpdateFilter<TestCustomResource> onUpdateFilter,
OnUpdateFilter<TestCustomResource> onUpdateFilterOr,
GenericFilter<TestCustomResource> genericFilter) {
super(
"test",
Expand All @@ -313,6 +350,7 @@ public TestConfiguration(
InformerConfiguration.builder(TestCustomResource.class)
.withOnAddFilter(onAddFilter)
.withOnUpdateFilter(onUpdateFilter)
.withOnUpdateFilterOr(onUpdateFilterOr)
.withGenericFilter(genericFilter)
.withComparableResourceVersions(true)
.buildForController(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.baseapi.filter;

import java.time.Duration;
import java.util.Map;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class OrFilterIT {

public static final String RESOURCE_NAME = "or-filter-test";
public static final int POLL_DELAY = 150;

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder().withReconciler(new OrFilterTestReconciler()).build();

@Test
void orFilterTriggersReconciliationEvenWhenInternalFilterWouldReject() {
var res = operator.create(createResource());

await()
.pollDelay(Duration.ofMillis(POLL_DELAY))
.untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(1));

// Spec update bumps generation — internal generation-aware filter accepts -> reconcile
res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
res.getSpec().setValue("updated");
operator.replace(res);

await()
.pollDelay(Duration.ofMillis(POLL_DELAY))
.untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(2));

// Annotation-only update does not bump generation — internal filter would reject,
// but the OR filter accepts -> reconcile still happens
res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
res.getMetadata().setAnnotations(Map.of(OrFilterTestReconciler.TRIGGER_ANNOTATION, "true"));
operator.replace(res);

await()
.pollDelay(Duration.ofMillis(POLL_DELAY))
.untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(3));

// Removing the annotation: OR filter rejects, no generation change -> no reconcile
res = operator.get(FilterTestCustomResource.class, RESOURCE_NAME);
res.getMetadata().getAnnotations().remove(OrFilterTestReconciler.TRIGGER_ANNOTATION);
operator.replace(res);

await()
.pollDelay(Duration.ofMillis(POLL_DELAY))
.untilAsserted(() -> assertThat(reconciler().getNumberOfExecutions()).isEqualTo(3));
Comment on lines +71 to +73
}

private OrFilterTestReconciler reconciler() {
return operator.getReconcilerOfType(OrFilterTestReconciler.class);
}

FilterTestCustomResource createResource() {
var resource = new FilterTestCustomResource();
resource.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
resource.setSpec(new FilterTestResourceSpec());
resource.getSpec().setValue("initial");
return resource;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Java Operator SDK Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.javaoperatorsdk.operator.baseapi.filter;

import java.util.concurrent.atomic.AtomicInteger;

import io.javaoperatorsdk.operator.api.config.informer.Informer;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;

@ControllerConfiguration(informer = @Informer(onUpdateFilterOr = OrUpdateFilter.class))
public class OrFilterTestReconciler implements Reconciler<FilterTestCustomResource> {

public static final String TRIGGER_ANNOTATION = "trigger-or-filter";

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

@Override
public UpdateControl<FilterTestCustomResource> reconcile(
FilterTestCustomResource resource, Context<FilterTestCustomResource> context) {
numberOfExecutions.incrementAndGet();
return UpdateControl.noUpdate();
}

public int getNumberOfExecutions() {
return numberOfExecutions.get();
}
}
Loading
Loading