From 1b729b95e4e4affe97de34711d9b2c15d290bfda Mon Sep 17 00:00:00 2001 From: "xingchi.jin" Date: Fri, 15 Mar 2024 11:46:30 -0700 Subject: [PATCH 1/3] delegate-shell lib --- delegateshell/README.md | 43 +++ delegateshell/client/client.go | 127 +++++++++ delegateshell/client/http.go | 369 ++++++++++++++++++++++++++ delegateshell/delegate/config.go | 28 ++ delegateshell/delegate/token.go | 42 +++ delegateshell/delegate/token_cache.go | 52 ++++ delegateshell/example/main.go | 44 +++ delegateshell/heartbeat/keepalive.go | 148 +++++++++++ delegateshell/heartbeat/writer.go | 29 ++ delegateshell/logger/logger.go | 53 ++++ delegateshell/poller/events_poller.go | 111 ++++++++ go.mod | 20 +- go.sum | 43 +++ 13 files changed, 1108 insertions(+), 1 deletion(-) create mode 100644 delegateshell/README.md create mode 100644 delegateshell/client/client.go create mode 100644 delegateshell/client/http.go create mode 100644 delegateshell/delegate/config.go create mode 100644 delegateshell/delegate/token.go create mode 100644 delegateshell/delegate/token_cache.go create mode 100644 delegateshell/example/main.go create mode 100644 delegateshell/heartbeat/keepalive.go create mode 100644 delegateshell/heartbeat/writer.go create mode 100644 delegateshell/logger/logger.go create mode 100644 delegateshell/poller/events_poller.go diff --git a/delegateshell/README.md b/delegateshell/README.md new file mode 100644 index 0000000..e71480c --- /dev/null +++ b/delegateshell/README.md @@ -0,0 +1,43 @@ +# About + +A golang client which can interact with and acquire tasks from the Harness manager (or any task management system which implements the same interface) + +The delegate-shell library provides two utilities. +1. Registering the Runner with the Runner Manager and sending heartbeats +2. Polling Runner task events + +The idea is that the `delegate-shell` library should be a standalone library. A Runner can use this library to handle all lifecycle events and interactions with Harness. 
+ +# Usage +Example code is in the `delegateshell/example` folder. + +A way to use this client would be: +1. Registration & heartbeat +``` + // Create a manager http client + managerClient := client.New(...) + + keepAlive := heartbeat.New(..., managerClient) + + // Register & heartbeat + ctx := context.Background() + info, _ := keepAlive.Register(ctx) +``` +2. Poll tasks +``` + requestsChan := make(chan *client.RunnerRequest, 3) + + // Start polling for events + eventsServer := poller.New(managerClient, requestsChan) + eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10) + // next: process requests from 'requestsChan' +``` + +## TODOs +1. It uses the register call for heartbeat. Maybe we should use polling tasks as heartbeat. This reduces network payload on the Harness gateway significantly +2. A thread pool abstraction should be there to handle thread & resource allocation & isolation +3. Create a top level package for logger. Use that everywhere +4. A shutdown hook is not there +5. Rejecting tasks based on CPU/memory usage is not there +6. Add an error library for all places +7. Central Config: implement a global context which has all the delegate configurations diff --git a/delegateshell/client/client.go b/delegateshell/client/client.go new file mode 100644 index 0000000..1e18f93 --- /dev/null +++ b/delegateshell/client/client.go @@ -0,0 +1,127 @@ +package client + +import ( + "context" + "encoding/json" +) + +// TODO: Make the structs more generic and remove Harness specific stuff +type ( + // Taken from existing manager API + RegisterRequest struct { + AccountID string `json:"accountId,omitempty"` + RunnerName string `json:"delegateName,omitempty"` + // Token string `json:"delegateRandomToken,omitempty"` + LastHeartbeat int64 `json:"lastHeartBeat,omitempty"` + ID string `json:"delegateId,omitempty"` + Type string `json:"delegateType,omitempty"` + NG bool `json:"ng,omitempty"` + Polling bool `json:"pollingModeEnabled,omitempty"` // why Runner needs type ?? 
maybe should remove + HostName string `json:"hostName,omitempty"` + Connected bool `json:"connected,omitempty"` + KeepAlivePacket bool `json:"keepAlivePacket,omitempty"` + IP string `json:"ip,omitempty"` + Tags []string `json:"tags,omitempty"` + HeartbeatAsObject bool `json:"heartbeatAsObject,omitempty"` // TODO: legacy to remove + } + + // Used in the java codebase :'( + RegisterResponse struct { + Resource RegistrationData `json:"resource"` + } + + RegistrationData struct { + DelegateID string `json:"delegateId"` + } + + TaskEventsResponse struct { + TaskEvents []*TaskEvent `json:"delegateTaskEvents"` + } + + RunnerEvent struct { + AccountID string `json:"accountId"` + TaskID string `json:"taskId"` + RunnerType string `json:"runnerType"` + TaskType string `json:"taskType"` + } + + RunnerEventsResponse struct { + RunnerEvents []*RunnerEvent `json:"delegateRunnerEvents"` + } + + TaskEvent struct { + AccountID string `json:"accountId"` + TaskID string `json:"delegateTaskId"` + Sync bool `json:"sync"` + TaskType string `json:"taskType"` + } + + Task struct { + ID string `json:"id"` + Type string `json:"type"` + Data json.RawMessage `json:"data"` + Async bool `json:"async"` + Timeout int `json:"timeout"` + Logging LogInfo `json:"logging"` + DelegateInfo DelegateInfo `json:"delegate"` + Capabilities json.RawMessage `json:"capabilities"` + } + + LogInfo struct { + Token string `json:"token"` + Abstractions map[string]string `json:"abstractions"` + } + + DelegateInfo struct { + ID string `json:"id"` + InstanceID string `json:"instance_id"` + Token string `json:"token"` + } + + TaskResponse struct { + ID string `json:"id"` + Data json.RawMessage `json:"data"` + Type string `json:"type"` + Code string `json:"code"` // OK, FAILED, RETRY_ON_OTHER_DELEGATE + } + + DelegateCapacity struct { + MaxBuilds int `json:"maximumNumberOfBuilds"` + } + + RunnerAcquiredTasks struct { + Requests []*RunnerRequest `json:"requests"` + } + + RunnerRequest struct { + TaskId string `json:"id"` 
+ AccountId string `json:"account_id"` + Task *RunnerTask `json:"task"` + Secrets []*RunnerTask `json:"secrets"` + } + + RunnerTask struct { + Type string `json:"type"` + Driver string `json:"driver"` + Data []byte `json:"data"` + Config []byte `json:"config"` + } +) + +// Client is an interface which defines methods on interacting with a task managing system. +type Client interface { + // Register registers the runner with the task server + Register(ctx context.Context, r *RegisterRequest) (*RegisterResponse, error) + + // Heartbeat pings the task server to let it know that the runner is still alive + Heartbeat(ctx context.Context, r *RegisterRequest) error + + // GetRunnerEvents gets a list of pending runner events for this runner + GetRunnerEvents(ctx context.Context, delegateID string) (*RunnerEventsResponse, error) + + // GetExecutionPayload fetches the execution payload (the requests to run) for a task ID + GetExecutionPayload(ctx context.Context, delegateID, taskID string) (*RunnerAcquiredTasks, error) + + // SendStatus sends a response to the task server for a task ID + SendStatus(ctx context.Context, delegateID, taskID string, req *TaskResponse) error +} diff --git a/delegateshell/client/http.go b/delegateshell/client/http.go new file mode 100644 index 0000000..7a52709 --- /dev/null +++ b/delegateshell/client/http.go @@ -0,0 +1,369 @@ +package client + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + pb "google.golang.org/protobuf/proto" + + "github.com/cenkalti/backoff/v4" + "github.com/drone/go-task/delegateshell/delegate" + "github.com/sirupsen/logrus" + + "github.com/wings-software/dlite/logger" +) + +const ( + registerEndpoint = "/api/agent/delegates/register?accountId=%s" + heartbeatEndpoint = "/api/agent/delegates/heartbeat-with-polling?accountId=%s" + taskStatusEndpoint = "/api/agent/v2/tasks/%s/delegates/%s?accountId=%s" 
runnerEventsPollEndpoint = "/api/agent/delegates/%s/runner-events?accountId=%s" + executionPayloadEndpoint = "/api/executions/%s/payload?delegateId=%s&accountId=%s&delegateInstanceId=%s" +) + +var ( + registerTimeout = 30 * time.Second + taskEventsTimeout = 60 * time.Second + sendStatusRetryTimes = 5 +) + +// defaultClient is the default http.Client. +var defaultClient = &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, +} + +// New returns a new client. +func New(endpoint, id, secret string, skipverify bool, additionalCertsDir string) *HTTPClient { + return getClient(endpoint, id, "", delegate.NewTokenCache(id, secret), skipverify, additionalCertsDir) +} + +func NewFromToken(endpoint, id, token string, skipverify bool, additionalCertsDir string) *HTTPClient { + return getClient(endpoint, id, token, nil, skipverify, additionalCertsDir) +} + +func getClient(endpoint, id, token string, cache *delegate.TokenCache, skipverify bool, additionalCertsDir string) *HTTPClient { + log := logrus.New() + c := &HTTPClient{ + Logger: log, + Endpoint: endpoint, + SkipVerify: skipverify, + AccountID: id, + Client: defaultClient, + AccountTokenCache: cache, + Token: token, + } + if skipverify { + c.Client = &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: skipverify, //nolint:gosec + }, + }, + } + } else if additionalCertsDir != "" { + // If additional certs are specified, we append them to the existing cert chain + + // Use the system certs if possible + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + + log.Infof("additional certs dir to allow: %s\n", additionalCertsDir) + + files, err := os.ReadDir(additionalCertsDir) + if err != nil { + log.Errorf("could not read directory %s, error: %s", 
additionalCertsDir, err) + c.Client = clientWithRootCAs(skipverify, rootCAs) + return c + } + + // Go through all certs in this directory and add them to the global certs + for _, f := range files { + path := filepath.Join(additionalCertsDir, f.Name()) + log.Infof("trying to add certs at: %s to root certs\n", path) + // Create TLS config using cert PEM + rootPem, err := os.ReadFile(path) + if err != nil { + log.Errorf("could not read certificate file (%s), error: %s", path, err.Error()) + continue + } + // Append certs to the global certs + ok := rootCAs.AppendCertsFromPEM(rootPem) + if !ok { + log.Errorf("error adding cert (%s) to pool, please check format of the certs provided.", path) + continue + } + log.Infof("successfully added cert at: %s to root certs", path) + } + c.Client = clientWithRootCAs(skipverify, rootCAs) + } + return c +} + +func clientWithRootCAs(skipverify bool, rootCAs *x509.CertPool) *http.Client { + // Create the HTTP Client with certs + config := &tls.Config{ + //nolint:gosec + InsecureSkipVerify: skipverify, + } + if rootCAs != nil { + config.RootCAs = rootCAs + } + return &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: config, + }, + } +} + +// An HTTPClient manages communication with the runner API. 
+type HTTPClient struct { + Client *http.Client + Logger logger.Logger + Endpoint string + AccountID string + AccountTokenCache *delegate.TokenCache + SkipVerify bool + Token string +} + +// Register registers the runner with the manager +func (p *HTTPClient) Register(ctx context.Context, r *RegisterRequest) (*RegisterResponse, error) { + req := r + resp := &RegisterResponse{} + path := fmt.Sprintf(registerEndpoint, p.AccountID) + _, err := p.retry(ctx, path, "POST", req, resp, createBackoff(ctx, registerTimeout), true) //nolint: bodyclose + return resp, err +} + +// Heartbeat sends a periodic heartbeat to the server +func (p *HTTPClient) Heartbeat(ctx context.Context, r *RegisterRequest) error { + req := r + path := fmt.Sprintf(heartbeatEndpoint, p.AccountID) + _, err := p.doJson(ctx, path, "POST", req, nil) + return err +} + +// GetRunnerEvents gets a list of events which can be executed on this runner +func (p *HTTPClient) GetRunnerEvents(ctx context.Context, id string) (*RunnerEventsResponse, error) { + path := fmt.Sprintf(runnerEventsPollEndpoint, id, p.AccountID) + events := &RunnerEventsResponse{} + _, err := p.doJson(ctx, path, "GET", nil, events) + return events, err +} + +// Acquire tries to acquire a specific task +func (p *HTTPClient) GetExecutionPayload(ctx context.Context, delegateID, taskID string) (*RunnerAcquiredTasks, error) { + path := fmt.Sprintf(executionPayloadEndpoint, taskID, delegateID, p.AccountID, delegateID) + payload := &RunnerAcquiredTasks{} + _, err := p.doJson(ctx, path, "GET", nil, payload) + if err != nil { + logrus.WithError(err).Error("Error making http call") + } + return payload, err +} + +// SendStatus updates the status of a task +func (p *HTTPClient) SendStatus(ctx context.Context, delegateID, taskID string, r *TaskResponse) error { + path := fmt.Sprintf(taskStatusEndpoint, taskID, delegateID, p.AccountID) + logrus.Info("response: ", path) + req := r + retryNumber := 0 + var err error + for retryNumber < sendStatusRetryTimes 
{
+		_, err = p.retry(ctx, path, "POST", req, nil, createBackoff(ctx, taskEventsTimeout), true) //nolint: bodyclose
+		if err == nil {
+			return nil
+		}
+		retryNumber++
+	}
+	return err
+}
+
+// retry calls doJson until an attempt needs no retry, the backoff b is
+// exhausted (backoff.Stop) or ctx is done.
+func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}, b backoff.BackOffContext, ignoreStatusCode bool) (*http.Response, error) { //nolint: unparam
+	for {
+		res, err := p.doJson(ctx, path, method, in, out)
+		// do not retry on Canceled or DeadlineExceeded
+		if ctxErr := ctx.Err(); ctxErr != nil {
+			p.logger().Errorf("http: context canceled")
+			return res, ctxErr
+		}
+
+		// With a response in hand, retry when the call failed and ignoreStatusCode is
+		// set, or when the status code is above 501 (server-side outages are usually
+		// temporary). Without a response, retry on any error.
+		switch {
+		case res != nil && ((ignoreStatusCode && err != nil) || res.StatusCode > 501):
+			p.logger().Errorf("http: server error: re-connect and re-try: %s", err)
+		case res == nil && err != nil:
+			p.logger().Errorf("http: request error: %s", err)
+		default:
+			return res, err
+		}
+
+		duration := b.NextBackOff()
+		if duration == backoff.Stop {
+			p.logger().Errorf("max retry limit reached, task status won't be updated")
+			return nil, err
+		}
+		// wait for the backoff interval, but give up as soon as ctx is done
+		timer := time.NewTimer(duration)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			p.logger().Errorf("http: context canceled")
+			return nil, ctx.Err()
+		case <-timer.C:
+		}
+	}
+}
+
+// doJson sends in (when non-nil) as a JSON request body and decodes a
+// non-empty response body into out (when non-nil).
+func (p *HTTPClient) doJson(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) {
+	var buf = &bytes.Buffer{}
+	// marshal the input payload into json format and copy
+	// it into the request buffer.
+	if in != nil {
+		if err := json.NewEncoder(buf).Encode(in); err != nil {
+			return nil, fmt.Errorf("could not encode input payload: %w", err)
+		}
+	}
+	res, body, err := p.do(ctx, path, method, buf)
+	if err != nil {
+		return res, err
+	}
+	// nothing to decode, e.g. a 204 response is returned by do with a nil body
+	if nil == out || len(body) == 0 {
+		return res, nil
+	}
+	if jsonErr := json.Unmarshal(body, out); jsonErr != nil {
+		return res, jsonErr
+	}
+	return res, nil
+}
+
+func (p *HTTPClient) doProto(ctx context.Context, path, method string, in pb.Message, out pb.Message) (*http.Response, error) {
+	// marshal the input payload into proto format and copy
+	// to an io.ReadCloser.
+	input, err := pb.Marshal(in)
+	if err != nil {
+		return nil, err
+	}
+	buf := bytes.NewBuffer(input)
+	res, body, err := p.do(ctx, path, method, buf)
+	if err != nil {
+		return res, err
+	}
+	if nil == out {
+		return res, nil
+	}
+	if protoErr := pb.Unmarshal(body, out); protoErr != nil {
+		return res, protoErr
+	}
+
+	return res, nil
+}
+
+// do is a helper function that sends an authorized http request and returns
+// the response together with its raw body. A 204 response yields a nil body;
+// any status above 299 is returned as an error built from the response body.
+func (p *HTTPClient) do(ctx context.Context, path, method string, in *bytes.Buffer) (*http.Response, []byte, error) {
+	endpoint := p.Endpoint + path
+	req, err := http.NewRequestWithContext(ctx, method, endpoint, in)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// the request should include the secret shared between
+	// the agent and server for authorization.
+	token := ""
+	if p.Token != "" {
+		token = p.Token
+	} else {
+		token, err = p.AccountTokenCache.Get()
+		if err != nil {
+			p.logger().Errorf("could not generate account token: %s", err)
+			return nil, nil, err
+		}
+	}
+	req.Header.Add("Authorization", "Delegate "+token)
+	req.Header.Add("Content-Type", "application/json")
+	res, err := p.Client.Do(req)
+	if res != nil {
+		defer func() {
+			// drain the response body so we can reuse
+			// this connection.
+			if _, drainErr := io.Copy(io.Discard, io.LimitReader(res.Body, 4096)); drainErr != nil {
+				p.logger().Errorf("could not drain response body: %s", drainErr)
+			}
+			res.Body.Close()
+		}()
+	}
+	if err != nil {
+		return res, nil, err
+	}
+
+	// if the response body return no content we exit
+	// immediately. We do not read or unmarshal the response
+	// and we do not return an error.
+	if res.StatusCode == 204 {
+		return res, nil, nil
+	}
+
+	// else read the response body into a byte slice.
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return res, nil, err
+	}
+
+	if res.StatusCode > 299 {
+		// if the response body includes an error message
+		// we should return the error string.
+		if len(body) != 0 {
+			return res, body, errors.New(string(body))
+		}
+		// if the response body is empty we should return
+		// the default status code text.
+		return res, body, errors.New(http.StatusText(res.StatusCode))
+	}
+	return res, body, nil
+}
+
+// logger is a helper function that returns the default logger
+// if a custom logger is not defined.
+func (p *HTTPClient) logger() logger.Logger { + if p.Logger == nil { + return logger.Discard() + } + return p.Logger +} + +func createBackoff(ctx context.Context, maxElapsedTime time.Duration) backoff.BackOffContext { + exp := backoff.NewExponentialBackOff() + exp.MaxElapsedTime = maxElapsedTime + return backoff.WithContext(exp, ctx) +} diff --git a/delegateshell/delegate/config.go b/delegateshell/delegate/config.go new file mode 100644 index 0000000..7fa429d --- /dev/null +++ b/delegateshell/delegate/config.go @@ -0,0 +1,28 @@ +package delegate + +import ( + "github.com/kelseyhightower/envconfig" +) + +// Sample config +type Config struct { + Debug bool `envconfig:"DRONE_DEBUG"` + Trace bool `envconfig:"DRONE_TRACE"` + + Delegate struct { + AccountID string `envconfig:"DRONE_DELEGATE_ACCOUNT_ID"` + AccountSecret string `envconfig:"DRONE_DELEGATE_ACCOUNT_SECRET"` + ManagerEndpoint string `envconfig:"DRONE_DELEGATE_MANAGER_ENDPOINT"` + Name string `envconfig:"DRONE_DELEGATE_NAME"` + } +} + +func FromEnviron() (Config, error) { + var config Config + err := envconfig.Process("", &config) + if err != nil { + return config, err + } + + return config, nil +} diff --git a/delegateshell/delegate/token.go b/delegateshell/delegate/token.go new file mode 100644 index 0000000..67324e6 --- /dev/null +++ b/delegateshell/delegate/token.go @@ -0,0 +1,42 @@ +package delegate + +import ( + "encoding/hex" + "time" + + "github.com/google/uuid" + "gopkg.in/square/go-jose.v2" + "gopkg.in/square/go-jose.v2/jwt" +) + +// Token generates a token with the given expiry to interact with the Harness manager +func Token(audience, issuer, subject, secret string, expiry time.Duration) (string, error) { + bytes, err := hex.DecodeString(secret) + if err != nil { + return "", err + } + + enc, err := jose.NewEncrypter( + jose.A128GCM, + jose.Recipient{Algorithm: jose.DIRECT, Key: bytes}, + (&jose.EncrypterOptions{}).WithType("JWT"), + ) + if err != nil { + return "", err + } + + cl := jwt.Claims{ + 
Subject: subject, + Issuer: issuer, + Audience: []string{audience}, + Expiry: jwt.NewNumericDate(time.Now().Add(expiry)), + IssuedAt: jwt.NewNumericDate(time.Now()), + ID: uuid.New().String(), + } + raw, err := jwt.Encrypted(enc).Claims(cl).CompactSerialize() + if err != nil { + return "", err + } + + return raw, nil +} diff --git a/delegateshell/delegate/token_cache.go b/delegateshell/delegate/token_cache.go new file mode 100644 index 0000000..610fe2c --- /dev/null +++ b/delegateshell/delegate/token_cache.go @@ -0,0 +1,52 @@ +package delegate + +import ( + "time" + + "github.com/patrickmn/go-cache" + "github.com/sirupsen/logrus" +) + +var ( + audience = "audience" + issuer = "issuer" + expirationTime = 20 * time.Minute +) + +type TokenCache struct { + id string + secret string + expiry time.Duration + c *cache.Cache +} + +// NewTokenCache creates a token cache which creates a new token +// after the expiry time is over +func NewTokenCache(id, secret string) *TokenCache { + // purge expired tokens from the cache at expirationTime/3 intervals + c := cache.New(cache.DefaultExpiration, expirationTime/3) + return &TokenCache{ + id: id, + secret: secret, + expiry: expirationTime, + c: c, + } +} + +// Get returns the value of the account token. +// If the token is cached, it returns from there. Otherwise +// it creates a new token with a new expiration time. 
+func (t *TokenCache) Get() (string, error) {
+	tv, found := t.c.Get(t.id)
+	if found {
+		return tv.(string), nil
+	}
+	logrus.WithField("id", t.id).Infoln("refreshing token")
+	token, err := Token(audience, issuer, t.id, t.secret, t.expiry)
+	if err != nil {
+		return "", err
+	}
+	// refresh token before the expiration time to give some buffer
+	t.c.Set(t.id, token, t.expiry/2)
+	return token, nil
+}
diff --git a/delegateshell/example/main.go b/delegateshell/example/main.go
new file mode 100644
index 0000000..13cca4c
--- /dev/null
+++ b/delegateshell/example/main.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/drone/go-task/delegateshell/client"
+	"github.com/drone/go-task/delegateshell/heartbeat"
+	"github.com/drone/go-task/delegateshell/poller"
+)
+
+func main() {
+	// Create a delegate client. The account ID and secret used here are sample
+	// values for a local manager; never commit real credentials.
+	managerClient := client.New("https://localhost:9090", "kmpySmUISimoRrJL6NL73w", "2f6b0988b6fb3370073c3d0505baee59", true, "")
+
+	// The keep-alive registers the runner with the manager and keeps sending heartbeats
+	keepAlive := heartbeat.New("kmpySmUISimoRrJL6NL73w", "2f6b0988b6fb3370073c3d0505baee59", "dlite-xingchi", []string{"k8s-runner"}, managerClient)
+
+	// Register the runner. Without the delegate ID returned here there is nothing to poll for.
+	ctx := context.Background()
+	info, err := keepAlive.Register(ctx)
+	if err != nil {
+		logrus.WithError(err).Fatal("could not register runner")
+	}
+
+	logrus.Info("Runner registered")
+
+	requestsChan := make(chan *client.RunnerRequest, 3)
+
+	// Consume the queued requests so that the poller threads never block on a full
+	// channel. A real runner would execute each request here.
+	go func() {
+		for req := range requestsChan {
+			logrus.WithField("task_id", req.TaskId).Info("received runner request")
+		}
+	}()
+
+	// Start polling for runner events. PollRunnerEvents blocks until ctx is done,
+	// so it runs in the background to let the HTTP server below start.
+	eventsServer := poller.New(managerClient, requestsChan)
+	// TODO: we don't need hb if we poll for task. Isn't it ? : )
+	go func() {
+		if err := eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10); err != nil {
+			logrus.WithError(err).Error("polling for runner events stopped")
+		}
+	}()
+
+	// Just to keep it running
+	http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintf(w, "%+v\n", *info)
+	})
+
+	logrus.Info("Starting server at port 8080\n")
+	if err := http.ListenAndServe(":8080", nil); err != nil {
+		logrus.Fatal(err)
+	}
+}
diff --git a/delegateshell/heartbeat/keepalive.go b/delegateshell/heartbeat/keepalive.go
new file mode 100644
index 0000000..a2e9540
--- /dev/null
+++ b/delegateshell/heartbeat/keepalive.go
@@ -0,0 +1,148 @@
+package heartbeat
+
+import (
+	"context"
+	"net"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/icrowley/fake"
+	"github.com/drone/go-task/delegateshell/client"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+var (
+	// Time period between sending heartbeats to the server
+	hearbeatInterval  = 10 * time.Second
+	heartbeatTimeout  = 15 * time.Second
+	taskEventsTimeout = 30 * time.Second
+)
+
+type FilterFn func(*client.TaskEvent) bool
+
+type KeepAlive struct {
+	AccountID     string
+	AccountSecret string
+	Name          string   // name of the runner
+	Tags          []string // list of tags that the runner accepts
+	Client        client.Client
+	Filter        FilterFn
+	// The Harness manager allows two task acquire calls with the same delegate ID to go through (by design).
+	// We need to make sure two different threads do not acquire the same task.
+	// This map makes sure Acquire() is called only once per task ID. The mapping is removed once the status
+	// for the task has been sent.
+ m sync.Map +} + +type DelegateInfo struct { + Host string + IP string + ID string + Name string +} + +func New(accountID, accountSecret, name string, tags []string, c client.Client) *KeepAlive { + return &KeepAlive{ + AccountID: accountID, + AccountSecret: accountSecret, + Tags: tags, + Name: name, + Client: c, + m: sync.Map{}, + } +} + +func (p *KeepAlive) SetFilter(filter FilterFn) { + p.Filter = filter +} + +// Register registers the runner with the server. The server generates a delegate ID +// which is returned to the client. +func (p *KeepAlive) Register(ctx context.Context) (*DelegateInfo, error) { + host, err := os.Hostname() + if err != nil { + return nil, errors.Wrap(err, "could not get host name") + } + host = "dlite-" + strings.ReplaceAll(host, " ", "-") + ip := getOutboundIP() + id, err := p.register(ctx, hearbeatInterval, ip, host) + if err != nil { + logrus.WithField("ip", ip).WithField("host", host).WithError(err).Error("could not register runner") + return nil, err + } + return &DelegateInfo{ + ID: id, + Host: host, + IP: ip, + Name: p.Name, + }, nil +} + +// Register registers the runner and runs a background thread which keeps pinging the server +// at a period of interval. It returns the delegate ID. +func (p *KeepAlive) register(ctx context.Context, interval time.Duration, ip, host string) (string, error) { + req := &client.RegisterRequest{ + AccountID: p.AccountID, + RunnerName: p.Name, + LastHeartbeat: time.Now().UnixMilli(), + //Token: p.AccountSecret, + NG: true, + Type: "DOCKER", + Polling: true, + HostName: host, + IP: ip, + // SupportedTaskTypes: p.Router.Routes(), // Ignore this because for new Runner tasks, this SupportedTaskTypes feature doesn't apply + Tags: p.Tags, + HeartbeatAsObject: true, + } + resp, err := p.Client.Register(ctx, req) + if err != nil { + return "", errors.Wrap(err, "could not register the runner") + } + req.ID = resp.Resource.DelegateID + logrus.WithField("id", req.ID).WithField("host", req.HostName). 
+ WithField("ip", req.IP).Info("registered delegate successfully") + p.heartbeat(ctx, req, interval) + return resp.Resource.DelegateID, nil +} + +// heartbeat starts a periodic thread in the background which continually pings the server +func (p *KeepAlive) heartbeat(ctx context.Context, req *client.RegisterRequest, interval time.Duration) { + go func() { + msgDelayTimer := time.NewTimer(interval) + defer msgDelayTimer.Stop() + for { + msgDelayTimer.Reset(interval) + select { + case <-ctx.Done(): + logrus.Error("context canceled") + return + case <-msgDelayTimer.C: + req.LastHeartbeat = time.Now().UnixMilli() + heartbeatCtx, cancelFn := context.WithTimeout(ctx, heartbeatTimeout) + err := p.Client.Heartbeat(heartbeatCtx, req) + if err != nil { + logrus.WithError(err).Errorf("could not send heartbeat") + } + cancelFn() + } + } + }() +} + +// Get preferred outbound ip of this machine. It returns a fake IP in case of errors. +func getOutboundIP() string { + conn, err := net.Dial("udp", "8.8.8.8:80") + if err != nil { + logrus.WithError(err).Error("could not figure out an IP, using a randomly generated IP") + return "fake-" + fake.IPv4() + } + defer conn.Close() + + localAddr := conn.LocalAddr().(*net.UDPAddr) + return localAddr.IP.String() +} diff --git a/delegateshell/heartbeat/writer.go b/delegateshell/heartbeat/writer.go new file mode 100644 index 0000000..15bc5d6 --- /dev/null +++ b/delegateshell/heartbeat/writer.go @@ -0,0 +1,29 @@ +package heartbeat + +import ( + "bytes" + "net/http" +) + +// response implements http.ResponseWriter +type response struct { + buf bytes.Buffer + header http.Header + status int +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) WriteHeader(statusCode int) { + r.status = statusCode +} + +func (r *response) Write(p []byte) (n int, err error) { + return r.buf.Write(p) +} + +func NewResponseWriter() *response { //nolint:revive + return &response{header: map[string][]string{}, buf: bytes.Buffer{}} 
+} diff --git a/delegateshell/logger/logger.go b/delegateshell/logger/logger.go new file mode 100644 index 0000000..0024885 --- /dev/null +++ b/delegateshell/logger/logger.go @@ -0,0 +1,53 @@ +package logger + +// A Logger represents an active logging object that generates +// lines of output to an io.Writer. +type Logger interface { + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Debugln(args ...interface{}) + + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Errorln(args ...interface{}) + + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Infoln(args ...interface{}) + + Trace(args ...interface{}) + Tracef(format string, args ...interface{}) + Traceln(args ...interface{}) + + Warn(args ...interface{}) + Warnf(format string, args ...interface{}) + Warnln(args ...interface{}) +} + +// Default returns the default logger. +var Default = Discard() + +// Discard returns a no-op logger +func Discard() Logger { + return &discard{} +} + +type discard struct{} + +func (*discard) Debug(args ...interface{}) {} +func (*discard) Debugf(format string, args ...interface{}) {} +func (*discard) Debugln(args ...interface{}) {} +func (*discard) Error(args ...interface{}) {} +func (*discard) Errorf(format string, args ...interface{}) {} +func (*discard) Errorln(args ...interface{}) {} +func (*discard) Info(args ...interface{}) {} +func (*discard) Infof(format string, args ...interface{}) {} +func (*discard) Infoln(args ...interface{}) {} +func (*discard) Trace(args ...interface{}) {} +func (*discard) Tracef(format string, args ...interface{}) {} +func (*discard) Traceln(args ...interface{}) {} +func (*discard) Warn(args ...interface{}) {} +func (*discard) Warnf(format string, args ...interface{}) {} +func (*discard) Warnln(args ...interface{}) {} +func (d *discard) WithError(error) Logger { return d } +func (d *discard) WithField(string, interface{}) Logger { return d } diff --git 
a/delegateshell/poller/events_poller.go b/delegateshell/poller/events_poller.go
new file mode 100644
index 0000000..7beea13
--- /dev/null
+++ b/delegateshell/poller/events_poller.go
@@ -0,0 +1,142 @@
+package poller
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/drone/go-task/delegateshell/client"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+)
+
+var (
+	taskEventsTimeout = 30 * time.Second
+)
+
+// FilterFn is the signature of the event filter stored by SetFilter.
+type FilterFn func(*client.RunnerEvent) bool
+
+// EventsServer polls the task server for runner events and publishes the
+// execution requests of every event on RequestsStream.
+// NOTE(review): Filter is stored by SetFilter but the poller does not consult it yet.
+type EventsServer struct {
+	Client         client.Client
+	RequestsStream chan<- *client.RunnerRequest
+	Filter         FilterFn
+	// The Harness manager allows two task acquire calls with the same delegate ID to go through (by design).
+	// We need to make sure two different threads do not acquire the same task.
+	// This map makes sure only one thread at a time fetches the payload of a task ID. The mapping is
+	// removed as soon as queueRunnerRequest returns.
+	m sync.Map
+}
+
+// New returns an EventsServer that talks to the task server through c and publishes requests on requestsChan.
+func New(c client.Client, requestsChan chan<- *client.RunnerRequest) *EventsServer {
+	return &EventsServer{
+		Client:         c,
+		RequestsStream: requestsChan,
+		m:              sync.Map{},
+	}
+}
+
+// SetFilter sets the event filter.
+func (p *EventsServer) SetFilter(filter FilterFn) {
+	p.Filter = filter
+}
+
+// PollRunnerEvents asks the task server for runner events every interval and
+// hands them to n threads, which fetch the execution payload of each event and
+// publish its requests on RequestsStream. It blocks until ctx is done and then
+// returns nil. It returns an error right away when n is smaller than 1.
+func (p *EventsServer) PollRunnerEvents(ctx context.Context, n int, id string, interval time.Duration) error {
+	if n < 1 {
+		return errors.Errorf("invalid number of threads: %d", n)
+	}
+	var wg sync.WaitGroup
+	events := make(chan *client.RunnerEvent, n)
+	// Task event poller
+	go func() {
+		pollTimer := time.NewTimer(interval)
+		defer pollTimer.Stop()
+		for {
+			pollTimer.Reset(interval)
+			select {
+			case <-ctx.Done():
+				logrus.Error("context canceled")
+				return
+			case <-pollTimer.C:
+				taskEventsCtx, cancelFn := context.WithTimeout(ctx, taskEventsTimeout)
+				tasks, err := p.Client.GetRunnerEvents(taskEventsCtx, id)
+				cancelFn()
+				if err != nil {
+					logrus.WithError(err).Errorf("could not query for task events")
+					continue
+				}
+				if tasks == nil {
+					continue
+				}
+				for _, e := range tasks.RunnerEvents {
+					if e == nil {
+						continue
+					}
+					// stop instead of blocking on a full channel once ctx is done
+					select {
+					case <-ctx.Done():
+						logrus.Error("context canceled")
+						return
+					case events <- e:
+					}
+				}
+			}
+		}
+	}()
+	// Task event processor. Start n threads to process events from the channel
+	for i := 0; i < n; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case task := <-events:
+					logrus.Info(*task)
+					if err := p.queueRunnerRequest(ctx, id, *task); err != nil {
+						logrus.WithError(err).WithField("task_id", task.TaskID).Errorf("[Thread %d]: delegate [%s] could not queue runner request", i, id)
+					}
+				}
+			}
+		}(i)
+	}
+	logrus.Infof("initialized %d threads successfully and starting polling for tasks", n)
+	wg.Wait()
+	return nil
+}
+
+// queueRunnerRequest fetches the execution payload of the event's task and
+// publishes every request in it on RequestsStream. It is a no-op when another
+// thread is already handling the same task ID.
+func (p *EventsServer) queueRunnerRequest(ctx context.Context, delegateID string, rv client.RunnerEvent) error {
+	taskID := rv.TaskID
+	if _, loaded := p.m.LoadOrStore(taskID, true); loaded {
+		return nil
+	}
+	defer p.m.Delete(taskID)
+	payloads, err := p.Client.GetExecutionPayload(ctx, delegateID, taskID)
+	if err != nil {
+		return errors.Wrap(err, "failed to get payload")
+	}
+	if payloads == nil {
+		return nil
+	}
+	for _, request := range payloads.Requests {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case p.RequestsStream <- request:
+		}
+	}
+	return nil
+}
diff --git a/go.mod b/go.mod index a87e9a5..2e58159 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,22 @@ module github.com/drone/go-task go 1.20 -require github.com/google/go-cmp v0.6.0 +require ( + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 + github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.3 + github.com/wings-software/dlite v1.0.0-rc.10 + google.golang.org/protobuf v1.33.0 + gopkg.in/square/go-jose.v2 v2.6.0 +) + +require ( + github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 // indirect + golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 // indirect + golang.org/x/sys 
v0.0.0-20220727055044-e65921a090b8 // indirect +) diff --git a/go.sum b/go.sum index 5a8d551..2ed802a 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,45 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 h1:9A+mfQmwzZ6KwUXPc8nHxFtKgn9VIvO3gXAOspIcE3s= +github.com/corpix/uarand v0.0.0-20170723150923-031be390f409/go.mod h1:JSm890tOkDN+M1jqN8pUGDKnzJrsVbJwSMHBY4zwz7M= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 h1:qU3v73XG4QAqCPHA4HOpfC1EfUvtLIDvQK4mNQ0LvgI= +github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2/go.mod h1:dQ6TM/OGAe+cMws81eTe4Btv1dKxfPZ2CX+YaAFAPN4= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q= +github.com/wings-software/dlite v1.0.0-rc.10 h1:epasWALCQSsJt9J8go7jJ4JeXVlqJ30Ch8X9pHjS9vc= +github.com/wings-software/dlite v1.0.0-rc.10/go.mod h1:zZd6iaMk8Av1QSABGuDWdxBFO82MxE0r6PRoDsLDvCE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 h1:ydJNl0ENAG67pFbB+9tfhiL2pYqLhfoaZFw/cjLhY4A= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220727055044-e65921a090b8 h1:dyU22nBWzrmTQxtNrr4dzVOvaw35nUYE279vF9UmsI8= +golang.org/x/sys v0.0.0-20220727055044-e65921a090b8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= 
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= +gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= From 9bbb1eea21bb045bbfd64ff139ebb42c44ac1293 Mon Sep 17 00:00:00 2001 From: "xingchi.jin" Date: Fri, 15 Mar 2024 12:02:10 -0700 Subject: [PATCH 2/3] some minor fixes abt readme --- delegateshell/README.md | 12 ++++++------ delegateshell/client/client.go | 1 - delegateshell/example/main.go | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/delegateshell/README.md b/delegateshell/README.md index e71480c..5f3002e 100644 --- a/delegateshell/README.md +++ b/delegateshell/README.md @@ -9,7 +9,7 @@ delegate-shell library provides two utlities. The idea is `delegate-shell` library should be a standalone library. Runner can use this library to handle all lifecycle events and interations with Harness. # Usage -Example codes in `delegateshell/example` folder. +Example codes in `delegateshell/example` folder. You can run it by `go run main.go` A way to use this client would be: 1. Registeration & heartbeat @@ -25,12 +25,12 @@ A way to use this client would be: ``` 2. 
Poll tasks ``` - requestsChan := make(chan *client.RunnerRequest, 3) + requestsChan := make(chan *client.RunnerRequest, 3) - // Start polling for events - eventsServer := poller.New(managerClient, requestsChan) - eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10) - // next: process requests from 'requestsChan' + // Start polling for events + eventsServer := poller.New(managerClient, requestsChan) + eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10) + // next: process requests from 'requestsChan' ``` ## TODOs diff --git a/delegateshell/client/client.go b/delegateshell/client/client.go index 1e18f93..706f9bc 100644 --- a/delegateshell/client/client.go +++ b/delegateshell/client/client.go @@ -11,7 +11,6 @@ type ( RegisterRequest struct { AccountID string `json:"accountId,omitempty"` RunnerName string `json:"delegateName,omitempty"` - // Token string `json:"delegateRandomToken,omitempty"` LastHeartbeat int64 `json:"lastHeartBeat,omitempty"` ID string `json:"delegateId,omitempty"` Type string `json:"delegateType,omitempty"` diff --git a/delegateshell/example/main.go b/delegateshell/example/main.go index 13cca4c..26169da 100644 --- a/delegateshell/example/main.go +++ b/delegateshell/example/main.go @@ -14,10 +14,10 @@ import ( func main() { // Create a delegate client - managerClient := client.New("https://localhost:9090", "kmpySmUISimoRrJL6NL73w", "2f6b0988b6fb3370073c3d0505baee59", true, "") + managerClient := client.New("https://localhost:9090", "kmpySmUISimoRrJL6NL73w" /* account id */, "use your token", true, "") // The poller needs a client that interacts with the task management system and a router to route the tasks - keepAlive := heartbeat.New("kmpySmUISimoRrJL6NL73w", "2f6b0988b6fb3370073c3d0505baee59", "dlite-xingchi", []string{"k8s-runner"}, managerClient) + keepAlive := heartbeat.New("kmpySmUISimoRrJL6NL73w"/* account id */, "use your token", "new-runner", []string{"k8s-runner"}, managerClient) // // Register the poller ctx := 
context.Background() From a4d9af2264bbe97dab9a505c3c30ed5256047314 Mon Sep 17 00:00:00 2001 From: "xingchi.jin" Date: Fri, 15 Mar 2024 12:44:12 -0700 Subject: [PATCH 3/3] minor fix to readme --- delegateshell/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/delegateshell/README.md b/delegateshell/README.md index 5f3002e..fa2fd5e 100644 --- a/delegateshell/README.md +++ b/delegateshell/README.md @@ -34,7 +34,7 @@ A way to use this client would be: ``` ## TODOs -1. It uses register call for heartbeat. Maybe we should use polling tasks as heartbeat. This reduces network payload on Harness gateway significantly +1. It uses register call for heartbeat. Maybe we should use polling tasks as heartbeat. This reduces network load on Harness gateway significantly 2. thread pool abstraction should be there for handle thread&resource allocation&isolation 3. Create a top level package for logger. Use that everywhere 4. Shutdownhook is not there