Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected synchronized void handleEvent(
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
Comment thread
csviri marked this conversation as resolved.
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("Skipping/deferring event propagation for action: {}", action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(GenericResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

class EventFilterDetails {

private int activeUpdates = 0;
private ResourceEvent lastEvent;
private String lastOwnUpdatedResourceVersion;
private final List<GenericResourceEvent> relatedEvents = new ArrayList<>(5);
private final Set<String> allOwnResourceVersions = new HashSet<>(5);

public void increaseActiveUpdates() {
activeUpdates = activeUpdates + 1;
Expand All @@ -37,36 +42,72 @@ public void increaseActiveUpdates() {
* controller to prevent race condition and send event from {@link
* ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)}
*/
public boolean decreaseActiveUpdates(String updatedResourceVersion) {
if (updatedResourceVersion != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
updatedResourceVersion, lastOwnUpdatedResourceVersion)
> 0)) {
lastOwnUpdatedResourceVersion = updatedResourceVersion;
}

public boolean decreaseActiveUpdates() {
activeUpdates = activeUpdates - 1;
return activeUpdates == 0;
}

public void setLastEvent(ResourceEvent event) {
lastEvent = event;
public int getActiveUpdates() {
return activeUpdates;
}

public boolean isNoActiveUpdate() {
return activeUpdates == 0;
}

void addToOwnResourceVersions(String updateVersion) {
allOwnResourceVersions.add(updateVersion);
}

public void addRelatedEvent(GenericResourceEvent event) {
relatedEvents.add(event);
}

public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
if (lastEvent != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
lastOwnUpdatedResourceVersion)
> 0)) {
public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent() {
if (relatedEvents.isEmpty()) {
return Optional.empty();
}
if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) {
return Optional.empty();
}
// we propagate delete event only if it is the last, if there are newer events
// means the resource was re-created (not necessarily by our controller)
var lastEvent = relatedEvents.get(relatedEvents.size() - 1);
if (lastEvent.getAction() == ResourceAction.DELETED) {
return Optional.of(lastEvent);
}
return Optional.empty();
if (relatedEvents.size() == 1) {
return Optional.of(relatedEvents.get(0));
}
var firstEvent = relatedEvents.get(0);
var firstResource =
firstEvent.getPreviousResource().orElseGet(() -> firstEvent.getResource().orElseThrow());

return Optional.of(
new GenericResourceEvent(
ResourceAction.UPDATED,
relatedEvents.get(relatedEvents.size() - 1).getResource().orElseThrow(),
firstResource,
null));
}

public int getActiveUpdates() {
return activeUpdates;
private Set<String> relatedEventResourceVersions() {
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.collect(Collectors.toSet());
}

public boolean newerOrEqualEventReceivedForOwnLastUpdate() {
// this means our update was not successful
if (allOwnResourceVersions.isEmpty()) {
return true;
}
String lastOwn =
allOwnResourceVersions.stream()
.reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) >= 0 ? a : b)
.orElseThrow();
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,32 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {
public class GenericResourceEvent extends ResourceEvent {

private final HasMetadata previousResource;
private final Boolean lastStateUnknow;

public ExtendedResourceEvent(
public GenericResourceEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata latestResource,
HasMetadata previousResource) {
super(action, resourceID, latestResource);
HasMetadata previousResource,
Boolean lastStateUnknow) {
super(action, ResourceID.fromResource(latestResource), latestResource);
this.previousResource = previousResource;
this.lastStateUnknow = lastStateUnknow;
}

public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}

public Boolean getLastStateUnknow() {
return lastStateUnknow;
}

@Override
public String toString() {
return "ExtendedResourceEvent{"
return "GenericResourceEvent{"
+ getPreviousResource()
.map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion())
.orElse("")
Expand All @@ -61,7 +67,7 @@ public String toString() {
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ExtendedResourceEvent that = (ExtendedResourceEvent) o;
GenericResourceEvent that = (GenericResourceEvent) o;
return Objects.equals(previousResource, that.previousResource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

/**
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
Expand Down Expand Up @@ -123,8 +122,15 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
log.debug(
"On delete event received. deletedFinalStateUnknown: {}", deletedFinalStateUnknown);
}
var resultEvent =
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
if (resultEvent.isEmpty()) {
return;
}
if (resultEvent.orElseThrow().getAction() != ResourceAction.DELETED) {
log.warn("Non delete event received on onDelete handling. This should not happen.");
}
primaryToSecondaryIndex.onDelete(resource);
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
if (acceptedByDeleteFilters(resource, deletedFinalStateUnknown)) {
propagateEvent(resource);
}
Expand Down Expand Up @@ -152,22 +158,26 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);

var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);

if (eventHandling != EventHandling.NEW) {
log.debug(
"{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping");
if (resultEvent.isEmpty()) {
log.debug("Deferring event propagation");
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation.",
"Propagating event for {}, resource with same version not result of a our update.",
action);
Comment on lines 166 to 168
propagateEvent(newObject);
var event = resultEvent.get();
handleEvent(
event.getAction(),
(R) event.getResource().orElseThrow(),
(R) event.getPreviousResource().orElse(null),
event.getLastStateUnknow());
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

private void propagateEvent(R object) {
protected void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
if (primaryResourceIdSet.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;

@SuppressWarnings("rawtypes")
public abstract class ManagedInformerEventSource<
Expand Down Expand Up @@ -101,45 +100,19 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
try {
temporaryResourceCache.startEventFilteringModify(id);
updatedResource = updateMethod.apply(resourceToUpdate);
log.debug("Resource update successful");
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
log.debug("Caching resource update successful");
return updatedResource;
} finally {
var res =
temporaryResourceCache.doneEventFilterModify(
id,
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
var updatedForLambda = updatedResource;
var res = temporaryResourceCache.doneEventFilterModify(id);
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();
// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
// would reconnect and loose some events in between.
// If that update was not successful we still record the previous version from the
// actual event in the ExtendedResourceEvent.
R extendedResourcePrevVersion =
(r instanceof ExtendedResourceEvent)
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
: null;
R prevVersionOfResource =
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
if (log.isDebugEnabled()) {
log.debug(
"Previous resource version: {} resource from update present: {}"
+ " extendedPrevResource present: {}",
prevVersionOfResource.getMetadata().getResourceVersion(),
updatedForLambda != null,
extendedResourcePrevVersion != null);
}
log.debug("Propagating not own event");
handleEvent(
r.getAction(),
latestResource,
prevVersionOfResource,
(r instanceof ResourceDeleteEvent)
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
: null);
(R) r.getResource().orElseThrow(),
(R) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
},
() -> log.debug("No new event present after the filtering update"));
}
Expand Down
Loading
Loading