Skip to content

fix: forward logs directly when invocation is over #613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
c.logger.Debugf("Received something from '%s' without APMData", data.AgentInfo)
continue
}
if err := c.forwardAgentData(ctx, data); err != nil {
if err := c.ForwardAgentData(ctx, data); err != nil {
return err
}
if lambdaDataChan == nil {
Expand All @@ -73,7 +73,7 @@ func (c *Client) ForwardApmData(ctx context.Context) error {
c.logger.Debug("Assigned Lambda data channel")
}
case data := <-lambdaDataChan:
if err := c.forwardLambdaData(ctx, data); err != nil {
if err := c.ForwardLambdaData(ctx, data); err != nil {
return err
}
}
Expand All @@ -91,7 +91,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
// Flush agent data first to make sure metadata is available if possible
for i := len(c.AgentDataChannel); i > 0; i-- {
data := <-c.AgentDataChannel
if err := c.forwardAgentData(ctx, data); err != nil {
if err := c.ForwardAgentData(ctx, data); err != nil {
c.logger.Errorf("Error sending to APM Server, skipping: %v", err)
}
}
Expand All @@ -107,7 +107,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
for {
select {
case apmData := <-c.LambdaDataChannel:
if err := c.forwardLambdaData(ctx, apmData); err != nil {
if err := c.ForwardLambdaData(ctx, apmData); err != nil {
c.logger.Errorf("Error sending to APM server, skipping: %v", err)
}
case <-ctx.Done():
Expand Down Expand Up @@ -349,7 +349,7 @@ func (c *Client) WaitForFlush() <-chan struct{} {
return c.flushCh
}

func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMData) error {
func (c *Client) ForwardAgentData(ctx context.Context, apmData accumulator.APMData) error {
if err := c.batch.AddAgentData(apmData); err != nil {
c.logger.Warnf("Dropping agent data due to error: %v", err)
}
Expand All @@ -359,7 +359,7 @@ func (c *Client) forwardAgentData(ctx context.Context, apmData accumulator.APMDa
return nil
}

func (c *Client) forwardLambdaData(ctx context.Context, data []byte) error {
func (c *Client) ForwardLambdaData(ctx context.Context, data []byte) error {
if err := c.batch.AddLambdaData(data); err != nil {
c.logger.Warnf("Dropping lambda data due to error: %v", err)
}
Expand Down
12 changes: 9 additions & 3 deletions app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (app *App) Run(ctx context.Context) error {
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
if app.logsClient != nil {
// Flush buffered logs if any
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, true)
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.ForwardLambdaData, true)
}
// Since we have waited for the processEvent loop to finish we
// already have received all the data we can from the agent. So, we
Expand Down Expand Up @@ -131,7 +131,7 @@ func (app *App) Run(ctx context.Context) error {
flushCtx, cancel := context.WithCancel(ctx)
if app.logsClient != nil {
// Flush buffered logs if any
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.LambdaDataChannel, false)
app.logsClient.FlushData(ctx, event.RequestID, event.InvokedFunctionArn, app.apmClient.ForwardLambdaData, false)
}
// Flush APM data now that the function invocation has completed
app.apmClient.FlushAPMData(flushCtx)
Expand Down Expand Up @@ -213,7 +213,13 @@ func (app *App) processEvent(
invocationCtx,
event.RequestID,
event.InvokedFunctionArn,
app.apmClient.LambdaDataChannel,
func(ctx context.Context, b []byte) error {
select {
case app.apmClient.LambdaDataChannel <- b:
case <-ctx.Done():
}
return nil
},
event.EventType == extension.Shutdown,
)
}()
Expand Down
22 changes: 11 additions & 11 deletions logsapi/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// LogEventType represents the log type that is received in the log messages
type LogEventType string

type Forwarder func(context.Context, []byte) error

const (
// PlatformRuntimeDone event is sent when lambda function is finished it's execution
PlatformRuntimeDone LogEventType = "platform.runtimeDone"
Expand Down Expand Up @@ -60,13 +62,13 @@ func (lc *Client) ProcessLogs(
ctx context.Context,
requestID string,
invokedFnArn string,
dataChan chan []byte,
forwardFn Forwarder,
isShutdown bool,
) {
for {
select {
case logEvent := <-lc.logsChannel:
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, forwardFn, isShutdown); shouldExit {
return
}
case <-ctx.Done():
Expand All @@ -80,14 +82,14 @@ func (lc *Client) FlushData(
ctx context.Context,
requestID string,
invokedFnArn string,
dataChan chan []byte,
forwardFn Forwarder,
isShutdown bool,
) {
lc.logger.Infof("flushing %d buffered logs", len(lc.logsChannel))
for {
select {
case logEvent := <-lc.logsChannel:
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, dataChan, isShutdown); shouldExit {
if shouldExit := lc.handleEvent(ctx, logEvent, requestID, invokedFnArn, forwardFn, isShutdown); shouldExit {
return
}
case <-ctx.Done():
Expand All @@ -106,7 +108,7 @@ func (lc *Client) handleEvent(ctx context.Context,
logEvent LogEvent,
requestID string,
invokedFnArn string,
dataChan chan []byte,
forwardFn Forwarder,
isShutdown bool,
) bool {
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
Expand Down Expand Up @@ -139,9 +141,8 @@ func (lc *Client) handleEvent(ctx context.Context,
if err != nil {
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
} else {
select {
case dataChan <- processedMetrics:
case <-ctx.Done():
if err := forwardFn(ctx, processedMetrics); err != nil {
lc.logger.Errorf("Error forwarding Lambda platform metrics: %v", err)
}
}
}
Expand All @@ -166,9 +167,8 @@ func (lc *Client) handleEvent(ctx context.Context,
if err != nil {
lc.logger.Warnf("Error processing function log : %v", err)
} else {
select {
case dataChan <- processedLog:
case <-ctx.Done():
if err := forwardFn(ctx, processedLog); err != nil {
lc.logger.Warnf("Error forwarding function log : %v", err)
}
}
}
Expand Down
Loading