diff --git a/delegateshell/README.md b/delegateshell/README.md new file mode 100644 index 0000000..fa2fd5e --- /dev/null +++ b/delegateshell/README.md @@ -0,0 +1,43 @@ +# About + +A golang client which can interact with and acquire tasks from the Harness manager (or any task management system which implements the same interface). + +The delegate-shell library provides two utilities: +1. Register the Runner with the Runner Manager and send heartbeats +2. Poll Runner task events + +The idea is that the `delegate-shell` library should be a standalone library. A Runner can use this library to handle all lifecycle events and interactions with Harness. + +# Usage +Example code is in the `delegateshell/example` folder. You can run it with `go run main.go`. + +A way to use this client would be: +1. Registration & heartbeat +``` + // Create a manager http client + managerClient := client.New(...) + + keepAlive := heartbeat.New(..., managerClient) + + // Register & heartbeat + ctx := context.Background() + info, _ := keepAlive.Register(ctx) +``` +2. Poll tasks +``` + requestsChan := make(chan *client.RunnerRequest, 3) + + // Start polling for events + eventsServer := poller.New(managerClient, requestsChan) + eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10) + // next: process requests from 'requestsChan' +``` + +## TODOs +1. It uses the register call for heartbeat. Maybe we should use task polling as the heartbeat; this would reduce network load on the Harness gateway significantly +2. A thread pool abstraction should be added to handle thread and resource allocation and isolation +3. Create a top level package for logger. Use that everywhere +4. A shutdown hook is not there +5. Rejecting tasks based on CPU/memory usage is not there +6. Add an error library for all places +7. 
Central Config: implement a global context where it has all the delegate configurations diff --git a/delegateshell/client/client.go b/delegateshell/client/client.go new file mode 100644 index 0000000..706f9bc --- /dev/null +++ b/delegateshell/client/client.go @@ -0,0 +1,126 @@ +package client + +import ( + "context" + "encoding/json" +) + +// TODO: Make the structs more generic and remove Harness specific stuff +type ( + // Taken from existing manager API + RegisterRequest struct { + AccountID string `json:"accountId,omitempty"` + RunnerName string `json:"delegateName,omitempty"` + LastHeartbeat int64 `json:"lastHeartBeat,omitempty"` + ID string `json:"delegateId,omitempty"` + Type string `json:"delegateType,omitempty"` + NG bool `json:"ng,omitempty"` + Polling bool `json:"pollingModeEnabled,omitempty"` // why Runner needs type ?? maybe should remove + HostName string `json:"hostName,omitempty"` + Connected bool `json:"connected,omitempty"` + KeepAlivePacket bool `json:"keepAlivePacket,omitempty"` + IP string `json:"ip,omitempty"` + Tags []string `json:"tags,omitempty"` + HeartbeatAsObject bool `json:"heartbeatAsObject,omitempty"` // TODO: legacy to remove + } + + // Used in the java codebase :'( + RegisterResponse struct { + Resource RegistrationData `json:"resource"` + } + + RegistrationData struct { + DelegateID string `json:"delegateId"` + } + + TaskEventsResponse struct { + TaskEvents []*TaskEvent `json:"delegateTaskEvents"` + } + + RunnerEvent struct { + AccountID string `json:"accountId"` + TaskID string `json:"taskId"` + RunnerType string `json:"runnerType"` + TaskType string `json:"taskType"` + } + + RunnerEventsResponse struct { + RunnerEvents []*RunnerEvent `json:"delegateRunnerEvents"` + } + + TaskEvent struct { + AccountID string `json:"accountId"` + TaskID string `json:"delegateTaskId"` + Sync bool `json:"sync"` + TaskType string `json:"taskType"` + } + + Task struct { + ID string `json:"id"` + Type string `json:"type"` + Data json.RawMessage 
`json:"data"` + Async bool `json:"async"` + Timeout int `json:"timeout"` + Logging LogInfo `json:"logging"` + DelegateInfo DelegateInfo `json:"delegate"` + Capabilities json.RawMessage `json:"capabilities"` + } + + LogInfo struct { + Token string `json:"token"` + Abstractions map[string]string `json:"abstractions"` + } + + DelegateInfo struct { + ID string `json:"id"` + InstanceID string `json:"instance_id"` + Token string `json:"token"` + } + + TaskResponse struct { + ID string `json:"id"` + Data json.RawMessage `json:"data"` + Type string `json:"type"` + Code string `json:"code"` // OK, FAILED, RETRY_ON_OTHER_DELEGATE + } + + DelegateCapacity struct { + MaxBuilds int `json:"maximumNumberOfBuilds"` + } + + RunnerAcquiredTasks struct { + Requests []*RunnerRequest `json:"requests"` + } + + RunnerRequest struct { + TaskId string `json:"id"` + AccountId string `json:"account_id"` + Task *RunnerTask `json:"task"` + Secrets []*RunnerTask `json:"secrets"` + } + + RunnerTask struct { + Type string `json:"type"` + Driver string `json:"driver"` + Data []byte `json:"data"` + Config []byte `json:"config"` + } +) + +// Client is an interface which defines methods on interacting with a task managing system. 
+type Client interface { + // Register registers the runner with the task server + Register(ctx context.Context, r *RegisterRequest) (*RegisterResponse, error) + + // Heartbeat pings the task server to let it know that the runner is still alive + Heartbeat(ctx context.Context, r *RegisterRequest) error + + // GetTaskEvents gets a list of pending tasks that need to be executed for this runner + GetRunnerEvents(ctx context.Context, delegateID string) (*RunnerEventsResponse, error) + + // Acquire tells the task server that the runner is ready to execute a task ID + GetExecutionPayload(ctx context.Context, delegateID, taskID string) (*RunnerAcquiredTasks, error) + + // SendStatus sends a response to the task server for a task ID + SendStatus(ctx context.Context, delegateID, taskID string, req *TaskResponse) error +} diff --git a/delegateshell/client/http.go b/delegateshell/client/http.go new file mode 100644 index 0000000..7a52709 --- /dev/null +++ b/delegateshell/client/http.go @@ -0,0 +1,369 @@ +package client + +import ( + "bytes" + "context" + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "time" + + pb "google.golang.org/protobuf/proto" + + "github.com/cenkalti/backoff/v4" + "github.com/drone/go-task/delegateshell/delegate" + "github.com/sirupsen/logrus" + + "github.com/wings-software/dlite/logger" +) + +const ( + registerEndpoint = "/api/agent/delegates/register?accountId=%s" + heartbeatEndpoint = "/api/agent/delegates/heartbeat-with-polling?accountId=%s" + taskStatusEndpoint = "/api/agent/v2/tasks/%s/delegates/%s?accountId=%s" + runnerEventsPollEndpoint = "/api/agent/delegates/%s/runner-events?accountId=%s" + executionPayloadEndpoint = "/api/executions/%s/payload?delegateId=%s&accountId=%s&delegateInstanceId=%s" +) + +var ( + registerTimeout = 30 * time.Second + taskEventsTimeout = 60 * time.Second + sendStatusRetryTimes = 5 +) + +// defaultClient is the default http.Client. 
+var defaultClient = &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, +} + +// New returns a new client. +func New(endpoint, id, secret string, skipverify bool, additionalCertsDir string) *HTTPClient { + return getClient(endpoint, id, "", delegate.NewTokenCache(id, secret), skipverify, additionalCertsDir) +} + +func NewFromToken(endpoint, id, token string, skipverify bool, additionalCertsDir string) *HTTPClient { + return getClient(endpoint, id, token, nil, skipverify, additionalCertsDir) +} + +func getClient(endpoint, id, token string, cache *delegate.TokenCache, skipverify bool, additionalCertsDir string) *HTTPClient { + log := logrus.New() + c := &HTTPClient{ + Logger: log, + Endpoint: endpoint, + SkipVerify: skipverify, + AccountID: id, + Client: defaultClient, + AccountTokenCache: cache, + Token: token, + } + if skipverify { + c.Client = &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: skipverify, //nolint:gosec + }, + }, + } + } else if additionalCertsDir != "" { + // If additional certs are specified, we append them to the existing cert chain + + // Use the system certs if possible + rootCAs, _ := x509.SystemCertPool() + if rootCAs == nil { + rootCAs = x509.NewCertPool() + } + + log.Infof("additional certs dir to allow: %s\n", additionalCertsDir) + + files, err := os.ReadDir(additionalCertsDir) + if err != nil { + log.Errorf("could not read directory %s, error: %s", additionalCertsDir, err) + c.Client = clientWithRootCAs(skipverify, rootCAs) + return c + } + + // Go through all certs in this directory and add them to the global certs + for _, f := range files { + path := filepath.Join(additionalCertsDir, f.Name()) + log.Infof("trying to add certs at: %s to root certs\n", path) + // Create TLS config using cert 
PEM + rootPem, err := os.ReadFile(path) + if err != nil { + log.Errorf("could not read certificate file (%s), error: %s", path, err.Error()) + continue + } + // Append certs to the global certs + ok := rootCAs.AppendCertsFromPEM(rootPem) + if !ok { + log.Errorf("error adding cert (%s) to pool, please check format of the certs provided.", path) + continue + } + log.Infof("successfully added cert at: %s to root certs", path) + } + c.Client = clientWithRootCAs(skipverify, rootCAs) + } + return c +} + +func clientWithRootCAs(skipverify bool, rootCAs *x509.CertPool) *http.Client { + // Create the HTTP Client with certs + config := &tls.Config{ + //nolint:gosec + InsecureSkipVerify: skipverify, + } + if rootCAs != nil { + config.RootCAs = rootCAs + } + return &http.Client{ + CheckRedirect: func(*http.Request, []*http.Request) error { + return http.ErrUseLastResponse + }, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: config, + }, + } +} + +// An HTTPClient manages communication with the runner API. 
+type HTTPClient struct { + Client *http.Client + Logger logger.Logger + Endpoint string + AccountID string + AccountTokenCache *delegate.TokenCache + SkipVerify bool + Token string +} + +// Register registers the runner with the manager +func (p *HTTPClient) Register(ctx context.Context, r *RegisterRequest) (*RegisterResponse, error) { + req := r + resp := &RegisterResponse{} + path := fmt.Sprintf(registerEndpoint, p.AccountID) + _, err := p.retry(ctx, path, "POST", req, resp, createBackoff(ctx, registerTimeout), true) //nolint: bodyclose + return resp, err +} + +// Heartbeat sends a periodic heartbeat to the server +func (p *HTTPClient) Heartbeat(ctx context.Context, r *RegisterRequest) error { + req := r + path := fmt.Sprintf(heartbeatEndpoint, p.AccountID) + _, err := p.doJson(ctx, path, "POST", req, nil) + return err +} + +// GetRunnerEvents gets a list of events which can be executed on this runner +func (p *HTTPClient) GetRunnerEvents(ctx context.Context, id string) (*RunnerEventsResponse, error) { + path := fmt.Sprintf(runnerEventsPollEndpoint, id, p.AccountID) + events := &RunnerEventsResponse{} + _, err := p.doJson(ctx, path, "GET", nil, events) + return events, err +} + +// Acquire tries to acquire a specific task +func (p *HTTPClient) GetExecutionPayload(ctx context.Context, delegateID, taskID string) (*RunnerAcquiredTasks, error) { + path := fmt.Sprintf(executionPayloadEndpoint, taskID, delegateID, p.AccountID, delegateID) + payload := &RunnerAcquiredTasks{} + _, err := p.doJson(ctx, path, "GET", nil, payload) + if err != nil { + logrus.WithError(err).Error("Error making http call") + } + return payload, err +} + +// SendStatus updates the status of a task +func (p *HTTPClient) SendStatus(ctx context.Context, delegateID, taskID string, r *TaskResponse) error { + path := fmt.Sprintf(taskStatusEndpoint, taskID, delegateID, p.AccountID) + logrus.Info("response: ", path) + req := r + retryNumber := 0 + var err error + for retryNumber < sendStatusRetryTimes 
{ + _, err = p.retry(ctx, path, "POST", req, nil, createBackoff(ctx, taskEventsTimeout), true) //nolint: bodyclose + if err == nil { + return nil + } + retryNumber++ + } + return err +} + +func (p *HTTPClient) retry(ctx context.Context, path, method string, in, out interface{}, b backoff.BackOffContext, ignoreStatusCode bool) (*http.Response, error) { //nolint: unparam + for { + res, err := p.doJson(ctx, path, method, in, out) + // do not retry on Canceled or DeadlineExceeded + if ctxErr := ctx.Err(); ctxErr != nil { + p.logger().Errorf("http: context canceled") + return res, ctxErr + } + + duration := b.NextBackOff() + + if res != nil { + // Check the response code. We retry on 500-range + // responses to allow the server time to recover, as + // 500's are typically not permanent errors and may + // relate to outages on the server side. + if (ignoreStatusCode && err != nil) || res.StatusCode > 501 { + p.logger().Errorf("http: server error: re-connect and re-try: %s", err) + if duration == backoff.Stop { + p.logger().Errorf("max retry limit reached, task status won't be updated") + return nil, err + } + time.Sleep(duration) + continue + } + } else if err != nil { + p.logger().Errorf("http: request error: %s", err) + if duration == backoff.Stop { + p.logger().Errorf("max retry limit reached, task status won't be updated") + return nil, err + } + time.Sleep(duration) + continue + } + return res, err + } +} + +func (p *HTTPClient) doJson(ctx context.Context, path, method string, in, out interface{}) (*http.Response, error) { + var buf = &bytes.Buffer{} + // marshal the input payload into json format and copy + // to an io.ReadCloser. 
+ if in != nil { + if err := json.NewEncoder(buf).Encode(in); err != nil { + p.logger().Errorf("could not encode input payload: %s", err) + } + } + res, body, err := p.do(ctx, path, method, buf) + if err != nil { + return res, err + } + if nil == out { + return res, nil + } + if jsonErr := json.Unmarshal(body, out); jsonErr != nil { + return res, jsonErr + } + + return res, nil +} + +func (p *HTTPClient) doProto(ctx context.Context, path, method string, in pb.Message, out pb.Message) (*http.Response, error) { + // marshal the input payload into proto format and copy + // to an io.ReadCloser. + input, err := pb.Marshal(in) + if err != nil { + return nil, err + } + buf := bytes.NewBuffer(input) + res, body, err := p.do(ctx, path, method, buf) + if err != nil { + return res, err + } + if nil == out { + return res, nil + } + if protoErr := pb.Unmarshal(body, out); protoErr != nil { + return res, protoErr + } + + return res, nil +} + +// do is a helper function that posts a signed http request with +// the input encoded and response decoded from json. +func (p *HTTPClient) do(ctx context.Context, path, method string, in *bytes.Buffer) (*http.Response, []byte, error) { + endpoint := p.Endpoint + path + req, err := http.NewRequest(method, endpoint, in) + if err != nil { + return nil, nil, err + } + req = req.WithContext(ctx) + + // the request should include the secret shared between + // the agent and server for authorization. + token := "" + if p.Token != "" { + token = p.Token + } else { + token, err = p.AccountTokenCache.Get() + if err != nil { + p.logger().Errorf("could not generate account token: %s", err) + return nil, nil, err + } + } + req.Header.Add("Authorization", "Delegate "+token) + req.Header.Add("Content-Type", "application/json") + res, err := p.Client.Do(req) + if res != nil { + defer func() { + // drain the response body so we can reuse + // this connection. 
+ if _, err = io.Copy(io.Discard, io.LimitReader(res.Body, 4096)); err != nil { + p.logger().Errorf("could not drain response body: %s", err) + } + res.Body.Close() + }() + } + if err != nil { + return res, nil, err + } + + // if the response body return no content we exit + // immediately. We do not read or unmarshal the response + // and we do not return an error. + if res.StatusCode == 204 { + return res, nil, nil + } + + // else read the response body into a byte slice. + body, err := io.ReadAll(res.Body) + if err != nil { + return res, nil, err + } + + if res.StatusCode > 299 { + // if the response body includes an error message + // we should return the error string. + if len(body) != 0 { + return res, body, errors.New( + string(body), + ) + } + // if the response body is empty we should return + // the default status code text. + return res, body, errors.New( + http.StatusText(res.StatusCode), + ) + } + return res, body, nil +} + +// logger is a helper function that returns the default logger +// if a custom logger is not defined. 
+func (p *HTTPClient) logger() logger.Logger { + if p.Logger == nil { + return logger.Discard() + } + return p.Logger +} + +func createBackoff(ctx context.Context, maxElapsedTime time.Duration) backoff.BackOffContext { + exp := backoff.NewExponentialBackOff() + exp.MaxElapsedTime = maxElapsedTime + return backoff.WithContext(exp, ctx) +} diff --git a/delegateshell/delegate/config.go b/delegateshell/delegate/config.go new file mode 100644 index 0000000..7fa429d --- /dev/null +++ b/delegateshell/delegate/config.go @@ -0,0 +1,28 @@ +package delegate + +import ( + "github.com/kelseyhightower/envconfig" +) + +// Sample config +type Config struct { + Debug bool `envconfig:"DRONE_DEBUG"` + Trace bool `envconfig:"DRONE_TRACE"` + + Delegate struct { + AccountID string `envconfig:"DRONE_DELEGATE_ACCOUNT_ID"` + AccountSecret string `envconfig:"DRONE_DELEGATE_ACCOUNT_SECRET"` + ManagerEndpoint string `envconfig:"DRONE_DELEGATE_MANAGER_ENDPOINT"` + Name string `envconfig:"DRONE_DELEGATE_NAME"` + } +} + +func FromEnviron() (Config, error) { + var config Config + err := envconfig.Process("", &config) + if err != nil { + return config, err + } + + return config, nil +} diff --git a/delegateshell/delegate/token.go b/delegateshell/delegate/token.go new file mode 100644 index 0000000..67324e6 --- /dev/null +++ b/delegateshell/delegate/token.go @@ -0,0 +1,42 @@ +package delegate + +import ( + "encoding/hex" + "time" + + "github.com/google/uuid" + "gopkg.in/square/go-jose.v2" + "gopkg.in/square/go-jose.v2/jwt" +) + +// Token generates a token with the given expiry to interact with the Harness manager +func Token(audience, issuer, subject, secret string, expiry time.Duration) (string, error) { + bytes, err := hex.DecodeString(secret) + if err != nil { + return "", err + } + + enc, err := jose.NewEncrypter( + jose.A128GCM, + jose.Recipient{Algorithm: jose.DIRECT, Key: bytes}, + (&jose.EncrypterOptions{}).WithType("JWT"), + ) + if err != nil { + return "", err + } + + cl := jwt.Claims{ + 
Subject: subject, + Issuer: issuer, + Audience: []string{audience}, + Expiry: jwt.NewNumericDate(time.Now().Add(expiry)), + IssuedAt: jwt.NewNumericDate(time.Now()), + ID: uuid.New().String(), + } + raw, err := jwt.Encrypted(enc).Claims(cl).CompactSerialize() + if err != nil { + return "", err + } + + return raw, nil +} diff --git a/delegateshell/delegate/token_cache.go b/delegateshell/delegate/token_cache.go new file mode 100644 index 0000000..610fe2c --- /dev/null +++ b/delegateshell/delegate/token_cache.go @@ -0,0 +1,52 @@ +package delegate + +import ( + "time" + + "github.com/patrickmn/go-cache" + "github.com/sirupsen/logrus" +) + +var ( + audience = "audience" + issuer = "issuer" + expirationTime = 20 * time.Minute +) + +type TokenCache struct { + id string + secret string + expiry time.Duration + c *cache.Cache +} + +// NewTokenCache creates a token cache which creates a new token +// after the expiry time is over +func NewTokenCache(id, secret string) *TokenCache { + // purge expired tokens from the cache at expirationTime/3 intervals + c := cache.New(cache.DefaultExpiration, expirationTime/3) + return &TokenCache{ + id: id, + secret: secret, + expiry: expirationTime, + c: c, + } +} + +// Get returns the value of the account token. +// If the token is cached, it returns from there. Otherwise +// it creates a new token with a new expiration time. 
+func (t *TokenCache) Get() (string, error) { + tv, found := t.c.Get(t.id) + if found { + return tv.(string), nil + } + logrus.WithField("id", t.id).Infoln("refreshing token") + token, err := Token(audience, issuer, t.id, t.secret, t.expiry) + if err != nil { + return "", err + } + // refresh token before the expiration time to give some buffer + t.c.Set(t.id, token, t.expiry/2) + return token, nil +} diff --git a/delegateshell/example/main.go b/delegateshell/example/main.go new file mode 100644 index 0000000..26169da --- /dev/null +++ b/delegateshell/example/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "context" + "net/http" + "time" + + "github.com/sirupsen/logrus" + + "github.com/drone/go-task/delegateshell/client" + "github.com/drone/go-task/delegateshell/heartbeat" + "github.com/drone/go-task/delegateshell/poller" +) + +func main() { + // Create a delegate client + managerClient := client.New("https://localhost:9090", "kmpySmUISimoRrJL6NL73w" /* account id */, "use your token", true, "") + + // The poller needs a client that interacts with the task management system and a router to route the tasks + keepAlive := heartbeat.New("kmpySmUISimoRrJL6NL73w"/* account id */, "use your token", "new-runner", []string{"k8s-runner"}, managerClient) + + // // Register the poller + ctx := context.Background() + info, _ := keepAlive.Register(ctx) + + logrus.Info("Runner registered") + + requestsChan := make(chan *client.RunnerRequest, 3) + + // Start polling for bijou events + eventsServer := poller.New(managerClient, requestsChan) + // TODO: we don't need hb if we poll for task. Isn't it ? 
: ) + eventsServer.PollRunnerEvents(ctx, 3, info.ID, time.Second*10) + + // Just to keep it running + http.HandleFunc("/hello", func(w http.ResponseWriter, r *http.Request) { + logrus.Info(w, *info) + }) + + logrus.Info("Starting server at port 8080\n") + if err := http.ListenAndServe(":8080", nil); err != nil { + logrus.Fatal(err) + } +} diff --git a/delegateshell/heartbeat/keepalive.go b/delegateshell/heartbeat/keepalive.go new file mode 100644 index 0000000..a2e9540 --- /dev/null +++ b/delegateshell/heartbeat/keepalive.go @@ -0,0 +1,148 @@ +package heartbeat + +import ( + "context" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/icrowley/fake" + "github.com/drone/go-task/delegateshell/client" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + // Time period between sending heartbeats to the server + hearbeatInterval = 10 * time.Second + heartbeatTimeout = 15 * time.Second + taskEventsTimeout = 30 * time.Second +) + +type FilterFn func(*client.TaskEvent) bool + +type KeepAlive struct { + AccountID string + AccountSecret string + Name string // name of the runner + Tags []string // list of tags that the runner accepts + Client client.Client + Filter FilterFn + // The Harness manager allows two task acquire calls with the same delegate ID to go through (by design). + // We need to make sure two different threads do not acquire the same task. + // This map makes sure Acquire() is called only once per task ID. The mapping is removed once the status + // for the task has been sent. 
+ m sync.Map +} + +type DelegateInfo struct { + Host string + IP string + ID string + Name string +} + +func New(accountID, accountSecret, name string, tags []string, c client.Client) *KeepAlive { + return &KeepAlive{ + AccountID: accountID, + AccountSecret: accountSecret, + Tags: tags, + Name: name, + Client: c, + m: sync.Map{}, + } +} + +func (p *KeepAlive) SetFilter(filter FilterFn) { + p.Filter = filter +} + +// Register registers the runner with the server. The server generates a delegate ID +// which is returned to the client. +func (p *KeepAlive) Register(ctx context.Context) (*DelegateInfo, error) { + host, err := os.Hostname() + if err != nil { + return nil, errors.Wrap(err, "could not get host name") + } + host = "dlite-" + strings.ReplaceAll(host, " ", "-") + ip := getOutboundIP() + id, err := p.register(ctx, hearbeatInterval, ip, host) + if err != nil { + logrus.WithField("ip", ip).WithField("host", host).WithError(err).Error("could not register runner") + return nil, err + } + return &DelegateInfo{ + ID: id, + Host: host, + IP: ip, + Name: p.Name, + }, nil +} + +// Register registers the runner and runs a background thread which keeps pinging the server +// at a period of interval. It returns the delegate ID. +func (p *KeepAlive) register(ctx context.Context, interval time.Duration, ip, host string) (string, error) { + req := &client.RegisterRequest{ + AccountID: p.AccountID, + RunnerName: p.Name, + LastHeartbeat: time.Now().UnixMilli(), + //Token: p.AccountSecret, + NG: true, + Type: "DOCKER", + Polling: true, + HostName: host, + IP: ip, + // SupportedTaskTypes: p.Router.Routes(), // Ignore this because for new Runner tasks, this SupportedTaskTypes feature doesn't apply + Tags: p.Tags, + HeartbeatAsObject: true, + } + resp, err := p.Client.Register(ctx, req) + if err != nil { + return "", errors.Wrap(err, "could not register the runner") + } + req.ID = resp.Resource.DelegateID + logrus.WithField("id", req.ID).WithField("host", req.HostName). 
+ WithField("ip", req.IP).Info("registered delegate successfully") + p.heartbeat(ctx, req, interval) + return resp.Resource.DelegateID, nil +} + +// heartbeat starts a periodic thread in the background which continually pings the server +func (p *KeepAlive) heartbeat(ctx context.Context, req *client.RegisterRequest, interval time.Duration) { + go func() { + msgDelayTimer := time.NewTimer(interval) + defer msgDelayTimer.Stop() + for { + msgDelayTimer.Reset(interval) + select { + case <-ctx.Done(): + logrus.Error("context canceled") + return + case <-msgDelayTimer.C: + req.LastHeartbeat = time.Now().UnixMilli() + heartbeatCtx, cancelFn := context.WithTimeout(ctx, heartbeatTimeout) + err := p.Client.Heartbeat(heartbeatCtx, req) + if err != nil { + logrus.WithError(err).Errorf("could not send heartbeat") + } + cancelFn() + } + } + }() +} + +// Get preferred outbound ip of this machine. It returns a fake IP in case of errors. +func getOutboundIP() string { + conn, err := net.Dial("udp", "8.8.8.8:80") + if err != nil { + logrus.WithError(err).Error("could not figure out an IP, using a randomly generated IP") + return "fake-" + fake.IPv4() + } + defer conn.Close() + + localAddr := conn.LocalAddr().(*net.UDPAddr) + return localAddr.IP.String() +} diff --git a/delegateshell/heartbeat/writer.go b/delegateshell/heartbeat/writer.go new file mode 100644 index 0000000..15bc5d6 --- /dev/null +++ b/delegateshell/heartbeat/writer.go @@ -0,0 +1,29 @@ +package heartbeat + +import ( + "bytes" + "net/http" +) + +// response implements http.ResponseWriter +type response struct { + buf bytes.Buffer + header http.Header + status int +} + +func (r *response) Header() http.Header { + return r.header +} + +func (r *response) WriteHeader(statusCode int) { + r.status = statusCode +} + +func (r *response) Write(p []byte) (n int, err error) { + return r.buf.Write(p) +} + +func NewResponseWriter() *response { //nolint:revive + return &response{header: map[string][]string{}, buf: bytes.Buffer{}} 
+} diff --git a/delegateshell/logger/logger.go b/delegateshell/logger/logger.go new file mode 100644 index 0000000..0024885 --- /dev/null +++ b/delegateshell/logger/logger.go @@ -0,0 +1,53 @@ +package logger + +// A Logger represents an active logging object that generates +// lines of output to an io.Writer. +type Logger interface { + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Debugln(args ...interface{}) + + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Errorln(args ...interface{}) + + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Infoln(args ...interface{}) + + Trace(args ...interface{}) + Tracef(format string, args ...interface{}) + Traceln(args ...interface{}) + + Warn(args ...interface{}) + Warnf(format string, args ...interface{}) + Warnln(args ...interface{}) +} + +// Default returns the default logger. +var Default = Discard() + +// Discard returns a no-op logger +func Discard() Logger { + return &discard{} +} + +type discard struct{} + +func (*discard) Debug(args ...interface{}) {} +func (*discard) Debugf(format string, args ...interface{}) {} +func (*discard) Debugln(args ...interface{}) {} +func (*discard) Error(args ...interface{}) {} +func (*discard) Errorf(format string, args ...interface{}) {} +func (*discard) Errorln(args ...interface{}) {} +func (*discard) Info(args ...interface{}) {} +func (*discard) Infof(format string, args ...interface{}) {} +func (*discard) Infoln(args ...interface{}) {} +func (*discard) Trace(args ...interface{}) {} +func (*discard) Tracef(format string, args ...interface{}) {} +func (*discard) Traceln(args ...interface{}) {} +func (*discard) Warn(args ...interface{}) {} +func (*discard) Warnf(format string, args ...interface{}) {} +func (*discard) Warnln(args ...interface{}) {} +func (d *discard) WithError(error) Logger { return d } +func (d *discard) WithField(string, interface{}) Logger { return d } diff --git 
a/delegateshell/poller/events_poller.go b/delegateshell/poller/events_poller.go new file mode 100644 index 0000000..7beea13 --- /dev/null +++ b/delegateshell/poller/events_poller.go @@ -0,0 +1,111 @@ +package poller + +import ( + "context" + "sync" + "time" + + "github.com/drone/go-task/delegateshell/client" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + taskEventsTimeout = 30 * time.Second +) + +type FilterFn func(*client.RunnerEvent) bool + +type EventsServer struct { + Client client.Client + RequestsStream chan<- *client.RunnerRequest + Filter FilterFn + // The Harness manager allows two task acquire calls with the same delegate ID to go through (by design). + // We need to make sure two different threads do not acquire the same task. + // This map makes sure Acquire() is called only once per task ID. The mapping is removed once the status + // for the task has been sent. + m sync.Map +} + +func New(c client.Client, requestsChan chan<- *client.RunnerRequest) *EventsServer { + return &EventsServer{ + Client: c, + RequestsStream: requestsChan, + m: sync.Map{}, + } +} + +func (p *EventsServer) SetFilter(filter FilterFn) { + p.Filter = filter +} + +// Poll continually asks the task server for tasks to execute. +func (p *EventsServer) PollRunnerEvents(ctx context.Context, n int, id string, interval time.Duration) error { + var wg sync.WaitGroup + events := make(chan *client.RunnerEvent, n) + // Task event poller + go func() { + pollTimer := time.NewTimer(interval) + for { + pollTimer.Reset(interval) + select { + case <-ctx.Done(): + logrus.Error("context canceled") + return + case <-pollTimer.C: + taskEventsCtx, cancelFn := context.WithTimeout(ctx, taskEventsTimeout) + tasks, err := p.Client.GetRunnerEvents(taskEventsCtx, id) + if err != nil { + logrus.WithError(err).Errorf("could not query for task events") + } + cancelFn() + + for _, e := range tasks.RunnerEvents { + events <- e + } + } + } + }() + // Task event processor. 
Start n threads to process events from the channel + for i := 0; i < n; i++ { + wg.Add(1) + go func(i int) { + for { + select { + case <-ctx.Done(): + wg.Done() + return + case task := <-events: + logrus.Info(*task) + err := p.queueRunnerRequest(ctx, id, *task) + if err != nil { + logrus.WithError(err).WithField("task_id", task.TaskID).Errorf("[Thread %d]: delegate [%s] could not queue runner request", i, id) + } + } + } + }(i) + } + logrus.Infof("initialized %d threads successfully and starting polling for tasks", n) + wg.Wait() + return nil +} + +// execute tries to acquire the task and executes the handler for it +func (p *EventsServer) queueRunnerRequest(ctx context.Context, delegateID string, rv client.RunnerEvent) error { + taskID := rv.TaskID + if _, loaded := p.m.LoadOrStore(taskID, true); loaded { + return nil + } + defer p.m.Delete(taskID) + payloads, err := p.Client.GetExecutionPayload(ctx, delegateID, taskID) + logrus.Info("hey here") + logrus.Info(payloads.Requests) + if err != nil { + return errors.Wrap(err, "failed to get payload") + } + for _, request := range payloads.Requests { + p.RequestsStream <- request + } + return nil +} diff --git a/go.mod b/go.mod index a87e9a5..2e58159 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,22 @@ module github.com/drone/go-task go 1.20 -require github.com/google/go-cmp v0.6.0 +require ( + github.com/cenkalti/backoff/v4 v4.2.1 + github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 + github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 + github.com/kelseyhightower/envconfig v1.4.0 + github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.3 + github.com/wings-software/dlite v1.0.0-rc.10 + google.golang.org/protobuf v1.33.0 + gopkg.in/square/go-jose.v2 v2.6.0 +) + +require ( + github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 // indirect + golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 // indirect + golang.org/x/sys 
v0.0.0-20220727055044-e65921a090b8 // indirect +) diff --git a/go.sum b/go.sum index 5a8d551..2ed802a 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,45 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/corpix/uarand v0.0.0-20170723150923-031be390f409 h1:9A+mfQmwzZ6KwUXPc8nHxFtKgn9VIvO3gXAOspIcE3s= +github.com/corpix/uarand v0.0.0-20170723150923-031be390f409/go.mod h1:JSm890tOkDN+M1jqN8pUGDKnzJrsVbJwSMHBY4zwz7M= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2 h1:qU3v73XG4QAqCPHA4HOpfC1EfUvtLIDvQK4mNQ0LvgI= +github.com/icrowley/fake v0.0.0-20221112152111-d7b7e2276db2/go.mod h1:dQ6TM/OGAe+cMws81eTe4Btv1dKxfPZ2CX+YaAFAPN4= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= +github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q= +github.com/wings-software/dlite v1.0.0-rc.10 h1:epasWALCQSsJt9J8go7jJ4JeXVlqJ30Ch8X9pHjS9vc= +github.com/wings-software/dlite v1.0.0-rc.10/go.mod h1:zZd6iaMk8Av1QSABGuDWdxBFO82MxE0r6PRoDsLDvCE= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4 h1:ydJNl0ENAG67pFbB+9tfhiL2pYqLhfoaZFw/cjLhY4A= +golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220727055044-e65921a090b8 h1:dyU22nBWzrmTQxtNrr4dzVOvaw35nUYE279vF9UmsI8= +golang.org/x/sys v0.0.0-20220727055044-e65921a090b8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= 
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI= +gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=