Skip to content

Commit

Permalink
Propagate image building and model deployment errors (#524)
Browse files Browse the repository at this point in the history
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->
# Description
<!-- Briefly describe the motivation for the change. Please include
illustrations where appropriate. -->

Returns more information when failures happen during the image building and
model deployment processes by propagating the status of related Kubernetes resources.

For image building, returns:
1. Kubernetes job conditions
2. Related pod's container status (in this case pyfunc-image-builder
container)
3. Related pod's last termination message

For model deployment, returns:
1. Kserve inference service conditions
2. Related pod's container status
3. Related pod's last termination message

## Screenshots

- When image building is OOMKilled:

<img width="1200" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/6725760b-4643-4e38-b94a-71d14527169b">

- When the docker image for a custom model does not exist:
   
<img width="1200" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/4d3213bd-b8a2-4bf0-a06f-a6bb4aa8370e">

- When the deployment times out because no node is available:

<img width="1200" alt="image"
src="https://github.com/caraml-dev/merlin/assets/8122852/eda55f3f-91b6-4917-b9f4-d3f76faaf5a2">

# Modifications
<!-- Summarize the key code changes. -->

1. Add the ParsePodContainerStatuses utility function
2. Add related parsing functions for inference service conditions and
Kubernetes job conditions.

# Tests
<!-- Besides the existing / updated automated tests, what specific
scenarios should be tested? Consider the backward compatibility of the
changes, whether corner cases are covered, etc. Please describe the
tests and check the ones that have been completed. Eg:
- [x] Deploying new and existing standard models
- [ ] Deploying PyFunc models
-->

# Checklist
- [ ] Added PR label
- [ ] Added unit test, integration, and/or e2e tests
- [ ] Tested locally
- [ ] Updated documentation
- [ ] Update Swagger spec if the PR introduces API changes
- [ ] Regenerated Golang and Python client if the PR introduces API
changes

# Release Notes
<!--
Does this PR introduce a user-facing change?
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```
  • Loading branch information
ariefrahmansyah authored Jan 30, 2024
1 parent 0857c40 commit 10e5498
Show file tree
Hide file tree
Showing 12 changed files with 585 additions and 78 deletions.
110 changes: 98 additions & 12 deletions api/cluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io"
"sort"
"time"

kservev1beta1 "github.com/kserve/kserve/pkg/apis/serving/v1beta1"
Expand All @@ -32,6 +33,7 @@ import (
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
policyv1client "k8s.io/client-go/kubernetes/typed/policy/v1"
"k8s.io/client-go/rest"
duckv1 "knative.dev/pkg/apis/duck/v1"
knservingclientset "knative.dev/serving/pkg/client/clientset/versioned"
knservingclient "knative.dev/serving/pkg/client/clientset/versioned/typed/serving/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -41,6 +43,7 @@ import (
"github.com/caraml-dev/merlin/log"
"github.com/caraml-dev/merlin/models"
"github.com/caraml-dev/merlin/pkg/deployment"
"github.com/caraml-dev/merlin/utils"
mlpcluster "github.com/caraml-dev/mlp/api/pkg/cluster"
)

Expand Down Expand Up @@ -254,7 +257,7 @@ func (c *controller) Deploy(ctx context.Context, modelService *models.Service) (
log.Errorf("unable to delete inference service %s with error %v", isvcName, err)
}

return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToGetInferenceServiceStatus, isvcName))
return nil, errors.Wrapf(err, fmt.Sprintf("%v (%s)", ErrUnableToCreateInferenceService, isvcName))
}

inferenceURL := models.GetInferenceURL(s.Status.URL, isvcName, modelService.Protocol)
Expand Down Expand Up @@ -335,28 +338,111 @@ func (c *controller) deleteInferenceService(serviceName string, namespace string
return nil
}

// waitInferenceServiceReady waits for the given inference service and returns
// it once its status reports ready. It returns an error when the deployment
// timeout elapses, when a watcher cannot be created, or when a watch event
// carries an object of an unexpected type.
//
// NOTE(review): this span is rendered from a diff without +/- markers, so two
// signatures and two waiting strategies (ticker polling via Get, and watch
// events) appear interleaved — confirm against the actual file which lines
// are live.
func (c *controller) waitInferenceServiceReady(service *kservev1beta1.InferenceService) (*kservev1beta1.InferenceService, error) {
func (c *controller) waitInferenceServiceReady(service *kservev1beta1.InferenceService) (isvc *kservev1beta1.InferenceService, err error) {
ctx := context.Background()

timeout := time.After(c.deploymentConfig.DeploymentTimeout)
ticker := time.NewTicker(time.Second * tickDurationSecond)

// Diagnostics collected from watch events; they are only surfaced through the
// deferred function below when the function returns an error.
isvcConditionTable := ""
podContainerTable := ""
podLastTerminationMessage := ""
podLastTerminationReason := ""

// On a non-nil error return, wrap the error with each diagnostic section that
// was collected. Relies on err being a named result so the wrapped value is
// what the caller receives.
defer func() {
if err == nil {
return
}

if isvcConditionTable != "" {
err = fmt.Errorf("%w\n\nModel service conditions:\n%s", err, isvcConditionTable)
}

if podContainerTable != "" {
err = fmt.Errorf("%w\n\nPod container status:\n%s", err, podContainerTable)
}

if podLastTerminationMessage != "" {
err = fmt.Errorf("%w\n\nPod last termination message:\n%s", err, podLastTerminationMessage)
}
}()

// Watch the inference service itself, selected by metadata.name.
isvcWatcher, err := c.kserveClient.InferenceServices(service.Namespace).Watch(metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", service.Name),
})
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize isvc watcher: %s", service.Name)
}

// Watch the pods labelled serving.kserve.io/inferenceservice=<service name>.
// NOTE(review): no Stop() call on either watcher (or on the ticker) is visible
// in this span — TODO confirm they are released elsewhere.
podWatcher, err := c.clusterClient.Pods(service.Namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("serving.kserve.io/inferenceservice=%s", service.Name),
})
if err != nil {
return nil, errors.Wrapf(err, "unable to initialize isvc's pods watcher: %s", service.Name)
}

for {
select {
case <-timeout:
log.Errorf("timeout waiting for inference service to be ready %s", service.Name)
return nil, ErrTimeoutCreateInferenceService
// NOTE(review): the ticker branch polls with Get and uses `s`; presumably it
// is the pre-change polling path shown alongside the new watch-based path —
// confirm.
case <-ticker.C:
s, err := c.kserveClient.InferenceServices(service.Namespace).Get(service.Name, metav1.GetOptions{})
if err != nil {
log.Errorf("unable to get inference service status %s %v", service.Name, err)
return nil, ErrUnableToGetInferenceServiceStatus

case isvcEvent := <-isvcWatcher.ResultChan():
// isvc is declared with := here, so it is local to this case clause.
isvc, ok := isvcEvent.Object.(*kservev1beta1.InferenceService)
if !ok {
return nil, errors.New("unable to cast isvc object")
}
log.Debugf("isvc event received [%s]", isvc.Name)

if s.Status.IsReady() {
// Inference service is completely ready
return s, nil
if isvc.Status.Status.Conditions != nil {
// Update isvc condition table with latest conditions
// err is assigned with = (not :=); every return statement visible in this
// span passes its own error value, which replaces the one stored here.
isvcConditionTable, err = parseInferenceServiceConditions(isvc.Status.Status.Conditions)
}

if isvc.Status.IsReady() {
return isvc, nil
}

case podEvent := <-podWatcher.ResultChan():
pod, ok := podEvent.Object.(*corev1.Pod)
if !ok {
return nil, errors.New("unable to cast pod object")
}
log.Debugf("pod event received [%s]", pod.Name)

if len(pod.Status.ContainerStatuses) > 0 {
// Update pod container table with latest container statuses
podContainerTable, podLastTerminationMessage, podLastTerminationReason = utils.ParsePodContainerStatuses(pod.Status.ContainerStatuses)
// NOTE(review): podLastTerminationReason may be empty here, which would
// produce an error with an empty message — confirm against
// ParsePodContainerStatuses.
err = errors.New(podLastTerminationReason)
}
}
}
}

// parseInferenceServiceConditions renders the given inference service
// conditions as a text table and summarizes the unhealthy ones as an error.
//
// Rows are ordered by LastTransitionTime, oldest first; conditions with equal
// timestamps keep the order in which they were passed in. The input slice is
// not modified.
//
// The returned error is built from the reason (and message, when present) of
// the most recently transitioned condition whose status is not True. It is
// nil when there are no conditions or when every condition is True, so a
// healthy service does not yield a non-nil error.
func parseInferenceServiceConditions(isvcConditions duckv1.Conditions) (string, error) {
	var err error

	isvcConditionHeaders := []string{"TYPE", "STATUS", "REASON", "MESSAGE"}

	// Sort a copy: the slice passed in belongs to the caller's status object
	// and must not be reordered as a side effect. SliceStable keeps the
	// relative order of conditions that share a transition time.
	sorted := make(duckv1.Conditions, len(isvcConditions))
	copy(sorted, isvcConditions)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].LastTransitionTime.Inner.Before(&sorted[j].LastTransitionTime.Inner)
	})

	isvcConditionRows := make([][]string, 0, len(sorted))
	for i := range sorted {
		condition := &sorted[i]

		isvcConditionRows = append(isvcConditionRows, []string{
			string(condition.Type),
			string(condition.Status),
			condition.Reason,
			condition.Message,
		})

		// A True condition is healthy; it appears in the table but does not
		// contribute to the error.
		if condition.IsTrue() {
			continue
		}

		err = errors.New(condition.Reason)
		if condition.Message != "" {
			err = fmt.Errorf("%w: %s", err, condition.Message)
		}
	}

	isvcTable := utils.LogTable(isvcConditionHeaders, isvcConditionRows)
	return isvcTable, err
}

func (c *controller) ListPods(ctx context.Context, namespace, labelSelector string) (*corev1.PodList, error) {
Expand Down
62 changes: 41 additions & 21 deletions api/cluster/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
fakepolicyv1 "k8s.io/client-go/kubernetes/typed/policy/v1/fake"
k8stesting "k8s.io/client-go/testing"
ktesting "k8s.io/client-go/testing"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -98,6 +98,24 @@ type vsReactor struct {
err error
}

// isvcWatchReactor is a test watch reactor that serves watch events from a
// pre-filled channel; the tests install it in the fake kserve client's
// WatchReactionChain.
type isvcWatchReactor struct {
// result is the event channel handed to the watcher that React returns.
result chan watch.Event
}

// newIsvcWatchReactor builds a reactor whose event channel already contains a
// single Added event carrying isvc. The channel has capacity one, so the send
// below does not block.
func newIsvcWatchReactor(isvc *kservev1beta1.InferenceService) *isvcWatchReactor {
	events := make(chan watch.Event, 1)
	events <- watch.Event{
		Type:   watch.Added,
		Object: isvc,
	}

	return &isvcWatchReactor{result: events}
}

// Handles reports whether the action targets the inference service resource.
func (w *isvcWatchReactor) Handles(action ktesting.Action) bool {
	target := action.GetResource()
	return target.Resource == inferenceServiceResource
}

// React always reports the action as handled and returns a proxy watcher
// backed by the reactor's event channel; the action itself is not inspected.
func (w *isvcWatchReactor) React(action ktesting.Action) (handled bool, ret watch.Interface, err error) {
	proxy := watch.NewProxyWatcher(w.result)
	return true, proxy, nil
}

var (
clusterMetadata = Metadata{GcpProject: "my-gcp", ClusterName: "my-cluster"}

Expand Down Expand Up @@ -282,12 +300,11 @@ func TestController_DeployInferenceService_NamespaceCreation(t *testing.T) {
knClient := knservingfake.NewSimpleClientset().ServingV1()

kfClient := fakekserve.NewSimpleClientset().ServingV1beta1().(*fakekservev1beta1.FakeServingV1beta1)
kfClient.PrependReactor(getMethod, inferenceServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, isvc, nil
})
kfClient.PrependReactor(createMethod, inferenceServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, isvc, nil
})
isvcWatchReactor := newIsvcWatchReactor(isvc)
kfClient.WatchReactionChain = []ktesting.WatchReactor{isvcWatchReactor}

v1Client := fake.NewSimpleClientset().CoreV1()

Expand Down Expand Up @@ -564,7 +581,7 @@ func TestController_DeployInferenceService(t *testing.T) {
wantError: true,
},
{
name: "error: timeout",
name: "error: isvc timeout",
modelService: modelSvc,
createResult: &inferenceServiceReactor{
&kservev1beta1.InferenceService{ObjectMeta: metav1.ObjectMeta{Name: isvcName, Namespace: project.Name}},
Expand All @@ -577,7 +594,7 @@ func TestController_DeployInferenceService(t *testing.T) {
},
nil,
},
deployTimeout: 1 * time.Millisecond,
deployTimeout: 1 * time.Microsecond,
wantError: true,
},
{
Expand Down Expand Up @@ -645,9 +662,10 @@ func TestController_DeployInferenceService(t *testing.T) {
knClient := knservingfake.NewSimpleClientset()

kfClient := fakekserve.NewSimpleClientset().ServingV1beta1().(*fakekservev1beta1.FakeServingV1beta1)
kfClient.PrependReactor(getMethod, inferenceServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tt.checkResult.isvc, tt.checkResult.err
})
if tt.checkResult != nil && tt.checkResult.isvc != nil {
isvcWatchReactor := newIsvcWatchReactor(tt.checkResult.isvc)
kfClient.WatchReactionChain = []ktesting.WatchReactor{isvcWatchReactor}
}
kfClient.PrependReactor(createMethod, inferenceServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tt.createResult.isvc, tt.createResult.err
})
Expand All @@ -667,9 +685,11 @@ func TestController_DeployInferenceService(t *testing.T) {
})

istioClient := fakeistio.NewSimpleClientset().NetworkingV1beta1().(*fakeistionetworking.FakeNetworkingV1beta1)
istioClient.PrependReactor(patchMethod, virtualServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tt.createVsResult.vs, tt.createVsResult.err
})
if tt.createVsResult != nil && tt.createVsResult.vs != nil {
istioClient.PrependReactor(patchMethod, virtualServiceResource, func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
return true, tt.createVsResult.vs, tt.createVsResult.err
})
}

deployConfig := config.DeploymentConfig{
DeploymentTimeout: tt.deployTimeout,
Expand Down Expand Up @@ -721,7 +741,7 @@ func TestGetCurrentDeploymentScale(t *testing.T) {

tests := map[string]struct {
components map[kservev1beta1.ComponentType]kservev1beta1.ComponentStatusSpec
rFunc func(action k8stesting.Action) (bool, runtime.Object, error)
rFunc func(action ktesting.Action) (bool, runtime.Object, error)
expectedScale clusterresource.DeploymentScale
}{
"failure | revision not found": {
Expand All @@ -730,8 +750,8 @@ func TestGetCurrentDeploymentScale(t *testing.T) {
LatestCreatedRevision: "test-predictor-0",
},
},
rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) {
expAction := k8stesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
rFunc: func(action ktesting.Action) (bool, runtime.Object, error) {
expAction := ktesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
// Check that the method is called with the expected action
assert.Equal(t, expAction, action)
// Return nil object and error to indicate non existent object
Expand All @@ -745,8 +765,8 @@ func TestGetCurrentDeploymentScale(t *testing.T) {
LatestCreatedRevision: "test-predictor-0",
},
},
rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) {
expAction := k8stesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
rFunc: func(action ktesting.Action) (bool, runtime.Object, error) {
expAction := ktesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
// Check that the method is called with the expected action
assert.Equal(t, expAction, action)
// Return test response
Expand All @@ -764,8 +784,8 @@ func TestGetCurrentDeploymentScale(t *testing.T) {
LatestCreatedRevision: "test-predictor-0",
},
},
rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) {
expAction := k8stesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
rFunc: func(action ktesting.Action) (bool, runtime.Object, error) {
expAction := ktesting.NewGetAction(resourceItem, testNamespace, "test-predictor-0")
// Check that the method is called with the expected action
assert.Equal(t, expAction, action)
// Return test response
Expand All @@ -789,8 +809,8 @@ func TestGetCurrentDeploymentScale(t *testing.T) {
LatestCreatedRevision: "test-svc-0",
},
},
rFunc: func(action k8stesting.Action) (bool, runtime.Object, error) {
expAction := k8stesting.NewGetAction(resourceItem, testNamespace, "test-svc-0")
rFunc: func(action ktesting.Action) (bool, runtime.Object, error) {
expAction := ktesting.NewGetAction(resourceItem, testNamespace, "test-svc-0")
// Check that the method is called with the expected action
assert.Equal(t, expAction, action)
// Return test response
Expand Down
8 changes: 4 additions & 4 deletions api/cluster/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ var (
ErrUnableToCreateNamespace = errors.New("error creating namespace")
ErrUnableToGetNamespaceStatus = errors.New("error retrieving namespace status")
ErrUnableToGetInferenceServiceStatus = errors.New("error retrieving inference service status")
ErrUnableToCreateInferenceService = errors.New("error creating inference service")
ErrUnableToCreateInferenceService = errors.New("error deploying inference service")
ErrUnableToUpdateInferenceService = errors.New("error updating inference service")
ErrUnableToDeleteInferenceService = errors.New("error deleting inference service")
ErrUnableToDeletePreviousInferenceService = errors.New("error deleting previous inference service")
ErrTimeoutCreateInferenceService = errors.New("timeout creating inference service")
ErrUnableToCreatePDB = errors.New("error creating pod disruption budget")
ErrTimeoutCreateInferenceService = errors.New("timeout waiting inference service to be ready")
ErrUnableToCreatePDB = errors.New("error deploying pod disruption budget")
ErrUnableToDeletePDB = errors.New("error deleting pod disruption budget")
ErrUnableToCreateVirtualService = errors.New("error creating virtual service")
ErrUnableToCreateVirtualService = errors.New("error deploying virtual service")
ErrUnableToDeleteVirtualService = errors.New("error deleting virtual service")
)
9 changes: 5 additions & 4 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/gorilla/schema v1.1.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/heptiolabs/healthcheck v0.0.0-20180807145615-6ff867650f40
github.com/jedib0t/go-pretty/v6 v6.5.3
github.com/jinzhu/copier v0.3.5
github.com/kelseyhightower/envconfig v1.4.0
github.com/kserve/kserve v0.8.0
Expand All @@ -63,7 +64,7 @@ require (
github.com/rs/cors v1.8.2
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.3
github.com/stretchr/testify v1.8.4
github.com/xanzy/go-gitlab v0.32.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
Expand Down Expand Up @@ -179,7 +180,7 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
Expand Down Expand Up @@ -222,8 +223,8 @@ require (
golang.org/x/net v0.10.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
gonum.org/v1/gonum v0.9.1 // indirect
Expand Down
Loading

0 comments on commit 10e5498

Please sign in to comment.