From 45724cfe9797f4e95c054f1bb25b5a060ff7ba90 Mon Sep 17 00:00:00 2001 From: dandawg <12484302+dandawg@users.noreply.github.com> Date: Wed, 11 Dec 2024 09:25:21 -0700 Subject: [PATCH] added replica support to online/offline store services Signed-off-by: dandawg <12484302+dandawg@users.noreply.github.com> --- .../api/v1alpha1/featurestore_types.go | 17 ++- .../api/v1alpha1/zz_generated.deepcopy.go | 25 +++- .../crd/bases/feast.dev_featurestores.yaml | 12 ++ infra/feast-operator/dist/install.yaml | 12 ++ .../featurestore_controller_db_store_test.go | 3 +- .../featurestore_controller_ephemeral_test.go | 3 +- ...restore_controller_kubernetes_auth_test.go | 3 +- ...eaturestore_controller_objectstore_test.go | 3 +- .../featurestore_controller_oidc_auth_test.go | 3 +- .../featurestore_controller_pvc_test.go | 3 +- .../featurestore_controller_test.go | 132 ++++++++++++++++-- .../internal/controller/services/services.go | 21 ++- .../internal/controller/services/util.go | 11 +- 13 files changed, 221 insertions(+), 27 deletions(-) diff --git a/infra/feast-operator/api/v1alpha1/featurestore_types.go b/infra/feast-operator/api/v1alpha1/featurestore_types.go index 0b516630cd2..847526a1b51 100644 --- a/infra/feast-operator/api/v1alpha1/featurestore_types.go +++ b/infra/feast-operator/api/v1alpha1/featurestore_types.go @@ -74,9 +74,9 @@ type FeatureStoreServices struct { // OfflineStore configures the deployed offline store service type OfflineStore struct { - ServiceConfigs `json:",inline"` - Persistence *OfflineStorePersistence `json:"persistence,omitempty"` - TLS *OfflineTlsConfigs `json:"tls,omitempty"` + StoreServiceConfigs `json:",inline"` + Persistence *OfflineStorePersistence `json:"persistence,omitempty"` + TLS *OfflineTlsConfigs `json:"tls,omitempty"` // LogLevel sets the logging level for the offline store service // Allowed values: "debug", "info", "warning", "error", "critical". // +kubebuilder:validation:Enum=debug;info;warning;error;critical @@ -131,9 +131,9 @@ var ValidOfflineStoreDBStorePersistenceTypes = []string{ // OnlineStore configures the deployed online store service type OnlineStore struct { - ServiceConfigs `json:",inline"` - Persistence *OnlineStorePersistence `json:"persistence,omitempty"` - TLS *TlsConfigs `json:"tls,omitempty"` + StoreServiceConfigs `json:",inline"` + Persistence *OnlineStorePersistence `json:"persistence,omitempty"` + TLS *TlsConfigs `json:"tls,omitempty"` // LogLevel sets the logging level for the online store service // Allowed values: "debug", "info", "warning", "error", "critical". // +kubebuilder:validation:Enum=debug;info;warning;error;critical @@ -290,6 +290,11 @@ type DefaultConfigs struct { Image *string `json:"image,omitempty"` } +type StoreServiceConfigs struct { + Replicas *int32 `json:"replicas,omitempty"` + ServiceConfigs `json:",inline"` +} + // OptionalConfigs k8s container settings that are optional type OptionalConfigs struct { Env *[]corev1.EnvVar `json:"env,omitempty"` diff --git a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go index 3f317c650e9..bccf9ec5378 100644 --- a/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/infra/feast-operator/api/v1alpha1/zz_generated.deepcopy.go @@ -273,7 +273,7 @@ func (in *LocalRegistryConfig) DeepCopy() *LocalRegistryConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OfflineStore) DeepCopyInto(out *OfflineStore) { *out = *in - in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + in.StoreServiceConfigs.DeepCopyInto(&out.StoreServiceConfigs) if in.Persistence != nil { in, out := &in.Persistence, &out.Persistence *out = new(OfflineStorePersistence) @@ -397,7 +397,7 @@ func (in *OidcAuthz) DeepCopy() *OidcAuthz { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *OnlineStore) DeepCopyInto(out *OnlineStore) { *out = *in - in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) + in.StoreServiceConfigs.DeepCopyInto(&out.StoreServiceConfigs) if in.Persistence != nil { in, out := &in.Persistence, &out.Persistence *out = new(OnlineStorePersistence) @@ -737,6 +737,27 @@ func (in *ServiceHostnames) DeepCopy() *ServiceHostnames { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StoreServiceConfigs) DeepCopyInto(out *StoreServiceConfigs) { + *out = *in + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } + in.ServiceConfigs.DeepCopyInto(&out.ServiceConfigs) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StoreServiceConfigs. +func (in *StoreServiceConfigs) DeepCopy() *StoreServiceConfigs { + if in == nil { + return nil + } + out := new(StoreServiceConfigs) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TlsConfigs) DeepCopyInto(out *TlsConfigs) { *out = *in diff --git a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml index 6929796d34f..c3284bc2fa8 100644 --- a/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml +++ b/infra/feast-operator/config/crd/bases/feast.dev_featurestores.yaml @@ -366,6 +366,9 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -738,6 +741,9 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1604,6 +1610,9 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1983,6 +1992,9 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/dist/install.yaml b/infra/feast-operator/dist/install.yaml index 9e213994eba..247b4cd73b9 100644 --- a/infra/feast-operator/dist/install.yaml +++ b/infra/feast-operator/dist/install.yaml @@ -374,6 +374,9 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -746,6 +749,9 @@ spec: x-kubernetes-validations: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1612,6 +1618,9 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. @@ -1991,6 +2000,9 @@ spec: - message: One selection required between file or store. rule: '[has(self.file), has(self.store)].exists_one(c, c)' + replicas: + format: int32 + type: integer resources: description: ResourceRequirements describes the compute resource requirements. diff --git a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go index 60235fe687e..a7bec761d3c 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_db_store_test.go @@ -127,6 +127,7 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { Context("When deploying a resource with all db storage services", func() { const resourceName = "cr-name" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -205,7 +206,7 @@ var _ = Describe("FeatureStore Controller - db storage services", func() { By("creating the custom resource for the Kind FeatureStore") err = k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ DBPersistence: &feastdevv1alpha1.OfflineStoreDBStorePersistence{ Type: string(offlineType), diff --git a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go index 796de8e5260..a762faa5a21 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_ephemeral_test.go @@ -48,6 +48,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { const resourceName = "services-ephemeral" const offlineType = "duckdb" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -65,7 +66,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ diff --git a/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go b/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go index 4930f3fc590..57dd3a290df 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_kubernetes_auth_test.go @@ -48,6 +48,7 @@ var _ = Describe("FeatureStore Controller-Kubernetes authorization", func() { Context("When deploying a resource with all ephemeral services and Kubernetes authorization", func() { const resourceName = "kubernetes-authorization" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -62,7 +63,7 @@ var _ = Describe("FeatureStore Controller-Kubernetes authorization", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.AuthzConfig = &feastdevv1alpha1.AuthzConfig{KubernetesAuthz: &feastdevv1alpha1.KubernetesAuthz{ Roles: roles, }} diff --git a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go index db07418c92b..f4a21a28f17 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_objectstore_test.go @@ -46,6 +46,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Context("When deploying a resource with all ephemeral services", func() { const resourceName = "services-object-store" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -67,7 +68,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OnlineStore = nil resource.Spec.Services.OfflineStore = nil diff --git a/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go b/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go index c062a573df2..eb320c5bb39 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_oidc_auth_test.go @@ -49,6 +49,7 @@ var _ = Describe("FeatureStore Controller-OIDC authorization", func() { const resourceName = "oidc-authorization" const oidcSecretName = "oidc-secret" var pullPolicy = corev1.PullAlways + var replicas = int32(1) ctx := context.Background() @@ -73,7 +74,7 @@ var _ = Describe("FeatureStore Controller-OIDC authorization", func() { By("creating the custom resource for the Kind FeatureStore") err = k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{}) + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{}) resource.Spec.AuthzConfig = &feastdevv1alpha1.AuthzConfig{OidcAuthz: &feastdevv1alpha1.OidcAuthz{ SecretRef: corev1.LocalObjectReference{ Name: oidcSecretName, diff --git a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go index d0adc62c7c8..fe0caa38e63 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_pvc_test.go @@ -50,6 +50,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { Context("When deploying a resource with all ephemeral services", func() { const resourceName = "services-pvc" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -77,7 +78,7 @@ var _ = Describe("FeatureStore Controller-Ephemeral services", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) resource.Spec.Services.OfflineStore.Persistence = &feastdevv1alpha1.OfflineStorePersistence{ FilePersistence: &feastdevv1alpha1.OfflineStoreFilePersistence{ diff --git a/infra/feast-operator/internal/controller/featurestore_controller_test.go b/infra/feast-operator/internal/controller/featurestore_controller_test.go index 44c81eca59a..debd63300b2 100644 --- a/infra/feast-operator/internal/controller/featurestore_controller_test.go +++ b/infra/feast-operator/internal/controller/featurestore_controller_test.go @@ -404,6 +404,7 @@ var _ = Describe("FeatureStore Controller", func() { Context("When reconciling a resource with all services enabled", func() { const resourceName = "services" var pullPolicy = corev1.PullAlways + var replicas = int32(1) var testEnvVarName = "testEnvVarName" var testEnvVarValue = "testEnvVarValue" @@ -419,7 +420,7 @@ var _ = Describe("FeatureStore Controller", func() { By("creating the custom resource for the Kind FeatureStore") err := k8sClient.Get(ctx, typeNamespacedName, featurestore) if err != nil && errors.IsNotFound(err) { - resource := createFeatureStoreResource(resourceName, image, pullPolicy, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, + resource := createFeatureStoreResource(resourceName, image, pullPolicy, replicas, &[]corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}}) Expect(k8sClient.Create(ctx, resource)).To(Succeed()) } @@ -870,6 +871,114 @@ var _ = Describe("FeatureStore Controller", func() { Expect(areEnvVarArraysEqual(deploy.Spec.Template.Spec.Containers[0].Env, []corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue + "1"}, {Name: services.FeatureStoreYamlEnvVar, Value: fsYamlStr}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.name"}}}})).To(BeTrue()) }) + It("Should scale online/offline store service", func() { + By("Reconciling the created resource") + controllerReconciler := &FeatureStoreReconciler{ + Client: k8sClient, + Scheme: k8sClient.Scheme(), + } + + _, err := controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + resource := &feastdevv1alpha1.FeatureStore{} + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + + req, err := labels.NewRequirement(services.NameLabelKey, selection.Equals, []string{resource.Name}) + Expect(err).NotTo(HaveOccurred()) + labelSelector := labels.NewSelector().Add(*req) + listOpts := &client.ListOptions{Namespace: resource.Namespace, LabelSelector: labelSelector} + deployList := appsv1.DeploymentList{} + err = k8sClient.List(ctx, &deployList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(deployList.Items).To(HaveLen(3)) + + svcList := corev1.ServiceList{} + err = k8sClient.List(ctx, &svcList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(svcList.Items).To(HaveLen(3)) + + cmList := corev1.ConfigMapList{} + err = k8sClient.List(ctx, &cmList, listOpts) + Expect(err).NotTo(HaveOccurred()) + Expect(cmList.Items).To(HaveLen(1)) + + feast := services.FeastServices{ + Handler: handler.FeastHandler{ + Client: controllerReconciler.Client, + Context: ctx, + Scheme: controllerReconciler.Scheme, + FeatureStore: resource, + }, + } + + fsYamlStr := "" + fsYamlStr, err = feast.GetServiceFeatureStoreYamlBase64(services.OnlineFeastType) + Expect(err).NotTo(HaveOccurred()) + + // check online config + deploy_online := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy_online) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy_online.Spec.Template.Spec.ServiceAccountName).To(Equal(deploy_online.Name)) + Expect(deploy_online.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy_online.Spec.Template.Spec.Containers[0].Env).To(HaveLen(3)) + Expect(areEnvVarArraysEqual(deploy_online.Spec.Template.Spec.Containers[0].Env, []corev1.EnvVar{{Name: testEnvVarName, Value: testEnvVarValue}, {Name: services.FeatureStoreYamlEnvVar, Value: fsYamlStr}, {Name: "fieldRefName", ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{APIVersion: "v1", FieldPath: "metadata.namespace"}}}})).To(BeTrue()) + Expect(deploy_online.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullAlways)) + + // check offline config + deploy_offline := &appsv1.Deployment{} + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy_offline) + Expect(err).NotTo(HaveOccurred()) + Expect(deploy_offline.Spec.Template.Spec.ServiceAccountName).To(Equal(deploy_offline.Name)) + Expect(deploy_offline.Spec.Template.Spec.Containers).To(HaveLen(1)) + Expect(deploy_offline.Spec.Template.Spec.Containers[0].Env).To(HaveLen(1)) + Expect(deploy_offline.Spec.Template.Spec.Containers[0].ImagePullPolicy).To(Equal(corev1.PullIfNotPresent)) + + // change feast project and reconcile + // scale online replicas to 2 + resourceNew := resource.DeepCopy() + new_replicas := int32(2) + resourceNew.Spec.Services.OnlineStore.Replicas = &new_replicas + resourceNew.Spec.Services.OfflineStore.Replicas = &new_replicas + + err = k8sClient.Update(ctx, resourceNew) + Expect(err).NotTo(HaveOccurred()) + _, err = controllerReconciler.Reconcile(ctx, reconcile.Request{ + NamespacedName: typeNamespacedName, + }) + Expect(err).NotTo(HaveOccurred()) + + err = k8sClient.Get(ctx, typeNamespacedName, resource) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OnlineFeastType), + Namespace: resource.Namespace, + }, + deploy_online) + Expect(err).NotTo(HaveOccurred()) + err = k8sClient.Get(ctx, types.NamespacedName{ + Name: feast.GetFeastServiceName(services.OfflineFeastType), + Namespace: resource.Namespace, + }, + deploy_offline) + Expect(err).NotTo(HaveOccurred()) + + Expect(deploy_online.Spec.Replicas).To(Equal(&new_replicas)) + Expect(deploy_offline.Spec.Replicas).To(Equal(&new_replicas)) + }) + It("Should delete k8s objects owned by the FeatureStore CR", func() { By("changing which feast services are configured in the CR") controllerReconciler := &FeatureStoreReconciler{ @@ -1253,7 +1362,7 @@ var _ = Describe("FeatureStore Controller", func() { }) }) -func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { +func createFeatureStoreResource(resourceName string, image string, pullPolicy corev1.PullPolicy, replicas int32, envVars *[]corev1.EnvVar) *feastdevv1alpha1.FeatureStore { return &feastdevv1alpha1.FeatureStore{ ObjectMeta: metav1.ObjectMeta{ Name: resourceName, @@ -1264,14 +1373,17 @@ func createFeatureStoreResource(resourceName string, image string, pullPolicy co Services: &feastdevv1alpha1.FeatureStoreServices{ OfflineStore: &feastdevv1alpha1.OfflineStore{}, OnlineStore: &feastdevv1alpha1.OnlineStore{ - ServiceConfigs: feastdevv1alpha1.ServiceConfigs{ - DefaultConfigs: feastdevv1alpha1.DefaultConfigs{ - Image: &image, - }, - OptionalConfigs: feastdevv1alpha1.OptionalConfigs{ - Env: envVars, - ImagePullPolicy: &pullPolicy, - Resources: &corev1.ResourceRequirements{}, + StoreServiceConfigs: feastdevv1alpha1.StoreServiceConfigs{ + Replicas: &replicas, + ServiceConfigs: feastdevv1alpha1.ServiceConfigs{ + DefaultConfigs: feastdevv1alpha1.DefaultConfigs{ + Image: &image, + }, + OptionalConfigs: feastdevv1alpha1.OptionalConfigs{ + Env: envVars, + ImagePullPolicy: &pullPolicy, + Resources: &corev1.ResourceRequirements{}, + }, }, }, }, diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 60aabebe024..9661a1f500e 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -293,7 +293,7 @@ func (feast *FeastServices) setDeployment(deploy *appsv1.Deployment, feastType F probeHandler := getProbeHandler(feastType, tls) deploy.Spec = appsv1.DeploymentSpec{ - Replicas: &DefaultReplicas, + Replicas: feast.getServiceReplicas(feastType), Selector: metav1.SetAsLabelSelector(deploy.GetLabels()), Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -485,6 +485,25 @@ func (feast *FeastServices) getServiceConfigs(feastType FeastServiceType) feastd return feastdevv1alpha1.ServiceConfigs{} } +func (feast *FeastServices) getServiceReplicas(feastType FeastServiceType) *int32 { + appliedServices := feast.Handler.FeatureStore.Status.Applied.Services + switch feastType { + case OfflineFeastType: + if feast.isOfflinStore() { + return appliedServices.OfflineStore.Replicas + } + case OnlineFeastType: + if feast.isOnlinStore() { + return appliedServices.OnlineStore.Replicas + } + case RegistryFeastType: + if feast.isLocalRegistry() { + return &DefaultReplicas + } + } + return &DefaultReplicas +} + func (feast *FeastServices) getLogLevelForType(feastType FeastServiceType) *string { services := feast.Handler.FeatureStore.Status.Applied.Services switch feastType { diff --git a/infra/feast-operator/internal/controller/services/util.go b/infra/feast-operator/internal/controller/services/util.go index 85bd02e653a..631709d6ba0 100644 --- a/infra/feast-operator/internal/controller/services/util.go +++ b/infra/feast-operator/internal/controller/services/util.go @@ -122,7 +122,7 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } } - setServiceDefaultConfigs(&services.OfflineStore.ServiceConfigs.DefaultConfigs) + setStoreServiceDefaultConfigs(&services.OfflineStore.StoreServiceConfigs) } if services.OnlineStore != nil { @@ -147,7 +147,7 @@ func ApplyDefaultsToStatus(cr *feastdevv1alpha1.FeatureStore) { } } - setServiceDefaultConfigs(&services.OnlineStore.ServiceConfigs.DefaultConfigs) + setStoreServiceDefaultConfigs(&services.OnlineStore.StoreServiceConfigs) } // overwrite status.applied with every reconcile applied.DeepCopyInto(&cr.Status.Applied) @@ -159,6 +159,13 @@ func setServiceDefaultConfigs(defaultConfigs *feastdevv1alpha1.DefaultConfigs) { } } +func setStoreServiceDefaultConfigs(storeServiceConfigs *feastdevv1alpha1.StoreServiceConfigs) { + if storeServiceConfigs.Replicas == nil { + storeServiceConfigs.Replicas = &DefaultReplicas + } + setServiceDefaultConfigs(&storeServiceConfigs.ServiceConfigs.DefaultConfigs) +} + func checkOfflineStoreFilePersistenceType(value string) error { if slices.Contains(feastdevv1alpha1.ValidOfflineStoreFilePersistenceTypes, value) { return nil