diff --git a/host/host.go b/host/host.go index c12a02a3..f4c84c99 100644 --- a/host/host.go +++ b/host/host.go @@ -53,6 +53,8 @@ type Trace struct { KTime times.KTime PID libpf.PID TID libpf.PID + Origin int + OffTime uint64 // Time a task was off-cpu. APMTraceID libpf.APMTraceID APMTransactionID libpf.APMTransactionID } diff --git a/reporter/iface.go b/reporter/iface.go index 818d5224..f612f121 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -29,6 +29,8 @@ type TraceEventMeta struct { Comm string APMServiceName string PID, TID libpf.PID + Origin int + OffTime uint64 } type TraceReporter interface { diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index ec08578e..3d613388 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/libpf/xsync" + "go.opentelemetry.io/ebpf-profiler/support" ) var ( @@ -81,6 +82,7 @@ type traceEvents struct { mappingEnds []libpf.Address mappingFileOffsets []uint64 timestamps []uint64 // in nanoseconds + offTimes []uint64 // in nanoseconds } // attrKeyValue is a helper to populate Profile.attribute_table. @@ -89,6 +91,8 @@ type attrKeyValue struct { value string } +type samplesMap map[traceAndMetaKey]*traceEvents + // OTLPReporter receives and transforms information to be OTLP/profiles compliant. type OTLPReporter struct { // name is the ScopeProfile's name. @@ -122,8 +126,9 @@ type OTLPReporter struct { // frames maps frame information to its source location. frames *lru.SyncedLRU[libpf.FileID, *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]] - // traceEvents stores reported trace events (trace metadata with frames and counts) - traceEvents xsync.RWMutex[map[traceAndMetaKey]*traceEvents] + // traceEvents stores reported trace events (trace metadata with frames and counts) from + // various origins. + traceEvents xsync.RWMutex[map[int]samplesMap] // pkgGRPCOperationTimeout sets the time limit for GRPC requests. pkgGRPCOperationTimeout time.Duration @@ -155,8 +160,11 @@ func (r *OTLPReporter) SupportsReportTraceEvent() bool { return true } // ReportTraceEvent enqueues reported trace events for the OTLP reporter. func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta) { - traceEventsMap := r.traceEvents.WLock() - defer r.traceEvents.WUnlock(&traceEventsMap) + if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU { + // At the moment only on-CPU and off-CPU traces are reported. + log.Errorf("Skip reporting trace for unexpected %d origin", meta.Origin) + return + } containerID, err := r.lookupCgroupv2(meta.PID) if err != nil { @@ -171,13 +179,17 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta containerID: containerID, } - if events, exists := (*traceEventsMap)[key]; exists { + traceEventsMap := r.traceEvents.WLock() + defer r.traceEvents.WUnlock(&traceEventsMap) + + if events, exists := (*traceEventsMap)[meta.Origin][key]; exists { events.timestamps = append(events.timestamps, uint64(meta.Timestamp)) - (*traceEventsMap)[key] = events + events.offTimes = append(events.offTimes, uint64(meta.Timestamp)) + (*traceEventsMap)[meta.Origin][key] = events return } - (*traceEventsMap)[key] = &traceEvents{ + (*traceEventsMap)[meta.Origin][key] = &traceEvents{ files: trace.Files, linenos: trace.Linenos, frameTypes: trace.FrameTypes, @@ -185,6 +197,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta mappingEnds: trace.MappingEnd, mappingFileOffsets: trace.MappingFileOffsets, timestamps: []uint64{uint64(meta.Timestamp)}, + offTimes: []uint64{meta.OffTime}, } } @@ -331,6 +344,12 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { return nil, err } + originsMap := make(map[int]samplesMap, 2) + for _, origin := range []int{support.TraceOriginSampling, + support.TraceOriginOffCPU} { + originsMap[origin] = make(samplesMap) + } + r := &OTLPReporter{ name: cfg.Name, version: cfg.Version, @@ -346,7 +365,7 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { executables: executables, frames: frames, hostmetadata: hostmetadata, - traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), + traceEvents: xsync.NewRWMutex(originsMap), cgroupv2ID: cgroupv2ID, } @@ -398,24 +417,33 @@ func Start(mainCtx context.Context, cfg *Config) (Reporter, error) { // reportOTLPProfile creates and sends out an OTLP profile. func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error { - profile, startTS, endTS := r.getProfile() + var pc = []*profiles.ProfileContainer{} + + for _, kind := range []int{support.TraceOriginSampling, support.TraceOriginOffCPU} { + profile, startTS, endTS := r.getProfile(kind) + + if len(profile.Sample) == 0 { + log.Debugf("No samples for %d", kind) + continue + } + + pc = append(pc, &profiles.ProfileContainer{ + ProfileId: mkProfileID(), + StartTimeUnixNano: startTS, + EndTimeUnixNano: endTS, + // Attributes - Optional element we do not use. + // DroppedAttributesCount - Optional element we do not use. + // OriginalPayloadFormat - Optional element we do not use. + // OriginalPayload - Optional element we do not use. + Profile: profile, + }) + } - if len(profile.Sample) == 0 { + if len(pc) == 0 { log.Debugf("Skip sending of OTLP profile with no samples") return nil } - pc := []*profiles.ProfileContainer{{ - ProfileId: mkProfileID(), - StartTimeUnixNano: startTS, - EndTimeUnixNano: endTS, - // Attributes - Optional element we do not use. - // DroppedAttributesCount - Optional element we do not use. - // OriginalPayloadFormat - Optional element we do not use. - // OriginalPayload - Optional element we do not use. - Profile: profile, - }} - scopeProfiles := []*profiles.ScopeProfiles{{ Profiles: pc, Scope: &common.InstrumentationScope{ @@ -486,54 +514,40 @@ func (r *OTLPReporter) getResource() *resource.Resource { } } -// getProfile returns an OTLP profile containing all collected samples up to this moment. -func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS uint64) { - traceEvents := r.traceEvents.WLock() - samples := maps.Clone(*traceEvents) - clear(*traceEvents) - r.traceEvents.WUnlock(&traceEvents) - - // stringMap is a temporary helper that will build the StringTable. - // By specification, the first element should be empty. - stringMap := make(map[string]uint32) - stringMap[""] = 0 - - // funcMap is a temporary helper that will build the Function array - // in profile and make sure information is deduplicated. - funcMap := make(map[funcInfo]uint64) - funcMap[funcInfo{name: "", fileName: ""}] = 0 - - // attributeMap is a temporary helper that maps attribute values to - // their respective indices. - // This is to ensure that AttributeTable does not contain duplicates. - attributeMap := make(map[string]uint64) +// getProfile returns an OTLP profile containing all collected traces up to this moment. +func (r *OTLPReporter) getProfile(origin int) (profile *profiles.Profile, startTS, endTS uint64) { + samples := r.getEventsFromOrigin(origin) + stringMap, funcMap, attributeMap := getHelperMaps() numSamples := len(samples) - profile = &profiles.Profile{ - // SampleType - Next step: Figure out the correct SampleType. - Sample: make([]*profiles.Sample, 0, numSamples), - SampleType: []*profiles.ValueType{{ - Type: int64(getStringMapIndex(stringMap, "samples")), - Unit: int64(getStringMapIndex(stringMap, "count")), - }}, - PeriodType: &profiles.ValueType{ - Type: int64(getStringMapIndex(stringMap, "cpu")), - Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), - }, - Period: 1e9 / int64(r.samplesPerSecond), - // AttributeUnits - Optional element we do not use. - // LinkTable - Optional element we do not use. - // DropFrames - Optional element we do not use. - // KeepFrames - Optional element we do not use. - // Comment - Optional element we do not use. - // DefaultSampleType - Optional element we do not use. + switch origin { + case support.TraceOriginSampling: + profile = &profiles.Profile{ + Sample: make([]*profiles.Sample, 0, numSamples), + SampleType: []*profiles.ValueType{{ + Type: int64(getStringMapIndex(stringMap, "samples")), + Unit: int64(getStringMapIndex(stringMap, "count")), + }}, + PeriodType: &profiles.ValueType{ + Type: int64(getStringMapIndex(stringMap, "cpu")), + Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), + }, + Period: 1e9 / int64(r.samplesPerSecond), + } + case support.TraceOriginOffCPU: + profile = &profiles.Profile{ + Sample: make([]*profiles.Sample, 0, numSamples), + SampleType: []*profiles.ValueType{{ + Type: int64(getStringMapIndex(stringMap, "events")), + Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), + }}, + } + default: + return &profiles.Profile{}, 0, 0 } locationIndex := uint64(0) - // Temporary lookup to reference existing Mappings. - fileIDtoMapping := make(map[libpf.FileID]uint64) - for traceKey, traceInfo := range samples { sample := &profiles.Sample{} sample.LocationsStartIndex = locationIndex @@ -546,108 +560,18 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u endTS = traceInfo.timestamps[len(traceInfo.timestamps)-1] sample.TimestampsUnixNano = traceInfo.timestamps - sample.Value = []int64{1} - - // Walk every frame of the trace. - for i := range traceInfo.frameTypes { - frameAttributes := addProfileAttributes(profile, []attrKeyValue{ - {key: "profile.frame.type", value: traceInfo.frameTypes[i].String()}, - }, attributeMap) - - loc := &profiles.Location{ - // Id - Optional element we do not use. - Address: uint64(traceInfo.linenos[i]), - // IsFolded - Optional element we do not use. - Attributes: frameAttributes, + switch origin { + case support.TraceOriginSampling: + sample.Value = []int64{1} + case support.TraceOriginOffCPU: + sample.Value = make([]int64, len(traceInfo.offTimes)) + for idx, offTime := range traceInfo.offTimes { + sample.Value[idx] = int64(offTime) } - - switch frameKind := traceInfo.frameTypes[i]; frameKind { - case libpf.NativeFrame: - // As native frames are resolved in the backend, we use Mapping to - // report these frames. - - var locationMappingIndex uint64 - if tmpMappingIndex, exists := fileIDtoMapping[traceInfo.files[i]]; exists { - locationMappingIndex = tmpMappingIndex - } else { - idx := uint64(len(fileIDtoMapping)) - fileIDtoMapping[traceInfo.files[i]] = idx - locationMappingIndex = idx - - execInfo, exists := r.executables.Get(traceInfo.files[i]) - - // Next step: Select a proper default value, - // if the name of the executable is not known yet. - var fileName = "UNKNOWN" - if exists { - fileName = execInfo.fileName - } - - mappingAttributes := addProfileAttributes(profile, []attrKeyValue{ - // Once SemConv and its Go package is released with the new - // semantic convention for build_id, replace these hard coded - // strings. - {key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID}, - {key: "process.executable.build_id.profiling", - value: traceInfo.files[i].StringNoQuotes()}, - }, attributeMap) - - profile.Mapping = append(profile.Mapping, &profiles.Mapping{ - // Id - Optional element we do not use. - MemoryStart: uint64(traceInfo.mappingStarts[i]), - MemoryLimit: uint64(traceInfo.mappingEnds[i]), - FileOffset: traceInfo.mappingFileOffsets[i], - Filename: int64(getStringMapIndex(stringMap, fileName)), - Attributes: mappingAttributes, - // HasFunctions - Optional element we do not use. - // HasFilenames - Optional element we do not use. - // HasLineNumbers - Optional element we do not use. - // HasInlinedFrames - Optional element we do not use. - }) - } - loc.MappingIndex = locationMappingIndex - case libpf.AbortFrame: - // Next step: Figure out how the OTLP protocol - // could handle artificial frames, like AbortFrame, - // that are not originated from a native or interpreted - // program. - default: - // Store interpreted frame information as a Line message: - line := &profiles.Line{} - - fileIDInfoLock, exists := r.frames.Get(traceInfo.files[i]) - if !exists { - // At this point, we do not have enough information for the frame. - // Therefore, we report a dummy entry and use the interpreter as filename. - line.FunctionIndex = createFunctionEntry(funcMap, - "UNREPORTED", frameKind.String()) - } else { - fileIDInfo := fileIDInfoLock.RLock() - if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists { - line.Line = int64(si.lineNumber) - - line.FunctionIndex = createFunctionEntry(funcMap, - si.functionName, si.filePath) - } else { - // At this point, we do not have enough information for the frame. - // Therefore, we report a dummy entry and use the interpreter as filename. - // To differentiate this case from the case where no information about - // the file ID is available at all, we use a different name for reported - // function. - line.FunctionIndex = createFunctionEntry(funcMap, - "UNRESOLVED", frameKind.String()) - } - fileIDInfoLock.RUnlock(&fileIDInfo) - } - loc.Line = append(loc.Line, line) - - // To be compliant with the protocol, generate a dummy mapping entry. - loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, stringMap, - profile, traceInfo.files[i]) - } - profile.Location = append(profile.Location, loc) } + r.populateTrace(profile, traceInfo, stringMap, funcMap, attributeMap) + sample.Attributes = addProfileAttributes(profile, []attrKeyValue{ {key: string(semconv.ContainerIDKey), value: traceKey.containerID}, {key: string(semconv.ThreadNameKey), value: traceKey.comm}, @@ -660,6 +584,153 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u } log.Debugf("Reporting OTLP profile with %d samples", len(profile.Sample)) + populateMetadata(profile, stringMap, funcMap) + + profile.DurationNanos = int64(endTS - startTS) + profile.TimeNanos = int64(startTS) + + return profile, startTS, endTS +} + +// getEventsFromOrigin returns all reported traces from a specified kind of profiling. +func (r *OTLPReporter) getEventsFromOrigin(kind int) samplesMap { + traceEvents := r.traceEvents.WLock() + samples := maps.Clone((*traceEvents)[kind]) + clear((*traceEvents)[kind]) + r.traceEvents.WUnlock(&traceEvents) + return samples +} + +// getHelperMaps returns temporary maps that support constructing a profile. +func getHelperMaps() (stringMap map[string]uint32, funcMap map[funcInfo]uint64, + attributeMap map[string]uint64) { + // stringMap is a temporary helper that will build the StringTable. + // By specification, the first element should be empty. + stringMap = make(map[string]uint32) + stringMap[""] = 0 + + // funcMap is a temporary helper that will build the Function array + // in profile and make sure information is deduplicated. + funcMap = make(map[funcInfo]uint64) + funcMap[funcInfo{name: "", fileName: ""}] = 0 + + // attributeMap is a temporary helper that maps attribute values to + // their respective indices. + // This is to ensure that AttributeTable does not contain duplicates. + attributeMap = make(map[string]uint64) + + return stringMap, funcMap, attributeMap +} + +func (r *OTLPReporter) populateTrace(profile *profiles.Profile, traceInfo *traceEvents, + stringMap map[string]uint32, funcMap map[funcInfo]uint64, attributeMap map[string]uint64) { + // Temporary lookup to reference existing Mappings. + fileIDtoMapping := make(map[libpf.FileID]uint64) + + // Walk every frame of the trace. + for i := range traceInfo.frameTypes { + frameAttributes := addProfileAttributes(profile, []attrKeyValue{ + {key: "profile.frame.type", value: traceInfo.frameTypes[i].String()}, + }, attributeMap) + + loc := &profiles.Location{ + // Id - Optional element we do not use. + Address: uint64(traceInfo.linenos[i]), + // IsFolded - Optional element we do not use. + Attributes: frameAttributes, + } + + switch frameKind := traceInfo.frameTypes[i]; frameKind { + case libpf.NativeFrame: + // As native frames are resolved in the backend, we use Mapping to + // report these frames. + + var locationMappingIndex uint64 + if tmpMappingIndex, exists := fileIDtoMapping[traceInfo.files[i]]; exists { + locationMappingIndex = tmpMappingIndex + } else { + idx := uint64(len(fileIDtoMapping)) + fileIDtoMapping[traceInfo.files[i]] = idx + locationMappingIndex = idx + + execInfo, exists := r.executables.Get(traceInfo.files[i]) + + // Next step: Select a proper default value, + // if the name of the executable is not known yet. + var fileName = "UNKNOWN" + if exists { + fileName = execInfo.fileName + } + + mappingAttributes := addProfileAttributes(profile, []attrKeyValue{ + // Once SemConv and its Go package is released with the new + // semantic convention for build_id, replace these hard coded + // strings. + {key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID}, + {key: "process.executable.build_id.profiling", + value: traceInfo.files[i].StringNoQuotes()}, + }, attributeMap) + + profile.Mapping = append(profile.Mapping, &profiles.Mapping{ + // Id - Optional element we do not use. + MemoryStart: uint64(traceInfo.mappingStarts[i]), + MemoryLimit: uint64(traceInfo.mappingEnds[i]), + FileOffset: traceInfo.mappingFileOffsets[i], + Filename: int64(getStringMapIndex(stringMap, fileName)), + Attributes: mappingAttributes, + // HasFunctions - Optional element we do not use. + // HasFilenames - Optional element we do not use. + // HasLineNumbers - Optional element we do not use. + // HasInlinedFrames - Optional element we do not use. + }) + } + loc.MappingIndex = locationMappingIndex + case libpf.AbortFrame: + // Next step: Figure out how the OTLP protocol + // could handle artificial frames, like AbortFrame, + // that are not originated from a native or interpreted + // program. + default: + // Store interpreted frame information as a Line message: + line := &profiles.Line{} + + fileIDInfoLock, exists := r.frames.Get(traceInfo.files[i]) + if !exists { + // At this point, we do not have enough information for the frame. + // Therefore, we report a dummy entry and use the interpreter as filename. + line.FunctionIndex = createFunctionEntry(funcMap, + "UNREPORTED", frameKind.String()) + } else { + fileIDInfo := fileIDInfoLock.RLock() + if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists { + line.Line = int64(si.lineNumber) + + line.FunctionIndex = createFunctionEntry(funcMap, + si.functionName, si.filePath) + } else { + // At this point, we do not have enough information for the frame. + // Therefore, we report a dummy entry and use the interpreter as filename. + // To differentiate this case from the case where no information about + // the file ID is available at all, we use a different name for reported + // function. + line.FunctionIndex = createFunctionEntry(funcMap, + "UNRESOLVED", frameKind.String()) + } + fileIDInfoLock.RUnlock(&fileIDInfo) + } + loc.Line = append(loc.Line, line) + + // To be compliant with the protocol, generate a dummy mapping entry. + loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, stringMap, + profile, traceInfo.files[i]) + } + profile.Location = append(profile.Location, loc) + } +} + +// populateMetadata adds the information from the temporary helper maps to the profile. +func populateMetadata(profile *profiles.Profile, stringMap map[string]uint32, + funcMap map[funcInfo]uint64) { // Populate the deduplicated functions into profile. funcTable := make([]*profiles.Function, len(funcMap)) for v, idx := range funcMap { @@ -685,11 +756,6 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u for i := int64(0); i < int64(len(profile.Location)); i++ { profile.LocationIndices[i] = i } - - profile.DurationNanos = int64(endTS - startTS) - profile.TimeNanos = int64(startTS) - - return profile, startTS, endTS } // getStringMapIndex inserts or looks up the index for value in stringMap. diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index c1a6e391..38eeeccf 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -126,6 +126,8 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { PID: bpfTrace.PID, TID: bpfTrace.TID, APMServiceName: "", // filled in below + Origin: bpfTrace.Origin, + OffTime: bpfTrace.OffTime, } if !m.reporter.SupportsReportTraceEvent() { diff --git a/tracer/tracer.go b/tracer/tracer.go index 9c8d317b..f7fd35fe 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -973,17 +973,21 @@ func (t *Tracer) loadBpfTrace(raw []byte) *host.Trace { APMTransactionID: *(*libpf.APMTransactionID)(unsafe.Pointer(&ptr.apm_transaction_id)), PID: libpf.PID(ptr.pid), TID: libpf.PID(ptr.tid), + Origin: int(ptr.origin), + OffTime: uint64(ptr.offtime), KTime: times.KTime(ptr.ktime), } // Trace fields included in the hash: // - PID, kernel stack ID, length & frame array // Intentionally excluded: - // - ktime, COMM, APM trace, APM transaction ID + // - ktime, COMM, APM trace, APM transaction ID, Origin and Off Time ptr.comm = [16]C.char{} ptr.apm_trace_id = C.ApmTraceID{} ptr.apm_transaction_id = C.ApmSpanID{} ptr.ktime = 0 + ptr.origin = 0 + ptr.offtime = 0 trace.Hash = host.TraceHash(xxh3.Hash128(raw).Lo) userFrameOffs := 0