44 "context"
55 "crypto/x509"
66 "fmt"
7+ "net/url"
78 "os"
89 "path/filepath"
910 "strings"
@@ -18,12 +19,17 @@ import (
1819 "k8s.io/apimachinery/pkg/util/sets"
1920 "k8s.io/apimachinery/pkg/util/wait"
2021 "k8s.io/apimachinery/pkg/watch"
22+ "k8s.io/client-go/informers"
2123 "k8s.io/client-go/kubernetes"
2224 "k8s.io/client-go/rest"
2325 "k8s.io/client-go/tools/cache"
2426 "k8s.io/client-go/tools/clientcmd"
2527 clientwatch "k8s.io/client-go/tools/watch"
28+ "k8s.io/utils/ptr"
2629
30+ configv2 "github.com/aws/aws-sdk-go-v2/config"
31+ elbv2 "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
32+ elbv2types "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types"
2733 configv1 "github.com/openshift/api/config/v1"
2834 configclient "github.com/openshift/client-go/config/clientset/versioned"
2935 configinformers "github.com/openshift/client-go/config/informers/externalversions"
@@ -63,6 +69,12 @@ var SkipPasswordPrintFlag bool
6369// WaitForInstallComplete waits for cluster to complete installation, checks for operator stability
6470// and logs cluster information when successful.
6571func WaitForInstallComplete (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
72+ // FIXME: Register the worker nodes to target group of ingress LB.
73+ // Remove after CCM support dualstack NLB.
74+ if err := waitForWorkerNodesAvailability (ctx , config , assetstore ); err != nil {
75+ return err
76+ }
77+
6678 if err := waitForInitializedCluster (ctx , config , assetstore ); err != nil {
6779 return err
6880 }
@@ -83,6 +95,153 @@ func WaitForInstallComplete(ctx context.Context, config *rest.Config, assetstore
8395 return logComplete (RootOpts .Dir , consoleURL )
8496}
8597
98+ // waitForWorkerNodesAvailability waits for worker nodes to be running and register them with the TargetGroup of the ingress NLB.
99+ // NOTE: This should be handled by the CCM, not the installer.
100+ func waitForWorkerNodesAvailability (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
101+ timer .StartTimer ("CCM: Worker nodes Available" )
102+ icAsset , err := assetstore .Load (& installconfig.InstallConfig {})
103+ if err != nil {
104+ return fmt .Errorf ("failed to load installconfig: %w" , err )
105+ }
106+ if icAsset == nil {
107+ return fmt .Errorf ("failed to installconfig: received nil" )
108+ }
109+
110+ ic := icAsset .(* installconfig.InstallConfig ).Config
111+ region := ic .Platform .AWS .Region
112+
113+ // FIXME: Ignore edge compute pool
114+ numOfNodes := ptr .Deref (ic .Compute [0 ].Replicas , 0 ) + ptr .Deref (ic .ControlPlane .Replicas , 0 )
115+ if numOfNodes == 0 {
116+ // nothing to do
117+ return nil
118+ }
119+
120+ nodeCheckDuration := 10 * time .Minute
121+ nodeContext , cancel := context .WithTimeout (ctx , nodeCheckDuration )
122+ defer cancel ()
123+
124+ untilTime := time .Now ().Add (nodeCheckDuration )
125+ timezone , _ := untilTime .Zone ()
126+ logrus .Infof ("CCM: Waiting up to %v (until %v %s) to ensure worker nodes are available and registered with ingress LB..." ,
127+ nodeCheckDuration , untilTime .Format (time .Kitchen ), timezone )
128+
129+ cc , err := kubernetes .NewForConfig (config )
130+ if err != nil {
131+ return fmt .Errorf ("failed to create a config client: %w" , err )
132+ }
133+ configInformers := informers .NewSharedInformerFactory (cc , 0 )
134+ nodeInformer := configInformers .Core ().V1 ().Nodes ().Informer ()
135+ nodeLister := configInformers .Core ().V1 ().Nodes ().Lister ()
136+ configInformers .Start (ctx .Done ())
137+ if ! cache .WaitForCacheSync (ctx .Done (), nodeInformer .HasSynced ) {
138+ return fmt .Errorf ("informers never started" )
139+ }
140+
141+ // Create clients to call AWS API
142+ // FIXME: Let's ignore the custom endpoints for now
143+ cfg , err := configv2 .LoadDefaultConfig (ctx , configv2 .WithRegion (region ))
144+ if err != nil {
145+ return fmt .Errorf ("failed to load AWS config: %w" , err )
146+ }
147+ elbv2Client := elbv2 .NewFromConfig (cfg )
148+
149+ waitErr := wait .PollUntilContextCancel (nodeContext , 1 * time .Second , true , func (ctx context.Context ) (done bool , err error ) {
150+ // If the expected number of nodes are running, proceed.
151+ // Otherwise, requeue.
152+ nodes , err := nodeLister .List (labels .Everything ())
153+ if err != nil {
154+ return false , fmt .Errorf ("failed to get nodes: %w" , err )
155+ }
156+ if len (nodes ) < int (numOfNodes ) {
157+ return false , nil
158+ }
159+
160+ // Convert nodes to EC2 instance IDs
161+ var instanceIDs []string
162+ for _ , node := range nodes {
163+ url , err := url .Parse (node .Spec .ProviderID )
164+ if err != nil {
165+ return false , fmt .Errorf ("invalid node provider ID (%s): %w" , node .Spec .ProviderID , err )
166+ }
167+ if url .Scheme != "aws" {
168+ return false , fmt .Errorf ("invalid scheme for AWS instance (%s)" , node .Spec .ProviderID )
169+ }
170+
171+ awsID := ""
172+ tokens := strings .Split (strings .Trim (url .Path , "/" ), "/" )
173+ // last token in the providerID is the aws resource ID for both EC2 and Fargate nodes
174+ if len (tokens ) > 0 {
175+ awsID = tokens [len (tokens )- 1 ]
176+ }
177+ instanceIDs = append (instanceIDs , awsID )
178+ }
179+
180+ // Get the NodePort service for default ingress
181+ // Reference: oc -n openshift-ingress get svc router-nodeport-default -o=wide
182+ svc , err := cc .CoreV1 ().Services ("openshift-ingress" ).Get (nodeContext , "router-nodeport-default" , metav1.GetOptions {})
183+ if err != nil {
184+ return false , fmt .Errorf ("failed to get service openshift-ingress/router-nodeport-default: %w" , err )
185+ }
186+
187+ // Check if nodes are already registered. If true, nothing to do.
188+ targetGrp , err := elbv2Client .DescribeTargetGroups (nodeContext , & elbv2.DescribeTargetGroupsInput {
189+ Names : []string {"ingress-secure" , "ingress-insecure" },
190+ })
191+ if err != nil {
192+ return false , fmt .Errorf ("failed to describe target group: %w" , err )
193+ }
194+
195+ for _ , tg := range targetGrp .TargetGroups {
196+ // Get registered targets by querying target health API.
197+ targetDesc , err := elbv2Client .DescribeTargetHealth (nodeContext , & elbv2.DescribeTargetHealthInput {
198+ TargetGroupArn : tg .TargetGroupArn ,
199+ })
200+ if err != nil {
201+ return false , fmt .Errorf ("failed to get target health for target group %s: %w" , * tg .TargetGroupArn , err )
202+ }
203+
204+ // No targets found. We will register nodes as targets.
205+ if len (targetDesc .TargetHealthDescriptions ) == 0 {
206+ var nodeport int32
207+ for _ , port := range svc .Spec .Ports {
208+ if port .Port == * tg .Port {
209+ nodeport = port .NodePort
210+ break
211+ }
212+ }
213+
214+ var targets []elbv2types.TargetDescription
215+ for _ , instanceID := range instanceIDs {
216+ targets = append (targets , elbv2types.TargetDescription {
217+ Port : & nodeport ,
218+ Id : & instanceID ,
219+ })
220+ }
221+
222+ // Register them and controlplane nodes with TargetGroup of ingres NLB.
223+ _ , err = elbv2Client .RegisterTargets (nodeContext , & elbv2.RegisterTargetsInput {
224+ TargetGroupArn : tg .TargetGroupArn ,
225+ Targets : targets ,
226+ })
227+ if err != nil {
228+ return false , fmt .Errorf ("failed to register nodes to ingress LB: %w" , err )
229+ }
230+ }
231+ }
232+
233+ return true , nil
234+ })
235+ if waitErr != nil {
236+ return fmt .Errorf ("failed to wait for worker node availability: %w" , err )
237+ }
238+
239+ timer .StopTimer ("CCM: Worker nodes Available" )
240+
241+ logrus .Info ("CCM: Worker nodes are available and registered with ingress LB" )
242+ return nil
243+ }
244+
86245// waitForInitializedCluster watches the ClusterVersion waiting for confirmation
87246// that the cluster has been initialized.
88247func waitForInitializedCluster (ctx context.Context , config * rest.Config , assetstore asset.Store ) error {
0 commit comments