Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3d67de3
improve: filter only own updates for read-after-write-conistency
csviri Jun 7, 2026
6018bec
wip
csviri Jun 8, 2026
4fad756
wip
csviri Jun 8, 2026
a067b8c
wip
csviri Jun 8, 2026
317d8df
Event filtering with recording
csviri Jun 8, 2026
9273843
test fix
csviri Jun 8, 2026
d8682cc
Simplified EventHandling
csviri Jun 9, 2026
41f6fcb
unit tests fix
csviri Jun 9, 2026
723e03c
small fix, test repeats
csviri Jun 9, 2026
d0f5eef
improvements and releated unit tests
csviri Jun 9, 2026
5ea96dc
cleanup
csviri Jun 9, 2026
0ad15bf
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
0e34663
docs: fix link to kroxy operator (#3408)
csviri Jun 9, 2026
fb9df9a
chore(deps): bump openrewrite.version from 8.84.5 to 8.84.6 (#3409)
dependabot[bot] Jun 10, 2026
dc4b4a5
fix: release job isn't respecting minor latest streams (#3405)
xstefank Jun 10, 2026
2ec3953
improvements on edge cases
csviri Jun 9, 2026
8e231a8
Potential fix for pull request finding
csviri Jun 10, 2026
478b90d
delete related improvements and unit tests
csviri Jun 10, 2026
b12269a
delete handling improvements and test improvements
csviri Jun 10, 2026
57a748a
wip
csviri Jun 10, 2026
c2f3f40
tests
csviri Jun 10, 2026
0c9668c
test fix
csviri Jun 10, 2026
a9178a7
fix typo
csviri Jun 10, 2026
5380a4b
Potential fix for pull request finding
csviri Jun 10, 2026
c1af4ac
fixes
csviri Jun 10, 2026
c940a71
improve: filter only own updates for read-after-write-conistency with…
csviri Jun 9, 2026
89448b4
test fixes
csviri Jun 10, 2026
ddb6525
logging and improvements
csviri Jun 10, 2026
10b35c1
test AI identified cases
csviri Jun 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,27 @@ jobs:
echo "tmp_version_branch=v4" >> "$GITHUB_ENV"
- if: ${{ startsWith(github.event.release.tag_name, 'v5.' ) }}
run: |
echo "Setting version_branch to main"
echo "tmp_version_branch=main" >> "$GITHUB_ENV"
RELEASE_VERSION="${{ github.event.release.tag_name }}"
RELEASE_VERSION="${RELEASE_VERSION#v}"
RELEASE_MAJOR_MINOR=$(echo "$RELEASE_VERSION" | cut -d. -f1-2)

MAIN_POM_VERSION=$(curl -fsSL "https://raw.githubusercontent.com/${{ github.repository }}/main/pom.xml" | yq -p xml '.project.version')
if [ -z "$MAIN_POM_VERSION" ]; then
echo "Failed to determine main branch POM version"
exit 1
fi
MAIN_MAJOR_MINOR=$(echo "$MAIN_POM_VERSION" | cut -d. -f1-2)

echo "Release tag major.minor: $RELEASE_MAJOR_MINOR"
echo "Main branch major.minor: $MAIN_MAJOR_MINOR"

if [ "$RELEASE_MAJOR_MINOR" = "$MAIN_MAJOR_MINOR" ]; then
echo "Setting version_branch to main"
echo "tmp_version_branch=main" >> "$GITHUB_ENV"
else
echo "Setting version_branch to ${RELEASE_MAJOR_MINOR}.x"
echo "tmp_version_branch=${RELEASE_MAJOR_MINOR}.x" >> "$GITHUB_ENV"
fi
- if: ${{ env.tmp_version_branch == '' }}
name: Fail if version_branch is not set
run: |
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ projects want to advertise that fact here. For this reason, we ask that if you'd
to be featured in this section, please open a PR, adding a link to and short description of your
project, as shown below:

- [kroxylicious](https://github.com/kroxylicious/kroxylicious/tree/main/kroxylicious-operator) Kafka proxy operator
- [kroxylicious](https://github.com/kroxylicious/kroxylicious/tree/main/kroxylicious-kubernetes/kroxylicious-operator) Kafka proxy operator
- [ExposedApp operator](https://github.com/halkyonio/exposedapp-rhdblog): a sample operator
written to illustrate JOSDK concepts and its Quarkus extension in the ["Write Kubernetes
Operators in Java with the Java Operator SDK" blog series](https://developers.redhat.com/articles/2022/02/15/write-kubernetes-java-java-operator-sdk#).
Expand Down
2 changes: 1 addition & 1 deletion migration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<description>OpenRewrite migration recipes for Java Operator SDK</description>

<properties>
<openrewrite.version>8.84.5</openrewrite.version>
<openrewrite.version>8.84.6</openrewrite.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected synchronized void handleEvent(
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("Skipping/deferring event propagation for action: {}", action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(GenericResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,34 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

class EventFilterDetails {

private static final Logger log = LoggerFactory.getLogger(EventFilterDetails.class);

private int activeUpdates = 0;
private ResourceEvent lastEvent;
private String lastOwnUpdatedResourceVersion;
private final List<GenericResourceEvent> relatedEvents = new ArrayList<>(5);
private final Set<String> allOwnResourceVersions = new HashSet<>(5);
private boolean affectedByReList;
private volatile boolean reListSummaryEventSent = false;

public EventFilterDetails(boolean affectedByReList) {
this.affectedByReList = affectedByReList;
}

public void increaseActiveUpdates() {
activeUpdates = activeUpdates + 1;
Expand All @@ -37,36 +53,146 @@ public void increaseActiveUpdates() {
* controller to prevent race condition and send event from {@link
* ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)}
*/
public boolean decreaseActiveUpdates(String updatedResourceVersion) {
if (updatedResourceVersion != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
updatedResourceVersion, lastOwnUpdatedResourceVersion)
> 0)) {
lastOwnUpdatedResourceVersion = updatedResourceVersion;
}

public boolean decreaseActiveUpdates() {
activeUpdates = activeUpdates - 1;
return activeUpdates == 0;
}

public void setLastEvent(ResourceEvent event) {
lastEvent = event;
public int getActiveUpdates() {
return activeUpdates;
}

public boolean isNoActiveUpdate() {
return activeUpdates == 0;
}

void addToOwnResourceVersions(String updateVersion) {
allOwnResourceVersions.add(updateVersion);
}

public void addRelatedEvent(GenericResourceEvent event) {
relatedEvents.add(event);
}

public Optional<GenericResourceEvent> summaryEventForReList() {
if (!affectedByReList) {
throw new IllegalStateException(
"ReList summary event requested to detail not affected by relist");
}
if (reListSummaryEventSent) {
throw new IllegalStateException("ReList summary event already sent");
}
reListSummaryEventSent = true;
if (relatedEvents.isEmpty()) {
return Optional.empty();
}
return summaryEvent();
}

// todo unit tests for corner cases with empty collections
public Optional<GenericResourceEvent> summaryEvent() {
if (relatedEvents.isEmpty()) {
return Optional.empty();
}
if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) {
return Optional.empty();
}
return summaryEventInternal();
}

public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
if (lastEvent != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
lastOwnUpdatedResourceVersion)
> 0)) {
private Optional<GenericResourceEvent> summaryEventInternal() {
// we propagate delete event only if it is the last, if there are newer events
// means the resource was re-created (not necessarily by our controller)
var lastEvent = relatedEvents.get(relatedEvents.size() - 1);
if (lastEvent.getAction() == ResourceAction.DELETED) {
return Optional.of(lastEvent);
}
return Optional.empty();
if (relatedEvents.size() == 1) {
return Optional.of(relatedEvents.get(0));
}
var firstEvent = relatedEvents.get(0);
if (log.isDebugEnabled()) {
warnIfFirstEventLooksStale(firstEvent);
}
// Multiple events are collapsed into a single synthesized UPDATED. If the first event is an
// ADD (no previous resource), the added resource itself is used as the synthesized previous,
// intentionally losing the "creation" semantic — the reconciler is triggered by the merged
// event and reads the latest state on its own.
var firstResource =
firstEvent.getPreviousResource().orElseGet(() -> firstEvent.getResource().orElseThrow());

return Optional.of(
new GenericResourceEvent(
ResourceAction.UPDATED,
relatedEvents.get(relatedEvents.size() - 1).getResource().orElseThrow(),
firstResource,
null));
}

public int getActiveUpdates() {
return activeUpdates;
private void warnIfFirstEventLooksStale(GenericResourceEvent firstEvent) {
if (allOwnResourceVersions.isEmpty()) {
return;
}
var firstRv = firstEvent.getResource().orElseThrow().getMetadata().getResourceVersion();
var minOwn =
allOwnResourceVersions.stream()
.reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) <= 0 ? a : b)
.orElseThrow();
if (ReconcilerUtilsInternal.compareResourceVersions(firstRv, minOwn) < 0) {
log.warn(
"Synthesizing summary event with first relatedEvent rv={} older than smallest own rv={};"
+ " this likely indicates stale event carryover from a previously-parked filter"
+ " entry. {}",
firstRv,
minOwn,
this);
}
}

private Set<String> relatedEventResourceVersions() {
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.collect(Collectors.toSet());
}

public boolean newerOrEqualEventReceivedForOwnLastUpdate() {
// this means our update was not successful
if (allOwnResourceVersions.isEmpty()) {
return true;
}
String lastOwn =
allOwnResourceVersions.stream()
.reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) >= 0 ? a : b)
.orElseThrow();
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0);
}

public boolean isAffectedByReList() {
return affectedByReList;
}

public void affectedByReList() {
this.affectedByReList = true;
}

public boolean isReListSummaryEventSent() {
return reListSummaryEventSent;
}

@Override
public String toString() {
return "EventFilterDetails{activeUpdates="
+ activeUpdates
+ ", relatedEvents="
+ relatedEvents.size()
+ ", ownResourceVersions="
+ allOwnResourceVersions
+ ", affectedByReList="
+ affectedByReList
+ ", reListSummaryEventSent="
+ reListSummaryEventSent
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,32 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {
public class GenericResourceEvent extends ResourceEvent {

private final HasMetadata previousResource;
private final Boolean lastStateUnknow;

public ExtendedResourceEvent(
public GenericResourceEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata latestResource,
HasMetadata previousResource) {
super(action, resourceID, latestResource);
HasMetadata previousResource,
Boolean lastStateUnknow) {
super(action, ResourceID.fromResource(latestResource), latestResource);
this.previousResource = previousResource;
this.lastStateUnknow = lastStateUnknow;
}

public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}

public Boolean getLastStateUnknow() {
return lastStateUnknow;
}

@Override
public String toString() {
return "ExtendedResourceEvent{"
return "GenericResourceEvent{"
+ getPreviousResource()
.map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion())
.orElse("")
Expand All @@ -61,7 +67,7 @@ public String toString() {
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ExtendedResourceEvent that = (ExtendedResourceEvent) o;
GenericResourceEvent that = (GenericResourceEvent) o;
return Objects.equals(previousResource, that.previousResource);
}

Expand Down
Loading
Loading