44 "context"
55 "crypto/x509"
66 "fmt"
7+ "net/url"
78 "os"
89 "path/filepath"
910 "strings"
@@ -18,12 +19,17 @@ import (
1819 "k8s.io/apimachinery/pkg/util/sets"
1920 "k8s.io/apimachinery/pkg/util/wait"
2021 "k8s.io/apimachinery/pkg/watch"
22+ "k8s.io/client-go/informers"
2123 "k8s.io/client-go/kubernetes"
2224 "k8s.io/client-go/rest"
2325 "k8s.io/client-go/tools/cache"
2426 "k8s.io/client-go/tools/clientcmd"
2527 clientwatch "k8s.io/client-go/tools/watch"
28+ "k8s.io/utils/ptr"
2629
30+ configv2 "github.com/aws/aws-sdk-go-v2/config"
31+ elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
32+ elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
2733 configv1 "github.com/openshift/api/config/v1"
2834 configclient "github.com/openshift/client-go/config/clientset/versioned"
2935 configinformers "github.com/openshift/client-go/config/informers/externalversions"
@@ -63,6 +69,12 @@ var SkipPasswordPrintFlag bool
6369// WaitForInstallComplete waits for cluster to complete installation, checks for operator stability
6470// and logs cluster information when successful.
6571func WaitForInstallComplete (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
72+ // FIXME: Register the worker nodes to target group of ingress LB.
73+ // Remove after CCM support dualstack NLB.
74+ if err := waitForWorkerNodesAvailability (ctx , config , assetstore ); err != nil {
75+ return err
76+ }
77+
6678 if err := waitForInitializedCluster (ctx , config , assetstore ); err != nil {
6779 return err
6880 }
@@ -83,6 +95,158 @@ func WaitForInstallComplete(ctx context.Context, config *rest.Config, assetstore
8395 return logComplete (RootOpts .Dir , consoleURL )
8496}
8597
98+ // waitForWorkerNodesAvailability waits for worker nodes to be running and register them with the TargetGroup of the ingress NLB.
99+ // NOTE: This should be handled by the CCM, not the installer.
100+ func waitForWorkerNodesAvailability (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
101+ timer .StartTimer ("CCM: Worker nodes Available" )
102+ icAsset , err := assetstore .Load (& installconfig.InstallConfig {})
103+ if err != nil {
104+ return fmt .Errorf ("failed to load installconfig: %w" , err )
105+ }
106+ if icAsset == nil {
107+ return fmt .Errorf ("failed to installconfig: received nil" )
108+ }
109+
110+ ic := icAsset .(* installconfig.InstallConfig ).Config
111+ region := ic .Platform .AWS .Region
112+
113+ // Nothing to do!
114+ if ic .Platform .AWS == nil {
115+ return nil
116+ }
117+
118+ // FIXME: Ignore edge compute pool
119+ numOfNodes := ptr .Deref (ic .Compute [0 ].Replicas , 0 ) + ptr .Deref (ic .ControlPlane .Replicas , 0 )
120+ if numOfNodes == 0 {
121+ // nothing to do
122+ return nil
123+ }
124+
125+ nodeCheckDuration := 10 * time .Minute
126+ nodeContext , cancel := context .WithTimeout (ctx , nodeCheckDuration )
127+ defer cancel ()
128+
129+ untilTime := time .Now ().Add (nodeCheckDuration )
130+ timezone , _ := untilTime .Zone ()
131+ logrus .Infof ("CCM: Waiting up to %v (until %v %s) to ensure worker nodes are available and registered with ingress LB..." ,
132+ nodeCheckDuration , untilTime .Format (time .Kitchen ), timezone )
133+
134+ cc , err := kubernetes .NewForConfig (config )
135+ if err != nil {
136+ return fmt .Errorf ("failed to create a config client: %w" , err )
137+ }
138+ configInformers := informers .NewSharedInformerFactory (cc , 0 )
139+ nodeInformer := configInformers .Core ().V1 ().Nodes ().Informer ()
140+ nodeLister := configInformers .Core ().V1 ().Nodes ().Lister ()
141+ configInformers .Start (ctx .Done ())
142+ if ! cache .WaitForCacheSync (ctx .Done (), nodeInformer .HasSynced ) {
143+ return fmt .Errorf ("informers never started" )
144+ }
145+
146+ // Create clients to call AWS API
147+ // FIXME: Let's ignore the custom endpoints for now
148+ cfg , err := configv2 .LoadDefaultConfig (ctx , configv2 .WithRegion (region ))
149+ if err != nil {
150+ return fmt .Errorf ("failed to load AWS config: %w" , err )
151+ }
152+ elbv2Client := elbv2 .NewFromConfig (cfg )
153+
154+ waitErr := wait .PollUntilContextCancel (nodeContext , 1 * time .Second , true , func (ctx context.Context ) (done bool , err error ) {
155+ // If the expected number of nodes are running, proceed.
156+ // Otherwise, requeue.
157+ nodes , err := nodeLister .List (labels .Everything ())
158+ if err != nil {
159+ return false , fmt .Errorf ("failed to get nodes: %w" , err )
160+ }
161+ if len (nodes ) < int (numOfNodes ) {
162+ return false , nil
163+ }
164+
165+ // Convert nodes to EC2 instance IDs
166+ var instanceIDs []string
167+ for _ , node := range nodes {
168+ url , err := url .Parse (node .Spec .ProviderID )
169+ if err != nil {
170+ return false , fmt .Errorf ("invalid node provider ID (%s): %w" , node .Spec .ProviderID , err )
171+ }
172+ if url .Scheme != "aws" {
173+ return false , fmt .Errorf ("invalid scheme for AWS instance (%s)" , node .Spec .ProviderID )
174+ }
175+
176+ awsID := ""
177+ tokens := strings .Split (strings .Trim (url .Path , "/" ), "/" )
178+ // last token in the providerID is the aws resource ID for both EC2 and Fargate nodes
179+ if len (tokens ) > 0 {
180+ awsID = tokens [len (tokens )- 1 ]
181+ }
182+ instanceIDs = append (instanceIDs , awsID )
183+ }
184+
185+ // Get the NodePort service for default ingress
186+ // Reference: oc -n openshift-ingress get svc router-nodeport-default -o=wide
187+ svc , err := cc .CoreV1 ().Services ("openshift-ingress" ).Get (nodeContext , "router-nodeport-default" , metav1.GetOptions {})
188+ if err != nil {
189+ return false , fmt .Errorf ("failed to get service openshift-ingress/router-nodeport-default: %w" , err )
190+ }
191+
192+ // Check if nodes are already registered. If true, nothing to do.
193+ targetGrp , err := elbv2Client .DescribeTargetGroups (nodeContext , & elbv2.DescribeTargetGroupsInput {
194+ Names : []string {"ingress-secure" , "ingress-insecure" },
195+ })
196+ if err != nil {
197+ return false , fmt .Errorf ("failed to describe target group: %w" , err )
198+ }
199+
200+ for _ , tg := range targetGrp .TargetGroups {
201+ // Get registered targets by querying target health API.
202+ targetDesc , err := elbv2Client .DescribeTargetHealth (nodeContext , & elbv2.DescribeTargetHealthInput {
203+ TargetGroupArn : tg .TargetGroupArn ,
204+ })
205+ if err != nil {
206+ return false , fmt .Errorf ("failed to get target health for target group %s: %w" , * tg .TargetGroupArn , err )
207+ }
208+
209+ // No targets found. We will register nodes as targets.
210+ if len (targetDesc .TargetHealthDescriptions ) == 0 {
211+ var nodeport int32
212+ for _ , port := range svc .Spec .Ports {
213+ if port .Port == * tg .Port {
214+ nodeport = port .NodePort
215+ break
216+ }
217+ }
218+
219+ var targets []elbv2types.TargetDescription
220+ for _ , instanceID := range instanceIDs {
221+ targets = append (targets , elbv2types.TargetDescription {
222+ Port : & nodeport ,
223+ Id : & instanceID ,
224+ })
225+ }
226+
227+ // Register them and controlplane nodes with TargetGroup of ingres NLB.
228+ _ , err = elbv2Client .RegisterTargets (nodeContext , & elbv2.RegisterTargetsInput {
229+ TargetGroupArn : tg .TargetGroupArn ,
230+ Targets : targets ,
231+ })
232+ if err != nil {
233+ return false , fmt .Errorf ("failed to register nodes to ingress LB: %w" , err )
234+ }
235+ }
236+ }
237+
238+ return true , nil
239+ })
240+ if waitErr != nil {
241+ return fmt .Errorf ("failed to wait for worker node availability: %w" , waitErr )
242+ }
243+
244+ timer .StopTimer ("CCM: Worker nodes Available" )
245+
246+ logrus .Info ("CCM: Worker nodes are available and registered with ingress LB" )
247+ return nil
248+ }
249+
86250// waitForInitializedCluster watches the ClusterVersion waiting for confirmation
87251// that the cluster has been initialized.
88252func waitForInitializedCluster (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
0 commit comments