Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Off CPU profiling #196

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions cli_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/peterbourgon/ff/v3"

"go.opentelemetry.io/ebpf-profiler/internal/controller"
"go.opentelemetry.io/ebpf-profiler/support"
"go.opentelemetry.io/ebpf-profiler/tracer"
)

Expand All @@ -24,6 +25,7 @@ const (
defaultProbabilisticThreshold = tracer.ProbabilisticThresholdMax
defaultProbabilisticInterval = 1 * time.Minute
defaultArgSendErrorFrames = false
defaultOffCPUThreshold = support.OffCPUThresholdMax

// This is the X in 2^(n + x) where n is the default hardcoded map size value
defaultArgMapScaleFactor = 0
Expand Down Expand Up @@ -61,6 +63,11 @@ var (
"If zero, monotonic-realtime clock sync will be performed once, " +
"on agent startup, but not periodically."
sendErrorFramesHelp = "Send error frames (devfiler only, breaks Kibana)"
offCPUThresholdHelp = fmt.Sprintf("If set to a value between 1 and %d will enable "+
"off-cpu profiling: Every time an off-cpu entry point is hit, a random number between "+
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe more informative to be more specific:

Suggested change
"off-cpu profiling: Every time an off-cpu entry point is hit, a random number between "+
"off-cpu profiling: Every time sched_switch is called in the kernel, a random number between "+

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this implementation detail around sched_switch should be part of a help message. Since users cannot change the entry point to something other than sched_switch, I think this implementation detail should not matter to them.

"0 and %d is chosen. If the given threshold is greater than this random number, the "+
"off-cpu trace is collected and reported.",
support.OffCPUThresholdMax-1, support.OffCPUThresholdMax-1)
)

// Package-scope variable, so that conditionally compiled other components can refer
Expand Down Expand Up @@ -114,6 +121,9 @@ func parseArgs() (*controller.Config, error) {
fs.BoolVar(&args.VerboseMode, "verbose", false, verboseModeHelp)
fs.BoolVar(&args.Version, "version", false, versionHelp)

fs.UintVar(&args.OffCPUThreshold, "off-cpu-threshold",
defaultOffCPUThreshold, offCPUThresholdHelp)

fs.Usage = func() {
fs.PrintDefaults()
}
Expand Down
2 changes: 2 additions & 0 deletions host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Trace struct {
KTime times.KTime
PID libpf.PID
TID libpf.PID
Origin libpf.Origin
OffTime uint64 // Time a task was off-cpu in nanoseconds.
APMTraceID libpf.APMTraceID
APMTransactionID libpf.APMTransactionID
CPU int
Expand Down
3 changes: 2 additions & 1 deletion internal/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type Config struct {
// HostName is the name of the host.
HostName string
// IPAddress is the IP address of the host that sends data to CollAgentAddr.
IPAddress string
IPAddress string
OffCPUThreshold uint

Reporter reporter.Reporter

Expand Down
8 changes: 8 additions & 0 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/internal/helpers"
"go.opentelemetry.io/ebpf-profiler/metrics"
"go.opentelemetry.io/ebpf-profiler/reporter"
"go.opentelemetry.io/ebpf-profiler/support"
"go.opentelemetry.io/ebpf-profiler/times"
"go.opentelemetry.io/ebpf-profiler/tracehandler"
"go.opentelemetry.io/ebpf-profiler/tracer"
Expand Down Expand Up @@ -124,6 +125,13 @@ func (c *Controller) Start(ctx context.Context) error {
}
log.Info("Attached tracer program")

if c.config.OffCPUThreshold < support.OffCPUThresholdMax {
if err := trc.StartOffCPUProfiling(); err != nil {
return fmt.Errorf("failed to start off-cpu profiling: %v", err)
}
log.Printf("Enabled off-cpu profiling")
}

if c.config.ProbabilisticThreshold < tracer.ProbabilisticThresholdMax {
trc.StartProbabilisticProfiling(ctx)
log.Printf("Enabled probabilistic profiling")
Expand Down
3 changes: 3 additions & 0 deletions libpf/libpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@ type Void struct{}
// source line numbers associated with offsets in native code, or for source line numbers in
// interpreted code.
type SourceLineno uint64

// Origin determines the source of a trace.
type Origin int
12 changes: 12 additions & 0 deletions libpf/symbol.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package libpf // import "go.opentelemetry.io/ebpf-profiler/libpf"
import (
"fmt"
"sort"
"strings"
)

// SymbolValue represents the value associated with a symbol, e.g. either an
Expand Down Expand Up @@ -81,6 +82,17 @@ func (symmap *SymbolMap) LookupSymbol(symbolName SymbolName) (*Symbol, error) {
return nil, fmt.Errorf("symbol %v not present in map", symbolName)
}

// LookupSymbolByPrefix loops over all known symbols and returns a symbol
// whose name starts with the given prefix.
// NOTE: Go map iteration order is not deterministic, so if multiple symbols
// share the prefix, the returned symbol is arbitrary and may differ between
// calls.
// Returns an error if no symbol with the given prefix is present in the map.
func (symmap *SymbolMap) LookupSymbolByPrefix(prefix string) (*Symbol, error) {
	for name, sym := range symmap.nameToSymbol {
		if strings.HasPrefix(string(name), prefix) {
			return sym, nil
		}
	}
	return nil, fmt.Errorf("no symbol present that starts with %q", prefix)
}

// LookupSymbolAddress returns the address of a symbol.
// Returns SymbolValueInvalid and error if not found.
func (symmap *SymbolMap) LookupSymbolAddress(symbolName SymbolName) (SymbolValue, error) {
Expand Down
21 changes: 15 additions & 6 deletions reporter/base_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

// baseReporter encapsulates shared behavior between all the available reporters.
Expand All @@ -35,7 +36,7 @@ type baseReporter struct {
cgroupv2ID *lru.SyncedLRU[libpf.PID, string]

// traceEvents stores reported trace events (trace metadata with frames and counts)
traceEvents xsync.RWMutex[map[samples.TraceAndMetaKey]*samples.TraceEvents]
traceEvents xsync.RWMutex[map[libpf.Origin]samples.KeyToEventMapping]

// hostmetadata stores metadata that is sent out with every request.
hostmetadata *lru.SyncedLRU[string, string]
Expand Down Expand Up @@ -97,8 +98,11 @@ func (*baseReporter) ReportMetrics(_ uint32, _ []uint32, _ []int64) {}
func (*baseReporter) SupportsReportTraceEvent() bool { return true }

func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta) {
traceEventsMap := b.traceEvents.WLock()
defer b.traceEvents.WUnlock(&traceEventsMap)
if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU {
// At the moment only on-CPU and off-CPU traces are reported.
log.Errorf("Skip reporting trace for unexpected %d origin", meta.Origin)
return
}

var extraMeta any
if b.cfg.ExtraSampleAttrProd != nil {
Expand All @@ -121,20 +125,25 @@ func (b *baseReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta
ExtraMeta: extraMeta,
}

if events, exists := (*traceEventsMap)[key]; exists {
traceEventsMap := b.traceEvents.WLock()
defer b.traceEvents.WUnlock(&traceEventsMap)

if events, exists := (*traceEventsMap)[meta.Origin][key]; exists {
events.Timestamps = append(events.Timestamps, uint64(meta.Timestamp))
(*traceEventsMap)[key] = events
events.OffTimes = append(events.OffTimes, meta.OffTime)
(*traceEventsMap)[meta.Origin][key] = events
return
}

(*traceEventsMap)[key] = &samples.TraceEvents{
(*traceEventsMap)[meta.Origin][key] = &samples.TraceEvents{
Files: trace.Files,
Linenos: trace.Linenos,
FrameTypes: trace.FrameTypes,
MappingStarts: trace.MappingStart,
MappingEnds: trace.MappingEnd,
MappingFileOffsets: trace.MappingFileOffsets,
Timestamps: []uint64{uint64(meta.Timestamp)},
OffTimes: []uint64{meta.OffTime},
}
}

Expand Down
21 changes: 13 additions & 8 deletions reporter/collector_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/pdata"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

// Assert that we implement the full Reporter interface.
Expand Down Expand Up @@ -59,16 +60,20 @@ func NewCollector(cfg *Config, nextConsumer consumerprofiles.Profiles) (*Collect
return nil, err
}

originsMap := make(map[libpf.Origin]samples.KeyToEventMapping, 2)
for _, origin := range []libpf.Origin{support.TraceOriginSampling,
support.TraceOriginOffCPU} {
originsMap[origin] = make(samples.KeyToEventMapping)
}

return &CollectorReporter{
baseReporter: &baseReporter{
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(
map[samples.TraceAndMetaKey]*samples.TraceEvents{},
),
cfg: cfg,
name: cfg.Name,
version: cfg.Version,
pdata: data,
cgroupv2ID: cgroupv2ID,
traceEvents: xsync.NewRWMutex(originsMap),
hostmetadata: hostmetadata,
runLoop: &runLoop{
stopSignal: make(chan libpf.Void),
Expand Down
48 changes: 35 additions & 13 deletions reporter/internal/pdata/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

const (
Expand All @@ -24,14 +25,16 @@ const (

// Generate generates a pdata request out of internal profiles data, to be
// exported.
func (p Pdata) Generate(events map[samples.TraceAndMetaKey]*samples.TraceEvents) pprofile.Profiles {
func (p Pdata) Generate(events map[libpf.Origin]samples.KeyToEventMapping) pprofile.Profiles {
profiles := pprofile.NewProfiles()
rp := profiles.ResourceProfiles().AppendEmpty()
sp := rp.ScopeProfiles().AppendEmpty()
prof := sp.Profiles().AppendEmpty()
prof.SetProfileID(pprofile.ProfileID(mkProfileID()))
p.setProfile(events, prof)

for _, origin := range []libpf.Origin{support.TraceOriginSampling,
support.TraceOriginOffCPU} {
prof := sp.Profiles().AppendEmpty()
prof.SetProfileID(pprofile.ProfileID(mkProfileID()))
p.setProfile(origin, events[origin], prof)
}
return profiles
}

Expand All @@ -48,6 +51,7 @@ func mkProfileID() []byte {
// setProfile sets the data an OTLP profile with all collected samples up to
// this moment.
func (p *Pdata) setProfile(
origin libpf.Origin,
events map[samples.TraceAndMetaKey]*samples.TraceEvents,
profile pprofile.Profile,
) {
Expand All @@ -62,13 +66,23 @@ func (p *Pdata) setProfile(
funcMap[samples.FuncInfo{Name: "", FileName: ""}] = 0

st := profile.SampleType().AppendEmpty()
st.SetTypeStrindex(getStringMapIndex(stringMap, "samples"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "count"))

pt := profile.PeriodType()
pt.SetTypeStrindex(getStringMapIndex(stringMap, "cpu"))
pt.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
switch origin {
case support.TraceOriginSampling:
st.SetTypeStrindex(getStringMapIndex(stringMap, "samples"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "count"))

pt := profile.PeriodType()
pt.SetTypeStrindex(getStringMapIndex(stringMap, "cpu"))
pt.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))

profile.SetPeriod(1e9 / int64(p.samplesPerSecond))
case support.TraceOriginOffCPU:
st.SetTypeStrindex(getStringMapIndex(stringMap, "events"))
st.SetUnitStrindex(getStringMapIndex(stringMap, "nanoseconds"))
default:
log.Errorf("Generating profile for unsupported origin %d", origin)
return
}

// Temporary lookup to reference existing Mappings.
fileIDtoMapping := make(map[libpf.FileID]int32)
Expand All @@ -85,7 +99,15 @@ func (p *Pdata) setProfile(
endTS = pcommon.Timestamp(traceInfo.Timestamps[len(traceInfo.Timestamps)-1])

sample.TimestampsUnixNano().FromRaw(traceInfo.Timestamps)
sample.Value().Append(1)

switch origin {
case support.TraceOriginSampling:
sample.Value().Append(1)
case support.TraceOriginOffCPU:
for _, offTime := range traceInfo.OffTimes {
sample.Value().Append(int64(offTime))
}
}

// Walk every frame of the trace.
for i := range traceInfo.FrameTypes {
Expand Down
49 changes: 26 additions & 23 deletions reporter/internal/pdata/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/ebpf-profiler/libpf"
"go.opentelemetry.io/ebpf-profiler/libpf/xsync"
"go.opentelemetry.io/ebpf-profiler/reporter/internal/samples"
"go.opentelemetry.io/ebpf-profiler/support"
)

func TestGetStringMapIndex(t *testing.T) {
Expand Down Expand Up @@ -168,15 +169,15 @@ func TestFunctionTableOrder(t *testing.T) {
name string
executables map[libpf.FileID]samples.ExecInfo
frames map[libpf.FileID]map[libpf.AddressOrLineno]samples.SourceInfo
events map[samples.TraceAndMetaKey]*samples.TraceEvents
events map[libpf.Origin]samples.KeyToEventMapping

wantFunctionTable []string
}{
{
name: "with no executables",
executables: map[libpf.FileID]samples.ExecInfo{},
frames: map[libpf.FileID]map[libpf.AddressOrLineno]samples.SourceInfo{},
events: map[samples.TraceAndMetaKey]*samples.TraceEvents{},
events: map[libpf.Origin]samples.KeyToEventMapping{},
wantFunctionTable: []string{""},
}, {
name: "single executable",
Expand All @@ -192,27 +193,29 @@ func TestFunctionTableOrder(t *testing.T) {
libpf.AddressOrLineno(0x4ef): {FunctionName: "func5"},
},
},
events: map[samples.TraceAndMetaKey]*samples.TraceEvents{
{}: {
Files: []libpf.FileID{
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
events: map[libpf.Origin]samples.KeyToEventMapping{
support.TraceOriginSampling: map[samples.TraceAndMetaKey]*samples.TraceEvents{
{}: {
Files: []libpf.FileID{
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
libpf.NewFileID(2, 3),
},
Linenos: []libpf.AddressOrLineno{
libpf.AddressOrLineno(0xef),
libpf.AddressOrLineno(0x1ef),
libpf.AddressOrLineno(0x2ef),
libpf.AddressOrLineno(0x3ef),
libpf.AddressOrLineno(0x4ef),
},
FrameTypes: slices.Repeat([]libpf.FrameType{libpf.KernelFrame}, 5),
MappingStarts: slices.Repeat([]libpf.Address{libpf.Address(0)}, 5),
MappingEnds: slices.Repeat([]libpf.Address{libpf.Address(0)}, 5),
MappingFileOffsets: slices.Repeat([]uint64{0}, 5),
Timestamps: []uint64{1, 2, 3, 4, 5},
},
Linenos: []libpf.AddressOrLineno{
libpf.AddressOrLineno(0xef),
libpf.AddressOrLineno(0x1ef),
libpf.AddressOrLineno(0x2ef),
libpf.AddressOrLineno(0x3ef),
libpf.AddressOrLineno(0x4ef),
},
FrameTypes: slices.Repeat([]libpf.FrameType{libpf.KernelFrame}, 5),
MappingStarts: slices.Repeat([]libpf.Address{libpf.Address(0)}, 5),
MappingEnds: slices.Repeat([]libpf.Address{libpf.Address(0)}, 5),
MappingFileOffsets: slices.Repeat([]uint64{0}, 5),
Timestamps: []uint64{1, 2, 3, 4, 5},
},
},
wantFunctionTable: []string{
Expand All @@ -233,7 +236,7 @@ func TestFunctionTableOrder(t *testing.T) {
res := d.Generate(tt.events)
require.Equal(t, 1, res.ResourceProfiles().Len())
require.Equal(t, 1, res.ResourceProfiles().At(0).ScopeProfiles().Len())
require.Equal(t, 1, res.ResourceProfiles().At(0).ScopeProfiles().At(0).Profiles().Len())
require.Equal(t, 2, res.ResourceProfiles().At(0).ScopeProfiles().At(0).Profiles().Len())
p := res.ResourceProfiles().At(0).ScopeProfiles().At(0).Profiles().At(0)
require.Equal(t, len(tt.wantFunctionTable), p.FunctionTable().Len())
for i := 0; i < p.FunctionTable().Len(); i++ {
Expand Down
Loading
Loading