Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency safe client factory #4950

Merged
merged 6 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/k0sproject/k0s/pkg/token"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/rest"
)

type command config.CLIOptions
Expand Down Expand Up @@ -162,7 +163,20 @@ func (c *command) start(ctx context.Context) error {
}()

// common factory to get the admin kube client that's needed in many components
adminClientFactory := kubernetes.NewAdminClientFactory(c.K0sVars.AdminKubeConfigPath)
adminClientFactory := &kubernetes.ClientFactory{LoadRESTConfig: func() (*rest.Config, error) {
config, err := kubernetes.ClientConfig(kubernetes.KubeconfigFromFile(c.K0sVars.AdminKubeConfigPath))
if err != nil {
return nil, err
}

// We're always running the client on the same host as the API, no need to compress
config.DisableCompression = true
// To mitigate stack applier bursts in startup
config.QPS = 40.0
config.Burst = 400.0

return config, nil
}}

certificateManager := certificate.Manager{K0sVars: c.K0sVars}

Expand Down Expand Up @@ -561,7 +575,11 @@ func (c *command) start(ctx context.Context) error {
EnableWorker: c.EnableWorker,
})

apClientFactory, err := apclient.NewClientFactory(adminClientFactory.GetRESTConfig())
restConfig, err := adminClientFactory.GetRESTConfig()
if err != nil {
return err
}
apClientFactory, err := apclient.NewClientFactory(restConfig)
if err != nil {
return err
}
Expand Down
36 changes: 18 additions & 18 deletions internal/testutil/kube_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package testutil

import (
"errors"
"reflect"
"strings"

"github.com/k0sproject/k0s/internal/testutil/fakeclient"
k0sclientset "github.com/k0sproject/k0s/pkg/client/clientset"
k0sfake "github.com/k0sproject/k0s/pkg/client/clientset/fake"
k0sscheme "github.com/k0sproject/k0s/pkg/client/clientset/scheme"
etcdv1beta1 "github.com/k0sproject/k0s/pkg/client/clientset/typed/etcd/v1beta1"
Expand Down Expand Up @@ -69,20 +69,18 @@ func NewFakeClientFactory(objects ...runtime.Object) *FakeClientFactory {
k0sClients := fakeclient.NewClientset[k0sfake.Clientset](fakeDiscovery, tracker)

return &FakeClientFactory{
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
ConfigClient: k0sClients.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace),
EtcdMemberClient: k0sClients.EtcdV1beta1().EtcdMembers(),
DynamicClient: fakeDynamic,
Client: kubeClients,
DiscoveryClient: memory.NewMemCacheClient(fakeDiscovery),
K0sClient: k0sClients,
}
}

type FakeClientFactory struct {
DynamicClient *dynamicfake.FakeDynamicClient
Client kubernetes.Interface
DiscoveryClient discovery.CachedDiscoveryInterface
ConfigClient k0sv1beta1.ClusterConfigInterface
EtcdMemberClient etcdv1beta1.EtcdMemberInterface
DynamicClient *dynamicfake.FakeDynamicClient
Client kubernetes.Interface
DiscoveryClient discovery.CachedDiscoveryInterface
K0sClient k0sclientset.Interface
}

func (f *FakeClientFactory) GetClient() (kubernetes.Interface, error) {
Expand All @@ -97,20 +95,22 @@ func (f *FakeClientFactory) GetDiscoveryClient() (discovery.CachedDiscoveryInter
return f.DiscoveryClient, nil
}

func (f *FakeClientFactory) GetConfigClient() (k0sv1beta1.ClusterConfigInterface, error) {
return f.ConfigClient, nil
func (f *FakeClientFactory) GetK0sClient() (k0sclientset.Interface, error) {
return f.K0sClient, nil
}

func (f *FakeClientFactory) GetRESTClient() (rest.Interface, error) {
return nil, errors.ErrUnsupported
// Deprecated: Use [FakeClientFactory.GetK0sClient] instead.
func (f *FakeClientFactory) GetConfigClient() (k0sv1beta1.ClusterConfigInterface, error) {
return f.K0sClient.K0sV1beta1().ClusterConfigs(constant.ClusterConfigNamespace), nil
}

func (f FakeClientFactory) GetRESTConfig() *rest.Config {
return &rest.Config{}
func (f FakeClientFactory) GetRESTConfig() (*rest.Config, error) {
panic("GetRESTConfig not implemented for FakeClientFactory")
}

// Deprecated: Use [FakeClientFactory.GetK0sClient] instead.
func (f FakeClientFactory) GetEtcdMemberClient() (etcdv1beta1.EtcdMemberInterface, error) {
return f.EtcdMemberClient, nil
return f.K0sClient.EtcdV1beta1().EtcdMembers(), nil
}

// Extracts all kinds from scheme and builds API resource lists for fake discovery clients.
Expand Down
6 changes: 5 additions & 1 deletion pkg/applier/stack.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error {

// waitForCRD waits 5 seconds for a CRD to become established on a best-effort basis.
func (s *Stack) waitForCRD(ctx context.Context, crdName string) {
client, err := extensionsclient.NewForConfig(s.Clients.GetRESTConfig())
config, err := s.Clients.GetRESTConfig()
if err != nil {
return
}
client, err := extensionsclient.NewForConfig(config)
if err != nil {
return
}
Expand Down
17 changes: 12 additions & 5 deletions pkg/autopilot/checks/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ func CanUpdate(ctx context.Context, log logrus.FieldLogger, clientFactory kubern
}
}

metaClient, err := metadata.NewForConfig(clientFactory.GetRESTConfig())
if err != nil {
return err
}

var metaClient metadata.Interface
for _, r := range resources {
gv, err := schema.ParseGroupVersion(r.GroupVersion)
if err != nil {
Expand All @@ -69,6 +65,17 @@ func CanUpdate(ctx context.Context, log logrus.FieldLogger, clientFactory kubern
continue
}

if metaClient == nil {
restConfig, err := clientFactory.GetRESTConfig()
if err != nil {
return err
}

if metaClient, err = metadata.NewForConfig(restConfig); err != nil {
return err
}
}

metas, err := metaClient.Resource(gv.WithResource(ar.Name)).
Namespace(metav1.NamespaceAll).
List(ctx, metav1.ListOptions{})
Expand Down
6 changes: 5 additions & 1 deletion pkg/component/controller/autopilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func (a *Autopilot) Init(ctx context.Context) error {
func (a *Autopilot) Start(ctx context.Context) error {
log := logrus.WithFields(logrus.Fields{"component": "autopilot"})

autopilotClientFactory, err := apcli.NewClientFactory(a.AdminClientFactory.GetRESTConfig())
restConfig, err := a.AdminClientFactory.GetRESTConfig()
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
}
autopilotClientFactory, err := apcli.NewClientFactory(restConfig)
if err != nil {
return fmt.Errorf("creating autopilot client factory error: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/component/controller/etcd_member_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ func (e *EtcdMemberReconciler) Stop() error {
}

func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error {
rc := e.clientFactory.GetRESTConfig()

rc, err := e.clientFactory.GetRESTConfig()
if err != nil {
return err
}
ec, err := extclient.NewForConfig(rc)
if err != nil {
return err
Expand Down
14 changes: 10 additions & 4 deletions pkg/component/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/manager"
"github.com/k0sproject/k0s/pkg/config"
"github.com/k0sproject/k0s/pkg/kubernetes"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -60,15 +61,20 @@ var _ manager.Component = (*Metrics)(nil)
var _ manager.Reconciler = (*Metrics)(nil)

// NewMetrics creates new Metrics reconciler
func NewMetrics(k0sVars *config.CfgVars, saver manifestsSaver, clientCF kubernetes.ClientFactoryInterface, storageType v1beta1.StorageType) (*Metrics, error) {
func NewMetrics(k0sVars *config.CfgVars, saver manifestsSaver, clientCF kubeutil.ClientFactoryInterface, storageType v1beta1.StorageType) (*Metrics, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}

restClient, err := clientCF.GetRESTClient()
if err != nil {
var restClient rest.Interface
if client, err := clientCF.GetDiscoveryClient(); err != nil {
return nil, fmt.Errorf("error getting REST client for metrics: %w", err)
} else {
restClient = client.RESTClient()
}
if restClient == nil {
return nil, errors.New("no REST client for metrics")
}

return &Metrics{
Expand Down
Loading
Loading