31 changes: 31 additions & 0 deletions infra/feast-operator/cmd/main.go
@@ -25,11 +25,18 @@ import (
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -59,6 +66,29 @@ func init() {
// +kubebuilder:scaffold:scheme
}

func newCacheOptions() cache.Options {
managedBySelector := labels.SelectorFromSet(labels.Set{
services.ManagedByLabelKey: services.ManagedByLabelValue,
})
managedByFilter := cache.ByObject{Label: managedBySelector}

return cache.Options{
DefaultTransform: cache.TransformStripManagedFields(),
ByObject: map[client.Object]cache.ByObject{
&corev1.ConfigMap{}: managedByFilter,
&appsv1.Deployment{}: managedByFilter,
&corev1.Service{}: managedByFilter,
&corev1.ServiceAccount{}: managedByFilter,
&corev1.PersistentVolumeClaim{}: managedByFilter,
&rbacv1.RoleBinding{}: managedByFilter,
&rbacv1.Role{}: managedByFilter,
&batchv1.CronJob{}: managedByFilter,
&autoscalingv2.HorizontalPodAutoscaler{}: managedByFilter,
&policyv1.PodDisruptionBudget{}: managedByFilter,
},
}
Comment on lines +77 to +89
Contributor
🔴 Cache label filter breaks operator for pre-existing resources on upgrade

The new cache ByObject configuration at infra/feast-operator/cmd/main.go:77-88 restricts informer caches to only watch resources with the app.kubernetes.io/managed-by: feast-operator label. However, only ConfigMap and Secret are in the DisableFor list (bypassing cache). All other types (Deployment, Service, ServiceAccount, PVC, RoleBinding, Role, CronJob, HPA, PDB) are read through the label-filtered cache.

On upgrade from a prior version, existing operator-managed resources won't have the managed-by label. When the operator reconciles an existing FeatureStore:

  1. controllerutil.CreateOrUpdate() calls Get() → goes through the cache → cache returns NotFound (object exists in API server but lacks the label, so the informer never cached it)
  2. CreateOrUpdate then calls Create() → fails with AlreadyExists
  3. Error propagates up, reconciliation fails and retries forever

This is a permanent deadlock: the operator can never update the resource to add the label because it can't find it through the cache. This affects all existing FeatureStore instances after an operator upgrade.

Prompt for agents
The cache ByObject label-selector filtering causes all cached Get/List calls to miss pre-existing resources that lack the managed-by label. On upgrade, this creates a permanent deadlock where the operator cannot reconcile existing FeatureStore instances.

Possible approaches to fix:
1. Add the resource types that might pre-exist (Deployment, Service, ServiceAccount, PVC, RoleBinding, Role, CronJob, HPA, PDB) to the Client.CacheOptions.DisableFor list, similar to ConfigMap and Secret. This bypasses the cache for reads but still benefits from the label-filtered watch for triggering reconciliation events.
2. Implement a startup migration that labels all existing operator-owned resources (identified by owner references to FeatureStore CRs) with the managed-by label before starting the informers.
3. Use a cache.TransformFunc that adds the label to objects as they are fetched, though this doesn't help with the initial list.

Option 1 is simplest but loses cache benefits for reads. Option 2 is more correct but requires additional startup logic. The key files involved are cmd/main.go (cache config), and all resource creation paths in internal/controller/services/services.go, scaling.go, client.go, cronjob.go, namespace_registry.go, and internal/controller/authz/authz.go.
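A minimal sketch of what Option 1 could look like in cmd/main.go, reusing the newCacheOptions() helper and import aliases from this PR. The exact DisableFor list and the buildManager wrapper are assumptions for illustration, not the merged fix:

```go
package main

import (
	appsv1 "k8s.io/api/apps/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

func buildManager() (ctrl.Manager, error) {
	// Keep the label-filtered informers for watch/reconcile triggers, but
	// route reads of possibly pre-existing (unlabeled) types straight to the
	// API server so upgrades avoid the NotFound -> AlreadyExists deadlock.
	return ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Cache: newCacheOptions(), // label-filtered watches, as added in this PR
		Client: client.Options{
			Cache: &client.CacheOptions{
				DisableFor: []client.Object{
					&corev1.ConfigMap{},
					&corev1.Secret{},
					// Hypothetical additions: every type an older operator
					// version may have created without the managed-by label.
					&appsv1.Deployment{},
					&corev1.Service{},
					&corev1.ServiceAccount{},
					&corev1.PersistentVolumeClaim{},
					&rbacv1.Role{},
					&rbacv1.RoleBinding{},
					&batchv1.CronJob{},
					&autoscalingv2.HorizontalPodAutoscaler{},
					&policyv1.PodDisruptionBudget{},
				},
			},
		},
	})
}
```

With this wiring, Get/List through the manager's client hit the API server directly for those types, while the filtered cache still drives event-based reconciliation.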

}

func main() {
var metricsAddr string
var enableLeaderElection bool
@@ -145,6 +175,7 @@ func main() {
// if you are doing or is intended to do any operation such as perform cleanups
// after the manager stops then its usage might be unsafe.
// LeaderElectionReleaseOnCancel: true,
Cache: newCacheOptions(),
Client: client.Options{
Cache: &client.CacheOptions{
DisableFor: []client.Object{
@@ -1,10 +1,16 @@
- op: test
path: "/spec/template/spec/containers/0/env/1/name"
value: RELATED_IMAGE_FEATURE_SERVER
- op: replace
path: "/spec/template/spec/containers/0/env/0"
path: "/spec/template/spec/containers/0/env/1"
value:
name: RELATED_IMAGE_FEATURE_SERVER
value: ${FS_IMG}
- op: test
path: "/spec/template/spec/containers/0/env/2/name"
value: RELATED_IMAGE_CRON_JOB
- op: replace
path: "/spec/template/spec/containers/0/env/1"
path: "/spec/template/spec/containers/0/env/2"
value:
name: RELATED_IMAGE_CRON_JOB
value: ${CJ_IMG}
@@ -1,10 +1,16 @@
- op: test
path: "/spec/template/spec/containers/0/env/1/name"
value: RELATED_IMAGE_FEATURE_SERVER
- op: replace
path: "/spec/template/spec/containers/0/env/0"
path: "/spec/template/spec/containers/0/env/1"
value:
name: RELATED_IMAGE_FEATURE_SERVER
value: quay.io/feastdev/feature-server:0.62.0
- op: test
path: "/spec/template/spec/containers/0/env/2/name"
value: RELATED_IMAGE_CRON_JOB
- op: replace
path: "/spec/template/spec/containers/0/env/1"
path: "/spec/template/spec/containers/0/env/2"
value:
name: RELATED_IMAGE_CRON_JOB
value: quay.io/openshift/origin-cli:4.17
2 changes: 2 additions & 0 deletions infra/feast-operator/config/manager/manager.yaml
@@ -73,6 +73,8 @@ spec:
drop:
- "ALL"
env:
- name: GOMEMLIMIT
value: "230MiB"
- name: RELATED_IMAGE_FEATURE_SERVER
value: feast:latest
- name: RELATED_IMAGE_CRON_JOB
2 changes: 2 additions & 0 deletions infra/feast-operator/dist/install.yaml
@@ -20650,6 +20650,8 @@ spec:
command:
- /manager
env:
- name: GOMEMLIMIT
value: 230MiB
- name: RELATED_IMAGE_FEATURE_SERVER
value: quay.io/feastdev/feature-server:0.62.0
- name: RELATED_IMAGE_CRON_JOB
1 change: 1 addition & 0 deletions infra/feast-operator/internal/controller/authz/authz.go
@@ -331,6 +331,7 @@ func (authz *FeastAuthorization) getLabels() map[string]string {
return map[string]string{
services.NameLabelKey: authz.Handler.FeatureStore.Name,
services.ServiceTypeLabelKey: string(services.AuthzFeastType),
services.ManagedByLabelKey: services.ManagedByLabelValue,
}
}

@@ -179,6 +179,7 @@ func (feast *FeastServices) setNamespaceRegistryRoleBinding(rb *rbacv1.RoleBindi
ObjectMeta: metav1.ObjectMeta{
Name: roleName,
Namespace: rb.Namespace,
Labels: feast.getLabels(),
},
}
role.Rules = desiredRules
@@ -205,6 +206,7 @@
}
}
Comment on lines 206 to 207
Contributor
🔴 Namespace registry Role Get from filtered cache fails after tolerated AlreadyExists on Create

In setNamespaceRegistryRoleBinding, the code creates a Role via Client.Create and tolerates AlreadyExists (lines 187-189). It then immediately calls Client.Get on the same Role (lines 193-198). Client.Get goes through the label-filtered cache. If the Role already existed without the managed-by label (pre-upgrade), or even if it was only just created (the cache hasn't synced the watch event yet), Get returns NotFound and the function returns an error. This breaks namespace registry reconciliation on upgrade and introduces a race condition even on fresh installs.

(Refers to lines 182-207)

Prompt for agents
In setNamespaceRegistryRoleBinding (namespace_registry.go), the Role is created with Client.Create (AlreadyExists tolerated), then immediately re-fetched with Client.Get through the filtered cache. This fails if (a) the Role existed before the managed-by label was introduced, or (b) the cache informer hasn't processed the watch event yet for a newly created Role.

The fix should switch from the manual Create-then-Get pattern to using controllerutil.CreateOrUpdate for the Role (similar to how RoleBindings are handled), or use a direct API server read (bypassing cache) for the re-fetch. If using CreateOrUpdate, note that this is also affected by BUG-0001 for the upgrade case. A robust approach would be to use Server-Side Apply (Patch with ApplyPatchType) for the Role, similar to how HPA and PDB are handled in scaling.go, which avoids both the cache lookup and AlreadyExists issues entirely.
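The Server-Side Apply route suggested above can be sketched as a small helper. applyRoleSSA is a hypothetical name and the field-owner string is an assumption; the rest uses standard controller-runtime patch options:

```go
package services

import (
	"context"

	rbacv1 "k8s.io/api/rbac/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// applyRoleSSA (hypothetical helper) upserts a Role in one API-server round
// trip via Server-Side Apply, so there is no tolerated AlreadyExists on
// Create and no follow-up Get through the label-filtered cache.
func applyRoleSSA(ctx context.Context, c client.Client, name, ns string,
	lbls map[string]string, rules []rbacv1.PolicyRule) error {
	role := &rbacv1.Role{
		// Server-Side Apply patches must carry TypeMeta explicitly.
		TypeMeta:   metav1.TypeMeta{Kind: "Role", APIVersion: "rbac.authorization.k8s.io/v1"},
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns, Labels: lbls},
		Rules:      rules,
	}
	return c.Patch(ctx, role, client.Apply,
		client.FieldOwner("feast-operator"), // assumed field-manager id
		client.ForceOwnership)
}
```

Inside setNamespaceRegistryRoleBinding this would replace both the tolerated Create and the re-fetching Get, mirroring how scaling.go already handles HPA and PDB with apply configurations.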

rb.Labels = feast.getLabels()
rb.RoleRef = rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "Role",
@@ -231,7 +231,7 @@ func (feast *FeastServices) buildPDBApplyConfig() *pdbac.PodDisruptionBudgetAppl
WithBlockOwnerDeletion(true),
).
WithSpec(pdbac.PodDisruptionBudgetSpec().
WithSelector(metaac.LabelSelector().WithMatchLabels(feast.getLabels())),
WithSelector(metaac.LabelSelector().WithMatchLabels(feast.getSelectorLabels())),
)

if pdbConfig.MinAvailable != nil {
@@ -249,8 +249,7 @@ func (feast *FeastServices) updateScalingStatus(deploy *appsv1.Deployment) {
cr := feast.Handler.FeatureStore

cr.Status.Replicas = deploy.Status.ReadyReplicas
labels := feast.getLabels()
cr.Status.Selector = metav1.FormatLabelSelector(metav1.SetAsLabelSelector(labels))
cr.Status.Selector = metav1.FormatLabelSelector(metav1.SetAsLabelSelector(feast.getSelectorLabels()))

if !isScalingEnabled(cr) {
cr.Status.ScalingStatus = nil
39 changes: 28 additions & 11 deletions infra/feast-operator/internal/controller/services/services.go
@@ -384,10 +384,13 @@ func (feast *FeastServices) createPVC(pvcCreate *feastdevv1.PvcCreate, feastType
}

// PVCs are immutable, so we only create... we don't update an existing one.
// Treat AlreadyExists as success: a pre-existing PVC without the managed-by label
// won't appear in the filtered cache (Client.Get returns NotFound), but Create
// will hit AlreadyExists on the API server — both cases mean the PVC is present.
err = feast.Handler.Client.Get(feast.Handler.Context, client.ObjectKeyFromObject(pvc), pvc)
if err != nil && apierrors.IsNotFound(err) {
err = feast.Handler.Client.Create(feast.Handler.Context, pvc)
if err != nil {
if err != nil && !apierrors.IsAlreadyExists(err) {
return err
}
logger.Info("Successfully created", "PersistentVolumeClaim", pvc.Name)
@@ -408,9 +411,10 @@ func (feast *FeastServices) setDeployment(deploy *appsv1.Deployment) error {
}

deploy.Labels = feast.getLabels()
selectorLabels := feast.getSelectorLabels()
deploy.Spec = appsv1.DeploymentSpec{
Replicas: replicas,
Selector: metav1.SetAsLabelSelector(deploy.GetLabels()),
Selector: metav1.SetAsLabelSelector(selectorLabels),
Strategy: feast.getDeploymentStrategy(),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
@@ -818,7 +822,7 @@ func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServi
}

svc.Spec = corev1.ServiceSpec{
Selector: feast.getLabels(),
Selector: feast.getSelectorLabels(),
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
@@ -868,6 +872,7 @@ func (feast *FeastServices) setServiceAccount(sa *corev1.ServiceAccount) error {

func (feast *FeastServices) createNewPVC(pvcCreate *feastdevv1.PvcCreate, feastType FeastServiceType) (*corev1.PersistentVolumeClaim, error) {
pvc := feast.initPVC(feastType)
pvc.Labels = feast.getFeastTypeLabels(feastType)

pvc.Spec = corev1.PersistentVolumeClaimSpec{
AccessModes: pvcCreate.AccessModes,
@@ -976,7 +981,7 @@ func (feast *FeastServices) applyTopologySpread(podSpec *corev1.PodSpec) {
MaxSkew: 1,
TopologyKey: "topology.kubernetes.io/zone",
WhenUnsatisfiable: corev1.ScheduleAnyway,
LabelSelector: metav1.SetAsLabelSelector(feast.getLabels()),
LabelSelector: metav1.SetAsLabelSelector(feast.getSelectorLabels()),
}}
}

@@ -999,10 +1004,10 @@ func (feast *FeastServices) applyAffinity(podSpec *corev1.PodSpec) {
Weight: 100,
PodAffinityTerm: corev1.PodAffinityTerm{
TopologyKey: "kubernetes.io/hostname",
LabelSelector: metav1.SetAsLabelSelector(feast.getLabels()),
},
}},
},
LabelSelector: metav1.SetAsLabelSelector(feast.getSelectorLabels()),
},
}},
},
}
}

@@ -1060,12 +1065,24 @@ func (feast *FeastServices) getFeastTypeLabels(feastType FeastServiceType) map[s
return labels
}

func (feast *FeastServices) getLabels() map[string]string {
// getSelectorLabels returns the minimal label set used for immutable selectors
// (Deployment spec.selector, Service spec.selector, TopologySpreadConstraints, PodAffinity).
// This must NOT change after initial resource creation.
func (feast *FeastServices) getSelectorLabels() map[string]string {
return map[string]string{
NameLabelKey: feast.Handler.FeatureStore.Name,
}
}

// getLabels returns the full label set for mutable metadata (ObjectMeta.Labels).
// Includes the managed-by label used by the informer cache filter.
func (feast *FeastServices) getLabels() map[string]string {
return map[string]string{
NameLabelKey: feast.Handler.FeatureStore.Name,
ManagedByLabelKey: ManagedByLabelValue,
}
}

func (feast *FeastServices) setServiceHostnames() error {
feast.Handler.FeatureStore.Status.ServiceHostnames = feastdevv1.ServiceHostnames{}
domain := svcDomain + ":"
@@ -1438,10 +1455,10 @@ func IsDeploymentAvailable(conditions []appsv1.DeploymentCondition) bool {
// container that is in a failing state. Returns empty string if no failure found.
func (feast *FeastServices) GetPodContainerFailureMessage(deploy appsv1.Deployment) string {
podList := corev1.PodList{}
labels := feast.getLabels()
selectorLabels := feast.getSelectorLabels()
if err := feast.Handler.Client.List(feast.Handler.Context, &podList,
client.InNamespace(deploy.Namespace),
client.MatchingLabels(labels),
client.MatchingLabels(selectorLabels),
); err != nil {
return ""
}
@@ -103,6 +103,11 @@ const (
OidcMissingSecretError string = "missing OIDC secret: %s"
)

const (
ManagedByLabelKey = "app.kubernetes.io/managed-by"
ManagedByLabelValue = "feast-operator"
)

var (
DefaultImage = "quay.io/feastdev/feature-server:" + feastversion.FeastVersion
DefaultCronJobImage = "quay.io/openshift/origin-cli:4.17"