Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func makePodListFunc(ds datastore.Datastore) func() []types.NamespacedName {
names := make([]types.NamespacedName, 0, len(pods))

for _, p := range pods {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume a future PR will remove all references to Pod?
For example for _, p := range pods of the function name/type (makePodListFunc)

names = append(names, p.GetPod().NamespacedName)
names = append(names, p.GetMetadata().NamespacedName)
}
return names
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/epp/backend/metrics/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ type FakePodMetrics struct {
}

func (fpm *FakePodMetrics) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v", fpm.GetPod(), fpm.GetMetrics())
return fmt.Sprintf("Metadata: %v; Metrics: %v", fpm.GetMetadata(), fpm.GetMetrics())
}

func (fpm *FakePodMetrics) GetPod() *backend.Pod {
func (fpm *FakePodMetrics) GetMetadata() *backend.Pod {
return fpm.Pod
}

func (fpm *FakePodMetrics) GetMetrics() *MetricsState {
return fpm.Metrics
}

func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) {
fpm.Pod = pod
func (fpm *FakePodMetrics) UpdateMetadata(metadata *datalayer.EndpointMetadata) {
fpm.Pod = metadata
}
func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes {
return fpm.Attributes
Expand Down
23 changes: 11 additions & 12 deletions pkg/epp/backend/metrics/pod_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-logr/logr"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand All @@ -35,7 +34,7 @@ const (
)

type podMetrics struct {
pod atomic.Pointer[backend.Pod]
metadata atomic.Pointer[datalayer.EndpointMetadata]
metrics atomic.Pointer[MetricsState]
pmc PodMetricsClient
ds datalayer.PoolInfo
Expand All @@ -49,31 +48,31 @@ type podMetrics struct {
}

type PodMetricsClient interface {
FetchMetrics(ctx context.Context, pod *backend.Pod, existing *MetricsState) (*MetricsState, error)
FetchMetrics(ctx context.Context, pod *datalayer.EndpointMetadata, existing *MetricsState) (*MetricsState, error)
}

func (pm *podMetrics) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v", pm.GetPod(), pm.GetMetrics())
return fmt.Sprintf("Metadata: %v; Metrics: %v", pm.GetMetadata(), pm.GetMetrics())
}

func (pm *podMetrics) GetPod() *backend.Pod {
return pm.pod.Load()
func (pm *podMetrics) GetMetadata() *datalayer.EndpointMetadata {
return pm.metadata.Load()
}

func (pm *podMetrics) GetMetrics() *MetricsState {
return pm.metrics.Load()
}

func (pm *podMetrics) UpdatePod(pod *datalayer.PodInfo) {
pm.pod.Store(pod)
func (pm *podMetrics) UpdateMetadata(pod *datalayer.EndpointMetadata) {
pm.metadata.Store(pod)
}

// start starts a goroutine exactly once to periodically update metrics. The goroutine will be
// stopped either when stop() is called, or the given ctx is cancelled.
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
pm.startOnce.Do(func() {
go func() {
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "metadata", pm.GetMetadata())
ticker := time.NewTicker(pm.interval)
defer ticker.Stop()
for {
Expand All @@ -84,7 +83,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
return
case <-ticker.C: // refresh metrics periodically
if err := pm.refreshMetrics(); err != nil {
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "pod", pm.GetPod())
pm.logger.V(logutil.TRACE).Error(err, "Failed to refresh metrics", "metadata", pm.GetMetadata())
}
}
}
Expand All @@ -95,7 +94,7 @@ func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
func (pm *podMetrics) refreshMetrics() error {
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
defer cancel()
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetPod(), pm.GetMetrics())
updated, err := pm.pmc.FetchMetrics(ctx, pm.GetMetadata(), pm.GetMetrics())
if err != nil {
pm.logger.V(logutil.TRACE).Info("Failed to refreshed metrics:", "err", err)
}
Expand All @@ -115,7 +114,7 @@ func (pm *podMetrics) refreshMetrics() error {
}

func (pm *podMetrics) stopRefreshLoop() {
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "metadata", pm.GetMetadata())
pm.stopOnce.Do(func() {
close(pm.done)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/metrics/pod_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

var (
pod1Info = &datalayer.PodInfo{
pod1Info = &datalayer.EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod1-rank-0",
Namespace: "default",
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ type PodMetricsFactory struct {
refreshMetricsInterval time.Duration
}

func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, pod *datalayer.PodInfo, ds datalayer.PoolInfo) PodMetrics {
func (f *PodMetricsFactory) NewEndpoint(parentCtx context.Context, metadata *datalayer.EndpointMetadata, ds datalayer.PoolInfo) datalayer.Endpoint {
pm := &podMetrics{
pmc: f.pmc,
ds: ds,
interval: f.refreshMetricsInterval,
startOnce: sync.Once{},
stopOnce: sync.Once{},
done: make(chan struct{}),
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
logger: log.FromContext(parentCtx).WithValues("endpoint", metadata.NamespacedName),
}
pm.pod.Store(pod)
pm.metadata.Store(metadata)
pm.metrics.Store(NewMetricsState())

pm.startRefreshLoop(parentCtx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/backend/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

type Pod = datalayer.PodInfo
type Pod = datalayer.EndpointMetadata
28 changes: 14 additions & 14 deletions pkg/epp/controller/inferencepool_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
endpointPool1 := pool.InferencePoolToEndpointPool(pool1)
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: endpointPool1, wantEndpoints: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -141,7 +141,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 := pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -157,7 +157,7 @@ func TestInferencePoolReconciler(t *testing.T) {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
newEndpointPool1 = pool.InferencePoolToEndpointPool(newPool1)
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantPool: newEndpointPool1, wantEndpoints: []string{"pod5-rank-0"}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}

Expand All @@ -171,14 +171,14 @@ func TestInferencePoolReconciler(t *testing.T) {
if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil {
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
}
if diff := diffStore(ds, diffStoreParams{wantPods: []string{}}); diff != "" {
if diff := diffStore(ds, diffStoreParams{wantEndpoints: []string{}}); diff != "" {
t.Errorf("Unexpected diff (+got/-want): %s", diff)
}
}

type diffStoreParams struct {
wantPool *datalayer.EndpointPool
wantPods []string
wantEndpoints []string
wantObjectives []*v1alpha2.InferenceObjective
}

Expand All @@ -189,15 +189,15 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
}

// Default wantPods if not set because PodGetAll returns an empty slice when empty.
if params.wantPods == nil {
params.wantPods = []string{}
if params.wantEndpoints == nil {
params.wantEndpoints = []string{}
}
gotPods := []string{}
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
gotEndpoints := []string{}
for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotEndpoints = append(gotEndpoints, em.GetMetadata().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "pods:" + diff
if diff := cmp.Diff(params.wantEndpoints, gotEndpoints, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "endpoints:" + diff
}

// Default wantModels if not set because ModelGetAll returns an empty slice when empty.
Expand Down Expand Up @@ -343,8 +343,8 @@ func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string {
params.wantPods = []string{}
}
gotPods := []string{}
for _, pm := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotPods = append(gotPods, pm.GetPod().NamespacedName.Name)
for _, em := range datastore.PodList(backendmetrics.AllPodsPredicate) {
gotPods = append(gotPods, em.GetMetadata().NamespacedName.Name)
}
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {
return "pods:" + diff
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/controller/pod_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func TestPodReconciler(t *testing.T) {
}

var gotPods []*corev1.Pod
for _, pm := range store.PodList(backendmetrics.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pm.GetPod().PodName, Namespace: pm.GetPod().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: pm.GetPod().GetIPAddress()}}
for _, em := range store.PodList(backendmetrics.AllPodsPredicate) {
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: em.GetMetadata().PodName, Namespace: em.GetMetadata().NamespacedName.Namespace}, Status: corev1.PodStatus{PodIP: em.GetMetadata().GetIPAddress()}}
gotPods = append(gotPods, pod)
}
if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b *corev1.Pod) bool { return a.Name < b.Name })) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/datalayer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sourc
started := false

c.startOnce.Do(func() {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetMetadata().GetIPAddress())
c.ctx, c.cancel = context.WithCancel(ctx)
started = true
ready = make(chan struct{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/datalayer/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ func (d *DummySource) Collect(ctx context.Context, ep Endpoint) error {
}

func defaultEndpoint() Endpoint {
pod := &PodInfo{
meta := &EndpointMetadata{
NamespacedName: types.NamespacedName{
Name: "pod-name",
Namespace: "default",
},
Address: "1.2.3.4:5678",
}
ms := NewEndpoint(pod, nil)
ms := NewEndpoint(meta, nil)
return ms
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/epp/datalayer/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"sync/atomic"
)

// EndpointPodState allows management of the Pod related attributes.
type EndpointPodState interface {
GetPod() *PodInfo
UpdatePod(*PodInfo)
// EndpointMetaState allows management of the EndpointMetadata related attributes.
type EndpointMetaState interface {
GetMetadata() *EndpointMetadata
UpdateMetadata(*EndpointMetadata)
GetAttributes() *Attributes
}

Expand All @@ -37,45 +37,45 @@ type EndpointMetricsState interface {
// Endpoint represents an inference serving endpoint and its related attributes.
type Endpoint interface {
fmt.Stringer
EndpointPodState
EndpointMetaState
EndpointMetricsState
AttributeMap
}

// ModelServer is an implementation of the Endpoint interface.
type ModelServer struct {
pod atomic.Pointer[PodInfo]
pod atomic.Pointer[EndpointMetadata]
metrics atomic.Pointer[Metrics]
attributes *Attributes
}

// NewEndpoint returns a new ModelServer with the given PodInfo and Metrics.
func NewEndpoint(pod *PodInfo, metrics *Metrics) *ModelServer {
if pod == nil {
pod = &PodInfo{}
// NewEndpoint returns a new ModelServer with the given EndpointMetadata and Metrics.
func NewEndpoint(meta *EndpointMetadata, metrics *Metrics) *ModelServer {
if meta == nil {
meta = &EndpointMetadata{}
}
if metrics == nil {
metrics = NewMetrics()
}
ep := &ModelServer{
attributes: NewAttributes(),
}
ep.UpdatePod(pod)
ep.UpdateMetadata(meta)
ep.UpdateMetrics(metrics)
return ep
}

// String returns a representation of the ModelServer. For brevity, only names of
// extended attributes are returned and not their values.
func (srv *ModelServer) String() string {
return fmt.Sprintf("Pod: %v; Metrics: %v; Attributes: %v", srv.GetPod(), srv.GetMetrics(), srv.Keys())
return fmt.Sprintf("Metadata: %v; Metrics: %v; Attributes: %v", srv.GetMetadata(), srv.GetMetrics(), srv.Keys())
}

func (srv *ModelServer) GetPod() *PodInfo {
func (srv *ModelServer) GetMetadata() *EndpointMetadata {
return srv.pod.Load()
}

func (srv *ModelServer) UpdatePod(pod *PodInfo) {
func (srv *ModelServer) UpdateMetadata(pod *EndpointMetadata) {
srv.pod.Store(pod)
}

Expand Down
Loading