Skip to content

Commit

Permalink
Merge pull request #35 from datainfrahq/Fix-23-Reload-Segments
Browse files Browse the repository at this point in the history
Fix-23-Reload-Segments
  • Loading branch information
AdheipSingh authored May 4, 2023
2 parents b68cc27 + 2243e84 commit 7edd325
Show file tree
Hide file tree
Showing 18 changed files with 250 additions and 77 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ Based on Kubernetes operators, this control plane for apache pinot is responsibl
- Schema Management
- Tenant Management (experimental)

## Getting Started With Helm
```
helm repo add datainfra https://charts.datainfra.io
helm upgrade --install pinot-control-plane datainfra/pinot-control-plane
```

## :books: Documentation

- [Getting Started With Heterogeneous Pinot Clusters](./examples/01-pinot-hetero/)
Expand Down
1 change: 1 addition & 0 deletions api/v1beta1/pinottable_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type PinotTableStatus struct {
Message string `json:"message,omitempty"`
LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"`
CurrentTableJson string `json:"currentTable.json"`
ReloadStatus []string `json:"reloadStatus"`
}

// +kubebuilder:object:root=true
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions config/crd/bases/datainfra.io_pinottables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,17 @@ spec:
type: string
reason:
type: string
reloadStatus:
items:
type: string
type: array
status:
type: string
type:
type: string
required:
- currentTable.json
- reloadStatus
type: object
type: object
served: true
Expand Down
1 change: 1 addition & 0 deletions examples/00-pinot-basic/pinottable-basic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ spec:
pinotCluster: pinot-basic
pinotSchema: airlinestats
pinotTableType: REALTIME
segmentReload: true
tables.json: |-
{
"tableName": "airlineStats",
Expand Down
4 changes: 2 additions & 2 deletions helm/pinot-control-plane/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ apiVersion: v2
name: pinot-control-plane
description: A Helm chart for Kubernetes
type: application
version: 0.0.8
appVersion: "v0.0.8"
version: 0.0.9
appVersion: "v0.0.9"
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,17 @@ spec:
type: string
reason:
type: string
reloadStatus:
items:
type: string
type: array
status:
type: string
type:
type: string
required:
- currentTable.json
- reloadStatus
type: object
type: object
served: true
Expand Down
2 changes: 1 addition & 1 deletion internal/schema_controller/pinotschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type PinotSchemaReconciler struct {
}

func NewPinotSchemaReconciler(mgr ctrl.Manager) *PinotSchemaReconciler {
initLogger := ctrl.Log.WithName("controllers").WithName("pinot")
initLogger := ctrl.Log.WithName("controllers").WithName("pinot-schema")
return &PinotSchemaReconciler{
Client: mgr.GetClient(),
Log: initLogger,
Expand Down
22 changes: 7 additions & 15 deletions internal/schema_controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package schemacontroller

import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -57,6 +56,10 @@ const (
PinotControllerPort = "9000"
)

const (
schemaName = "schemaName"
)

func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSchema) error {

build := builder.NewBuilder(
Expand Down Expand Up @@ -96,7 +99,7 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch
return err
}

schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson)
schemaName, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, schemaName)
if err != nil {
return err
}
Expand Down Expand Up @@ -148,7 +151,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
) (controllerutil.OperationResult, error) {

// get schema name
schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson)
schemaName, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, schemaName)
if err != nil {
return controllerutil.OperationResultNone, err
}
Expand Down Expand Up @@ -315,7 +318,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
build.Recorder.GenericEvent(
schema,
v1.EventTypeWarning,
fmt.Sprintf("Resp [%s]", string(respGetSchema.ResponseBody)),
fmt.Sprintf("Resp [%s]", string(respUpdateSchema.ResponseBody)),
PinotSchemaControllerUpdateFail,
)
return controllerutil.OperationResultNone, err
Expand All @@ -328,17 +331,6 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
return controllerutil.OperationResultNone, nil
}

func getSchemaName(schemaJson string) (string, error) {
var err error

schema := make(map[string]json.RawMessage)
if err = json.Unmarshal([]byte(schemaJson), &schema); err != nil {
return "", err
}

return utils.TrimQuote(string(schema["schemaName"])), nil
}

func makeControllerCreateSchemaPath(svcName string) string { return svcName + "/schemas" }

func makeControllerGetUpdateDeleteSchemaPath(svcName, schemaName string) string {
Expand Down
28 changes: 28 additions & 0 deletions internal/table_controller/paths.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tablecontroller

func makeControllerCreateTablePath(svcName string) string {
return svcName + "/tables"
}

func makeControllerGetUpdateDeleteTablePath(svcName, tableName string) string {
return svcName + "/tables/" + tableName
}

func makeControllerReloadTable(svcName, tableName string) string {
return svcName + "/segments/" + tableName + "/reload"
}
128 changes: 127 additions & 1 deletion internal/table_controller/pinottable_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,30 @@ package tablecontroller

import (
"context"
"net/http"
"os"
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1"
datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1"
internalHTTP "github.com/datainfrahq/pinot-control-plane-k8s/internal/http"
schemacontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/schema_controller"

"github.com/datainfrahq/pinot-control-plane-k8s/internal/utils"
"github.com/go-logr/logr"
)

Expand All @@ -44,7 +56,7 @@ type PinotTableReconciler struct {
}

func NewPinotTableReconciler(mgr ctrl.Manager) *PinotTableReconciler {
initLogger := ctrl.Log.WithName("controllers").WithName("pinot")
initLogger := ctrl.Log.WithName("controllers").WithName("pinot-table")
return &PinotTableReconciler{
Client: mgr.GetClient(),
Log: initLogger,
Expand Down Expand Up @@ -80,8 +92,122 @@ func (r *PinotTableReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// SetupWithManager sets up the controller with the Manager.
func (r *PinotTableReconciler) SetupWithManager(mgr ctrl.Manager) error {

p := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {

if !e.ObjectNew.GetDeletionTimestamp().IsZero() {
return false
}
r.Log.Info("Update Event Recieved, Wait for couple of seconds for schema controller to update status - table controller")

time.Sleep(time.Second * time.Duration(2))

schema := v1beta1.PinotSchema{}

if err := r.Client.Get(context.TODO(), types.NamespacedName{
Namespace: e.ObjectNew.GetNamespace(),
Name: e.ObjectNew.GetName(),
}, &schema); err != nil {
r.Log.Error(err, "Error getting schema - table controller")
return false
}

if time.Now().After(schema.Status.LastUpdateTime.Time) {
listOpts := []client.ListOption{
client.InNamespace(e.ObjectNew.GetNamespace()),
}

tableList := v1beta1.PinotTableList{}
if err := r.Client.List(context.TODO(), &tableList, listOpts...); err != nil {
return false
}

for _, table := range tableList.Items {
if table.Spec.SegmentReload {
if schema.Status.Message == schemacontroller.PinotSchemaControllerUpdateSuccess {
svcName, err := r.getControllerSvcUrl(table.Namespace, table.Spec.PinotCluster)
if err != nil {
r.Log.Error(err, "Error getting serviceName - table controller")
return false
}

segmentsConfig, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.SegmentsConfig)
if err != nil {
r.Log.Error(err, "Error getting schemaName - table controller")
return false
}
schemaNameinTable, err := utils.GetValueFromJson(segmentsConfig, utils.SchemaName)
if err != nil {
r.Log.Error(err, "Error getting schemaName in table - table controller")
return false
}

schemaNameinEvent, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, utils.SchemaName)
if err != nil {
r.Log.Error(err, "Error getting schemaName in Update Event - table controller")
return false
}

getTableName, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.TableName)
if err != nil {
r.Log.Error(err, "Error getting tableName in Update Event - table controller")
return false
}

basicAuth, err := r.getAuthCreds(context.TODO(), &table)
if err != nil {
r.Log.Error(err, "Error getting authCreds in Update Event - table controller")
return false
}

if schemaNameinTable == schemaNameinEvent {
postHttp := internalHTTP.NewHTTPClient(
http.MethodPost,
makeControllerReloadTable(svcName, getTableName),
http.Client{},
[]byte{},
internalHTTP.Auth{BasicAuth: basicAuth},
)

reloadHttp, err := postHttp.Do()
if err != nil {
r.Log.Error(err, "Error getting reloading segments in Update Event - table controller")
return false
}
r.Recorder.Event(&table, v1.EventTypeNormal, reloadHttp.ResponseBody, PinotTableReloadAllSegments)

if _, _, err := utils.PatchStatus(context.Background(), r.Client, &table, func(obj client.Object) client.Object {
in := obj.(*v1beta1.PinotTable)
in.Status.ReloadStatus = append(in.Status.ReloadStatus, reloadHttp.ResponseBody)
return in
}); err != nil {
r.Log.Error(err, "Error patching reloading segments in Update Event - table controller")
return false
}

}

}
} else {
r.Log.Info("No suitable condition found for reload segments - update controller")
return false
}

}
}

return false
},
}

return ctrl.NewControllerManagedBy(mgr).
For(&datainfraiov1beta1.PinotTable{}).
Watches(
&source.Kind{Type: &v1beta1.PinotSchema{}},
&handler.EnqueueRequestForObject{},
builder.WithPredicates(p),
).
WithEventFilter(
GenericPredicates{},
).
Expand Down
Loading

0 comments on commit 7edd325

Please sign in to comment.