Skip to content

Commit

Permalink
add aeraki health ready
Browse files Browse the repository at this point in the history
Signed-off-by: chentanjun <[email protected]>
  • Loading branch information
tanjunchen authored and zhaohuabing committed Jun 10, 2023
1 parent 4b8f210 commit c884f7d
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 36 deletions.
1 change: 1 addition & 0 deletions cmd/aeraki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func main() {
"Generate Envoy Filters in the service namespace")
flag.StringVar(&args.KubeDomainSuffix, "domain", defaultKubernetesDomain, "Kubernetes DNS domain suffix")
flag.StringVar(&args.HTTPSAddr, "httpsAddr", ":15017", "validation service HTTPS address")
flag.StringVar(&args.HTTPAddr, "httpAddr", ":8080", "Aeraki readiness service HTTP address")

flag.Parse()
if args.ServerID == "" {
Expand Down
3 changes: 3 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,8 @@ ARG AERAKI_ROOT_BIN_DIR
ARG ARCH
ARG OS

RUN apk update && \
apk add curl

COPY ${AERAKI_ROOT_BIN_DIR}/${ARCH}/${OS}/aeraki /usr/local/bin/
ENTRYPOINT /usr/local/bin/aeraki
17 changes: 17 additions & 0 deletions k8s/aeraki.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ spec:
image: ${AERAKI_IMAGE}:${AERAKI_TAG}
# imagePullPolicy should be set to Never so Minikube can use local image for e2e testing
imagePullPolicy: ${AERAKI_IMG_PULL_POLICY}
ports:
- containerPort: 8080
protocol: TCP
- containerPort: 15010
protocol: TCP
- containerPort: 15017
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /ready
port: 8080
scheme: HTTP
initialDelaySeconds: 1
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 5
resources:
requests:
memory: "1Gi"
Expand Down
30 changes: 16 additions & 14 deletions manifests/charts/aeraki/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ spec:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- containerPort: 8080
protocol: TCP
- containerPort: 15010
protocol: TCP
- containerPort: 15017
protocol: TCP
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 1
periodSeconds: 3
timeoutSeconds: 5
resources:
{ { - toYaml .Values.resources | nindent 12 } }
env:
- name: AERAKI_IS_MASTER
value: {{ .Values.AERAKI_ENV.AERAKI_IS_MASTER }}
Expand Down Expand Up @@ -71,20 +87,6 @@ spec:
- name: istiod-ca-cert
mountPath: /var/run/secrets/istio
readOnly: true
resources:
{{- toYaml .Values.resources | nindent 12 }}
# ports:
# - name: http
# containerPort: 80
# protocol: TCP
# livenessProbe:
# httpGet:
# path: /
# port: http
# readinessProbe:
# httpGet:
# path: /
# port: http
volumes:
- name: istiod-ca-cert
configMap:
Expand Down
16 changes: 8 additions & 8 deletions pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (

// AerakiArgs provides all of the configuration parameters for the Aeraki service.
type AerakiArgs struct {
Master bool
IstiodAddr string
AerakiXdsAddr string
AerakiXdsPort string
PodName string
IstioConfigMapName string
// The listening address for HTTPS (webhooks).
HTTPSAddr string
Master bool
IstiodAddr string
AerakiXdsAddr string
AerakiXdsPort string
PodName string
IstioConfigMapName string
HTTPSAddr string // The listening address for HTTPS (webhooks).
HTTPAddr string // The listening address for HTTP (health).
RootNamespace string
ClusterID string
ConfigStoreSecret string
Expand Down
96 changes: 82 additions & 14 deletions pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net"
"net/http"
"sync"
"sync/atomic"
"time"

//nolint: gosec
_ "net/http/pprof" // pprof
Expand All @@ -48,6 +50,7 @@ import (
"github.com/aeraki-mesh/aeraki/pkg/envoyfilter"
"github.com/aeraki-mesh/aeraki/pkg/leaderelection"
"github.com/aeraki-mesh/aeraki/pkg/model/protocol"
"github.com/aeraki-mesh/aeraki/pkg/util"
"github.com/aeraki-mesh/aeraki/pkg/xds"
"github.com/aeraki-mesh/aeraki/plugin/dubbo"
"github.com/aeraki-mesh/aeraki/plugin/redis"
Expand All @@ -57,6 +60,9 @@ var (
aerakiLog = log.RegisterScope("aeraki-server", "aeraki-server debugging", 0)
)

// readinessProbe defines a function that will be used indicate whether a server is ready.
type readinessProbe func() bool

// Server contains the runtime configuration for the Aeraki service.
type Server struct {
args *AerakiArgs
Expand All @@ -75,6 +81,13 @@ type Server struct {
istiodCert *tls.Certificate
CABundle *bytes.Buffer
stopControllers func()
// serverReady indicates server is ready to process requests.
serverReady atomic.Bool
readinessProbes map[string]readinessProbe
// httpMux listens on the httpAddr (8080).
// monitoring and readiness Server.
httpServer *http.Server
httpMux *http.ServeMux

// internalStop is closed when the server is shutdown. This should be avoided as much as possible, in
// favor of AddStartFunc. This is only required if we *must* start something outside of this process.
Expand Down Expand Up @@ -115,7 +128,6 @@ func NewServer(args *AerakiArgs) (*Server, error) {
})
// xdsServer is the RDS server for metaProtocol proxy
xdsServer := xds.NewServer(args.AerakiXdsPort, routeCacheMgr)

// crdCtrlMgr watches Aeraki CRDs, such as MetaRouter, ApplicationProtocol, etc.
scalableCtrlMgr, err := createScalableControllers(args, kubeConfig, envoyFilterController, routeCacheMgr)
if err != nil {
Expand All @@ -125,12 +137,10 @@ func NewServer(args *AerakiArgs) (*Server, error) {
routeCacheMgr.MetaRouterControllerClient = scalableCtrlMgr.GetClient()
// envoyFilterController uses controller manager client to get the rate limit configuration in MetaRouters
envoyFilterController.MetaRouterControllerClient = scalableCtrlMgr.GetClient()

// todo replace config with cached client
cfg := scalableCtrlMgr.GetConfig()
args.Protocols[protocol.Dubbo] = dubbo.NewGenerator(scalableCtrlMgr.GetConfig())
args.Protocols[protocol.Redis] = redis.New(cfg, configController.Store)

// singletonCtrlMgr
singletonCtrlMgr, err := createSingletonControllers(args, kubeConfig)
if err != nil {
Expand All @@ -145,6 +155,7 @@ func NewServer(args *AerakiArgs) (*Server, error) {
xdsCacheMgr: routeCacheMgr,
xdsServer: xdsServer,
internalStop: make(chan struct{}),
readinessProbes: make(map[string]readinessProbe),
}
if err := server.initKubeClient(); err != nil {
return nil, fmt.Errorf("error initializing kube client: %v", err)
Expand All @@ -162,9 +173,59 @@ func NewServer(args *AerakiArgs) (*Server, error) {
envoyFilterController.ConfigUpdated(model.EventUpdate)
})
envoyFilterController.InitMeshConfig(server.configMapWatcher)
server.initAerakiServer(args)
return server, err
}

func (s *Server) initAerakiServer(args *AerakiArgs) {
// make sure we have a readiness probe before serving HTTP to avoid marking ready too soon
s.initReadinessProbes()
s.initServers(args)
// Readiness Handler.
s.httpMux.HandleFunc("/ready", s.aerakiReadyHandler)
}

// aerakiReadyHandler handler readiness event
func (s *Server) aerakiReadyHandler(w http.ResponseWriter, _ *http.Request) {
for name, fn := range s.readinessProbes {
if ready := fn(); !ready {
log.Warnf("%s is not ready", name)
w.WriteHeader(http.StatusServiceUnavailable)
return
}
}
w.WriteHeader(http.StatusOK)
}

func (s *Server) initReadinessProbes() {
probes := map[string]readinessProbe{
"aeraki": func() bool {
return s.serverReady.Load()
},
}
for name, probe := range probes {
s.addReadinessProbe(name, probe)
}
}

// adds a readiness probe for Aeraki Server.
func (s *Server) addReadinessProbe(name string, fn readinessProbe) {
s.readinessProbes[name] = fn
}

// initHttpServer init servers
func (s *Server) initServers(args *AerakiArgs) {
aerakiLog.Info("initializing HTTP server for aeraki")
s.httpMux = http.NewServeMux()
s.httpServer = &http.Server{
Addr: args.HTTPAddr,
Handler: s.httpMux,
WriteTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
ReadHeaderTimeout: 30 * time.Second,
}
}

// These controllers are horizontally scalable, multiple instances can be deployed to share the load
func createScalableControllers(args *AerakiArgs, kubeConfig *rest.Config,
envoyFilterController *envoyfilter.Controller, xdsCacheMgr *xds.CacheMgr) (manager.Manager, error) {
Expand Down Expand Up @@ -299,25 +360,32 @@ func (s *Server) Start(stop <-chan struct{}) {
}
go func() {
log.Infof("starting webhook service at %s", httpsListener.Addr())
if err := s.httpsServer.ServeTLS(httpsListener, "", ""); isUnexpectedListenerError(err) {
if err := s.httpsServer.ServeTLS(httpsListener, "", ""); util.IsUnexpectedListenerError(err) {
log.Errorf("error serving https server: %v", err)
}
}()

if err = s.serveHTTP(); err != nil {
aerakiLog.Errorf("failed to http server: %v", err)
}
s.serverReady.Store(true)
s.waitForShutdown(stop)
}

func isUnexpectedListenerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, net.ErrClosed) {
return false
}
if errors.Is(err, http.ErrServerClosed) {
return false
// serveHTTP starts Http Listener so that it can respond to readiness events.
func (s *Server) serveHTTP() error {
log.Infof("starting HTTP service at %s", s.httpServer.Addr)
httpListener, err := net.Listen("tcp", s.httpServer.Addr)
if err != nil {
return err
}
return true
go func() {
log.Infof("starting HTTP service at %s", httpListener.Addr())
if err := s.httpServer.Serve(httpListener); util.IsUnexpectedListenerError(err) {
log.Errorf("error serving http server: %v", err)
}
}()
return nil
}

// Wait for the stop, and do cleanups
Expand Down
35 changes: 35 additions & 0 deletions pkg/util/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright Aeraki Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"errors"
"net"
"net/http"
)

// IsUnexpectedListenerError handles the error returned by the listener
func IsUnexpectedListenerError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, net.ErrClosed) {
return false
}
if errors.Is(err, http.ErrServerClosed) {
return false
}
return true
}

0 comments on commit c884f7d

Please sign in to comment.