239 changes: 171 additions & 68 deletions logger.go
@@ -407,6 +407,8 @@ type logQueuer struct {
loggerTTL time.Duration
loggers map[string]agentLoggerLifecycle
logCache logCache

retries map[string]*retryState
Comment on lines 408 to +411 (Member):
I also know this wasn't you, but could you put a comment explaining what the key is? E.g. // Map of agent token to retry state

}
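
A minimal sketch of the field comment the reviewer asks for above, keeping the names from the diff (nothing new is introduced beyond the comment text):

```go
type logQueuer struct {
	// ... other fields from the diff elided ...

	// retries maps an agent token to its retry/backoff state, so failed
	// sends for one agent are retried independently of the others.
	retries map[string]*retryState
}
```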

func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +429,120 @@ func (l *logQueuer) work(ctx context.Context) {
}
}

func (l *logQueuer) newLogger(ctx context.Context, log agentLog, queuedLogs []agentsdk.Log) (agentLoggerLifecycle, error) {
client := agentsdk.New(l.coderURL)
client.SetSessionToken(log.agentToken)
logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)

_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
ID: sourceUUID,
Icon: "/icon/k8s.png",
DisplayName: "Kubernetes",
})
if err != nil {
// This shouldn't fail sending the log, as it only affects how they
// appear.
logger.Error(ctx, "post log source", slog.Error(err))
l.scheduleRetry(ctx, log.agentToken)
return agentLoggerLifecycle{}, err
Comment on lines +444 to +448 (Member):
Previously we would entirely ignore this error (other than logging it) but now we're forcing a retry. I agree that this way is better now that we have proper retries, but the comment is wrong now.

}

ls := agentsdk.NewLogSender(logger)
sl := ls.GetScriptLogger(sourceUUID)

gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

// connect to Agent v2.0 API, since we don't need features added later.
// This maximizes compatibility.
arpc, err := client.ConnectRPC20(gracefulCtx)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
l.scheduleRetry(ctx, log.agentToken)
Comment (Member):
You also call l.scheduleRetry in processLog on line 531 when this method fails. Probably should just have one or the other (my vote is for the processLog one)

return agentLoggerLifecycle{}, err
}
go func() {
err := ls.SendLoop(gracefulCtx, arpc)
// if the send loop exits on its own without the context
// canceling, timeout the logger and force it to recreate.
if err != nil && ctx.Err() == nil {
l.loggerTimeout(log.agentToken)
}
}()

closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
logger.Info(ctx, "logger timeout firing")
l.loggerTimeout(log.agentToken)
})
lifecycle := agentLoggerLifecycle{
scriptLogger: sl,
close: func() {
// We could be stopping for reasons other than the timeout. If
// so, stop the timer.
closeTimer.Stop()
defer gracefulCancel()
timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
defer timeout.Stop()
logger.Info(ctx, "logger closing")

if err := sl.Flush(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while flushing")
return
}

if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
}

_ = arpc.DRPCConn().Close()
client.SDK.HTTPClient.CloseIdleConnections()
Comment on lines +500 to +501 (Member):
I know this wasn't your code, but maybe these should be deferred at the top or something? They could be skipped if the flush fails for whatever reason (or a panic)

},
}
lifecycle.closeTimer = closeTimer
return lifecycle, nil
}
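
A rough sketch of the deferred-cleanup shape the reviewer suggests for the close func above; it assumes the same logger, ls, sl, arpc, client, closeTimer, and gracefulCancel values already in scope in newLogger, and only reorders the existing teardown calls so they still run if the flush returns early (or panics):

```go
close: func() {
	// We could be stopping for reasons other than the timeout. If
	// so, stop the timer.
	closeTimer.Stop()
	defer gracefulCancel()

	// Deferred up front so the connection teardown is not skipped when
	// the flush below returns early or panics (defers run LIFO, so the
	// DRPC conn closes before idle HTTP connections are released).
	defer client.SDK.HTTPClient.CloseIdleConnections()
	defer func() { _ = arpc.DRPCConn().Close() }()

	timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
	defer timeout.Stop()
	logger.Info(ctx, "logger closing")

	if err := sl.Flush(gracefulCtx); err != nil {
		logger.Warn(gracefulCtx, "timeout reached while flushing")
		return
	}

	if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
		logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
	}
},
```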

func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
l.mu.Lock()
defer l.mu.Unlock()
queuedLogs := l.logCache.push(log)

queuedLogs := l.logCache.get(log.agentToken)
if isAgentLogEmpty(log) {
if queuedLogs == nil {
return
}
} else {
queuedLogs = l.logCache.push(log)
}

lgr, ok := l.loggers[log.agentToken]
if !ok {
client := agentsdk.New(l.coderURL)
client.SetSessionToken(log.agentToken)
logger := l.logger.With(slog.F("resource_name", log.resourceName))
client.SDK.SetLogger(logger)

_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
ID: sourceUUID,
Icon: "/icon/k8s.png",
DisplayName: "Kubernetes",
})
if err != nil {
// This shouldn't fail sending the log, as it only affects how they
// appear.
logger.Error(ctx, "post log source", slog.Error(err))
// skip if we're in a retry cooldown window
if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
return
}

ls := agentsdk.NewLogSender(logger)
sl := ls.GetScriptLogger(sourceUUID)

gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

// connect to Agent v2.0 API, since we don't need features added later.
// This maximizes compatibility.
arpc, err := client.ConnectRPC20(gracefulCtx)
var err error
lgr, err = l.newLogger(ctx, log, queuedLogs)
if err != nil {
logger.Error(ctx, "drpc connect", slog.Error(err))
gracefulCancel()
l.scheduleRetry(ctx, log.agentToken)
return
}
go func() {
err := ls.SendLoop(gracefulCtx, arpc)
// if the send loop exits on its own without the context
// canceling, timeout the logger and force it to recreate.
if err != nil && ctx.Err() == nil {
l.loggerTimeout(log.agentToken)
}
}()

closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
logger.Info(ctx, "logger timeout firing")
l.loggerTimeout(log.agentToken)
})
lifecycle := agentLoggerLifecycle{
scriptLogger: sl,
close: func() {
// We could be stopping for reasons other than the timeout. If
// so, stop the timer.
closeTimer.Stop()
defer gracefulCancel()
timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
defer timeout.Stop()
logger.Info(ctx, "logger closing")

if err := sl.Flush(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while flushing")
return
}

if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
// ctx err
logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
}

_ = arpc.DRPCConn().Close()
client.SDK.HTTPClient.CloseIdleConnections()
},
}
lifecycle.closeTimer = closeTimer
l.loggers[log.agentToken] = lifecycle
lgr = lifecycle
l.loggers[log.agentToken] = lgr
}

lgr.resetCloseTimer(l.loggerTTL)
_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
if len(queuedLogs) == 0 {
return
}
if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
l.scheduleRetry(ctx, log.agentToken)
return
}
l.clearRetry(log.agentToken)
l.logCache.delete(log.agentToken)
}

Expand All @@ -518,6 +553,8 @@ func (l *logQueuer) processDelete(log agentLog) {
delete(l.loggers, log.agentToken)

}
l.clearRetry(log.agentToken)
l.logCache.delete(log.agentToken)
l.mu.Unlock()

if ok {
@@ -549,6 +586,60 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
}
}

// retryState tracks exponential backoff for an agent token.
type retryState struct {
delay time.Duration
timer *quartz.Timer
}

func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
if l.retries == nil {
l.retries = make(map[string]*retryState)
}

rs := l.retries[token]
if rs == nil {
rs = &retryState{delay: time.Second}
l.retries[token] = rs
}

if rs.timer != nil {
return
}

if rs.delay < time.Second {
rs.delay = time.Second
} else if rs.delay > 30*time.Second {
rs.delay = 30 * time.Second
}

l.logger.Info(ctx, "scheduling retry", slog.F("delay", rs.delay.String()))

rs.timer = l.clock.AfterFunc(rs.delay, func() {
l.mu.Lock()
if cur := l.retries[token]; cur != nil {
cur.timer = nil
}
l.mu.Unlock()

l.q <- agentLog{op: opLog, agentToken: token}
})

rs.delay *= 2
if rs.delay > 30*time.Second {
rs.delay = 30 * time.Second
}
}

func (l *logQueuer) clearRetry(token string) {
Comment (Member):
Should be called clearRetryLocked with a comment mentioning that the lock must be held

if rs := l.retries[token]; rs != nil {
if rs.timer != nil {
rs.timer.Stop()
}
delete(l.retries, token)
}
}
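
A small sketch of the rename the reviewer suggests above, assuming callers already hold l.mu (as processLog and processDelete do in this diff):

```go
// clearRetryLocked stops any pending retry timer and drops the backoff
// state for the given agent token. The caller must hold l.mu.
func (l *logQueuer) clearRetryLocked(token string) {
	if rs := l.retries[token]; rs != nil {
		if rs.timer != nil {
			rs.timer.Stop()
		}
		delete(l.retries, token)
	}
}
```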

func newColor(value ...color.Attribute) *color.Color {
c := color.New(value...)
c.EnableColor()
@@ -572,3 +663,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
func (l *logCache) delete(token string) {
delete(l.logs, token)
}

func (l *logCache) get(token string) []agentsdk.Log {
logs, ok := l.logs[token]
if !ok {
return nil
}
return logs
}

func isAgentLogEmpty(log agentLog) bool {
return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
}