Skip to content

Commit

Permalink
Merge pull request #36 from hengyoush/feature/compatible_set
Browse files Browse the repository at this point in the history
[Feature] Support tracking data on existing connections
  • Loading branch information
hengyoush authored Sep 13, 2024
2 parents b2038aa + 59037cc commit 6ca3200
Show file tree
Hide file tree
Showing 360 changed files with 392 additions and 143 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func validateAndRepairOptions(options AgentOptions) AgentOptions {
newOptions.ProcessorsNum = runtime.NumCPU()
}
if newOptions.MessageFilter == nil {
newOptions.MessageFilter = protocol.NoopFilter{}
newOptions.MessageFilter = protocol.BaseFilter{}
}
if newOptions.BPFVerifyLogSize <= 0 {
newOptions.BPFVerifyLogSize = 1 * 1024 * 1024
Expand Down
4 changes: 2 additions & 2 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,7 @@ func TestTcpV4DoRcv(t *testing.T) {
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTNIC_IN, p)
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTTCP_IN, p)
Expand Down Expand Up @@ -977,7 +977,7 @@ func TestSkbCopyDatagramIter(t *testing.T) {
bpf.AttachKProbeSecuritySocketSendmsgEntry,
bpf.AttachKProbeSecuritySocketRecvmsgEntry,
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTNIC_IN, p)
return ApplyKernelVersionFunctions(t, bpf.AgentStepTDEV_IN, p)
},
func(p interface{}) link.Link {
return ApplyKernelVersionFunctions(t, bpf.AgentStepTUSER_COPY, p)
Expand Down
26 changes: 13 additions & 13 deletions agent/analysis/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ type AnnotatedRecord struct {
endTs uint64
reqSize int
respSize int
totalDuration int
blackBoxDuration int
readFromSocketBufferDuration int
totalDuration float64
blackBoxDuration float64
readFromSocketBufferDuration float64
reqSyscallEventDetails []SyscallEventDetail
respSyscallEventDetails []SyscallEventDetail
reqNicEventDetails []NicEventDetail
Expand Down Expand Up @@ -78,17 +78,17 @@ func (r *AnnotatedRecord) String(options AnnotatedRecordToStringOptions) string
result += r.Record.String(options.RecordToStringOptions)
result += "\n"
if _, ok := options.MetricTypeSet[TotalDuration]; ok {
result += fmt.Sprintf("[total duration] = %d(%s)(start=%s, end=%s)\n", common.ConvertDurationToMillisecondsIfNeeded(int64(r.totalDuration), nano), timeUnitName(nano),
result += fmt.Sprintf("[total duration] = %.3f(%s)(start=%s, end=%s)\n", common.ConvertDurationToMillisecondsIfNeeded(float64(r.totalDuration), nano), timeUnitName(nano),
common.FormatTimestampWithPrecision(r.startTs, nano),
common.FormatTimestampWithPrecision(r.endTs, nano))
}
if _, ok := options.MetricTypeSet[ReadFromSocketBufferDuration]; ok {
result += fmt.Sprintf("[read from sockbuf]=%d(%s)\n", common.ConvertDurationToMillisecondsIfNeeded(int64(r.readFromSocketBufferDuration), nano),
result += fmt.Sprintf("[read from sockbuf]=%.3f(%s)\n", common.ConvertDurationToMillisecondsIfNeeded(float64(r.readFromSocketBufferDuration), nano),
timeUnitName(nano))
}
if _, ok := options.MetricTypeSet[BlackBoxDuration]; ok {
result += fmt.Sprintf("[%s]=%d(%s)\n", r.blackboxName(),
common.ConvertDurationToMillisecondsIfNeeded(int64(r.blackBoxDuration), nano),
result += fmt.Sprintf("[%s]=%.3f(%s)\n", r.blackboxName(),
common.ConvertDurationToMillisecondsIfNeeded(float64(r.blackBoxDuration), nano),
timeUnitName(nano))
}
if _, ok := options.MetricTypeSet[RequestSize]; ok {
Expand Down Expand Up @@ -198,13 +198,13 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.reqSize = ingressMessage.ByteSize()
annotatedRecord.respSize = egressMessage.ByteSize()
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.totalDuration = int(annotatedRecord.endTs) - int(annotatedRecord.startTs)
annotatedRecord.totalDuration = float64(annotatedRecord.endTs) - float64(annotatedRecord.startTs)
}
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.blackBoxDuration = int(writeSyscallEvents[len(writeSyscallEvents)-1].GetTimestamp()) - int(readSyscallEvents[0].GetTimestamp())
annotatedRecord.blackBoxDuration = float64(writeSyscallEvents[len(writeSyscallEvents)-1].GetTimestamp()) - float64(readSyscallEvents[0].GetTimestamp())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.readFromSocketBufferDuration = int(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - int(tcpInEvents[0].GetTimestamp())
annotatedRecord.readFromSocketBufferDuration = float64(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - float64(tcpInEvents[0].GetTimestamp())
}
annotatedRecord.reqSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](readSyscallEvents)
annotatedRecord.respSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](writeSyscallEvents)
Expand All @@ -220,13 +220,13 @@ func (s *StatRecorder) ReceiveRecord(r protocol.Record, connection *conn.Connect
annotatedRecord.reqSize = egressMessage.ByteSize()
annotatedRecord.respSize = ingressMessage.ByteSize()
if hasReadSyscallEvents && hasWriteSyscallEvents {
annotatedRecord.totalDuration = int(annotatedRecord.endTs) - int(annotatedRecord.startTs)
annotatedRecord.totalDuration = float64(annotatedRecord.endTs) - float64(annotatedRecord.startTs)
}
if hasNicInEvents && hasDevOutEvents {
annotatedRecord.blackBoxDuration = int(nicIngressEvents[len(nicIngressEvents)-1].GetTimestamp()) - int(devOutSyscallEvents[0].GetTimestamp())
annotatedRecord.blackBoxDuration = float64(nicIngressEvents[len(nicIngressEvents)-1].GetTimestamp()) - float64(devOutSyscallEvents[0].GetTimestamp())
}
if hasUserCopyEvents && hasTcpInEvents {
annotatedRecord.readFromSocketBufferDuration = int(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - int(tcpInEvents[0].GetTimestamp())
annotatedRecord.readFromSocketBufferDuration = float64(userCopyEvents[len(userCopyEvents)-1].GetTimestamp()) - float64(tcpInEvents[0].GetTimestamp())
}
annotatedRecord.reqSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](writeSyscallEvents)
annotatedRecord.respSyscallEventDetails = KernEventsToEventDetails[SyscallEventDetail](readSyscallEvents)
Expand Down
1 change: 1 addition & 0 deletions agent/buffer/stream_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (sb *StreamBuffer) IsEmpty() bool {
}
func (sb *StreamBuffer) Clear() {
sb.buffers = sb.buffers[:]
sb.timestamps.Clear()
}
func (sb *StreamBuffer) RemovePrefix(length int) {
if sb.IsEmpty() {
Expand Down
112 changes: 89 additions & 23 deletions agent/conn/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/cilium/ebpf"
)

// var RecordFunc func(protocol.Record, *Connection4) error
Expand Down Expand Up @@ -233,11 +235,11 @@ func (c *Connection4) extractSockKeys() (bpf.AgentSockKey, bpf.AgentSockKey) {
key.Family = uint32(common.AF_INET) // TODO @ipv6

var revKey bpf.AgentSockKey
key.Sip = common.BytesToInt[uint32](c.RemoteIp)
key.Dip = common.BytesToInt[uint32](c.LocalIp)
key.Sport = uint32(c.RemotePort)
key.Dport = uint32(c.LocalPort)
key.Family = uint32(common.AF_INET)
revKey.Sip = common.BytesToInt[uint32](c.RemoteIp)
revKey.Dip = common.BytesToInt[uint32](c.LocalIp)
revKey.Sport = uint32(c.RemotePort)
revKey.Dport = uint32(c.LocalPort)
revKey.Family = uint32(common.AF_INET)
return key, revKey
}

Expand All @@ -249,36 +251,76 @@ func (c *Connection4) OnClose(needClearBpfMap bool) {
err := connInfoMap.Delete(c.TgidFd)
if err != nil {
log.Debugf("clean conn_info_map failed: %v", err)
} else {
log.Debugf("clean conn_info_map deleted")
}
key, revKey := c.extractSockKeys()
sockKeyConnIdMap := bpf.GetMap("SockKeyConnIdMap")
err = sockKeyConnIdMap.Delete(&key)
err = sockKeyConnIdMap.Delete(key)
if err != nil {
log.Debugf("clean sock_key_conn_id_map key failed: %v", err)
} else {
log.Debugf("clean sockKeyConnIdMap deleted key")
}
err = sockKeyConnIdMap.Delete(&revKey)
err = sockKeyConnIdMap.Delete(revKey)
if err != nil {
log.Debugf("clean sock_key_conn_id_map revkey failed: %v", err)
} else {
log.Debugf("clean sockKeyConnIdMap deleted revkey")
}
sockXmitMap := bpf.GetMap("SockXmitMap")
n, err := sockXmitMap.BatchDelete([]bpf.AgentSockKey{key, revKey}, nil)
err = sockXmitMap.Delete(key)
if err == nil {
log.Debugf("clean sockXmitMap deleted: %v", n)
log.Debugf("clean sockXmitMap deleted key")
} else {
log.Debugf("clean sockXmitMap failed: %v", err)
}
err = sockXmitMap.Delete(revKey)
if err == nil {
log.Debugf("clean sockXmitMap deleted revkey")
} else {
log.Debugf("clean sockXmitMap failed: %v", err)
}
}
monitor.UnregisterMetricExporter(c.StreamEvents)
}

func (c *Connection4) OnCloseWithoutClearBpfMap() {
c.OnClose(false)
func (c *Connection4) UpdateConnectionTraceable(traceable bool) {
key, revKey := c.extractSockKeys()
sockKeyConnIdMap := bpf.GetMap("SockKeyConnIdMap")
c.doUpdateConnIdMapProtocolToUnknwon(key, sockKeyConnIdMap, traceable)
c.doUpdateConnIdMapProtocolToUnknwon(revKey, sockKeyConnIdMap, traceable)

connInfoMap := bpf.GetMap("ConnInfoMap")
connInfo := bpf.AgentConnInfoT{}
err := connInfoMap.Lookup(c.TgidFd, &connInfo)
if err == nil {
connInfo.NoTrace = !traceable
connInfoMap.Update(c.TgidFd, &connInfo, ebpf.UpdateExist)
} else {
log.Debugf("try to update %s conn_info_map to no_trace, but no entry in map found!", c.ToString())
}
}

func (c *Connection4) doUpdateConnIdMapProtocolToUnknwon(key bpf.AgentSockKey, m *ebpf.Map, traceable bool) {
var connIds bpf.AgentConnIdS_t
err := m.Lookup(&key, &connIds)
if err == nil {
connIds.NoTrace = !traceable
m.Update(&key, &connIds, ebpf.UpdateExist)
} else {
log.Debugf("try to update %s conn_id_map to no_trace, but no entry in map found! key: %v", c.ToString(), key)
}
}

// func (c *Connection4) OnCloseWithoutClearBpfMap() {
// c.OnClose(false)
// }
func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
isReq := isReq(c, event)
isReq, ok := isReq(c, event)
if event.Len > 0 {
c.StreamEvents.AddKernEvent(event)
} else {
} else if ok {
if (event.Flags&uint8(common.TCP_FLAGS_SYN) != 0) && !isReq && event.Step == bpf.AgentStepTIP_IN {
// 接收到Server给的Syn包
if c.ServerSynReceived {
Expand All @@ -298,16 +340,15 @@ func (c *Connection4) OnKernEvent(event *bpf.AgentKernEvt) bool {
return true
}
func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
isReq := isReq(c, &event.SyscallEvent.Ke)
isReq, _ := isReq(c, &event.SyscallEvent.Ke)
if isReq {

c.reqStreamBuffer.Add(event.SyscallEvent.Ke.Seq, data, event.SyscallEvent.Ke.Ts)
} else {
c.respStreamBuffer.Add(event.SyscallEvent.Ke.Seq, data, event.SyscallEvent.Ke.Ts)
}

c.parseStreamBuffer(c.reqStreamBuffer, protocol.Request, &c.ReqQueue)
c.parseStreamBuffer(c.respStreamBuffer, protocol.Response, &c.RespQueue)
c.parseStreamBuffer(c.reqStreamBuffer, protocol.Request, &c.ReqQueue, event.SyscallEvent.Ke.Step)
c.parseStreamBuffer(c.respStreamBuffer, protocol.Response, &c.RespQueue, event.SyscallEvent.Ke.Step)
c.StreamEvents.AddSyscallEvent(event)

parser := c.GetProtocolParser(c.Protocol)
Expand All @@ -321,7 +362,7 @@ func (c *Connection4) OnSyscallEvent(data []byte, event *bpf.SyscallEventData) {
}
}

func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, resultQueue *[]protocol.ParsedMessage) {
func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, resultQueue *[]protocol.ParsedMessage, step bpf.AgentStepT) {
parser := c.GetProtocolParser(c.Protocol)
if parser == nil {
streamBuffer.Clear()
Expand All @@ -341,8 +382,19 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa
parseResult := parser.ParseStream(streamBuffer, messageType)
switch parseResult.ParseState {
case protocol.Success:
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
if c.Role == bpf.AgentEndpointRoleTKRoleUnknown && len(parseResult.ParsedMessages) > 0 {
parsedMessage := parseResult.ParsedMessages[0]
if (step == bpf.AgentStepTSYSCALL_IN && parsedMessage.IsReq()) || (step == bpf.AgentStepTSYSCALL_OUT && !parsedMessage.IsReq()) {
c.Role = bpf.AgentEndpointRoleTKRoleServer
} else {
c.Role = bpf.AgentEndpointRoleTKRoleClient
}
log.Debugf("Update %s role", c.ToString())
c.resetParseProgress()
} else {
*resultQueue = append(*resultQueue, parseResult.ParsedMessages...)
streamBuffer.RemovePrefix(parseResult.ReadBytes)
}
case protocol.Invalid:
pos := parser.FindBoundary(streamBuffer, messageType, 1)
if pos != -1 {
Expand All @@ -363,14 +415,17 @@ func (c *Connection4) parseStreamBuffer(streamBuffer *buffer.StreamBuffer, messa

}

func isReq(conn *Connection4, event *bpf.AgentKernEvt) bool {
func isReq(conn *Connection4, event *bpf.AgentKernEvt) (bool, bool) {
if conn.Role == bpf.AgentEndpointRoleTKRoleUnknown {
return false, false
}
var isReq bool
if !conn.IsServerSide() {
isReq = event.ConnIdS.Direct == bpf.AgentTrafficDirectionTKEgress
} else {
isReq = event.ConnIdS.Direct == bpf.AgentTrafficDirectionTKIngress
}
return isReq
return isReq, true
}

func (c *Connection4) IsServerSide() bool {
Expand Down Expand Up @@ -399,8 +454,12 @@ func (c *Connection4) Identity() string {
}
func (c *Connection4) ToString() string {
direct := "=>"
if c.Role != bpf.AgentEndpointRoleTKRoleClient {
if c.Role == bpf.AgentEndpointRoleTKRoleServer {
direct = "<="
} else if c.Role == bpf.AgentEndpointRoleTKRoleClient {
direct = "=>"
} else {
direct = "<unknown>"
}
return fmt.Sprintf("[tgid=%d fd=%d][protocol=%d][%s] *%s:%d %s %s:%d", c.TgidFd>>32, uint32(c.TgidFd), c.Protocol, c.StatusString(), c.LocalIp.String(), c.LocalPort, direct, c.RemoteIp.String(), c.RemotePort)
}
Expand All @@ -422,3 +481,10 @@ func (c *Connection4) GetProtocolParser(p bpf.AgentTrafficProtocolT) protocol.Pr
return parser
}
}

func (c *Connection4) resetParseProgress() {
c.reqStreamBuffer.Clear()
c.respStreamBuffer.Clear()
c.ReqQueue = c.ReqQueue[:]
c.RespQueue = c.RespQueue[:]
}
18 changes: 14 additions & 4 deletions agent/conn/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ func (p *Processor) run() {
}
conn.StreamEvents = NewKernEventStream(conn, 300)
if p.side != common.AllSide && p.side != conn.Side() {
conn.OnClose(true)
// conn.OnClose(true)
conn.UpdateConnectionTraceable(false)
continue
}
conn.ConnectStartTs = event.Ts + common.LaunchEpochTime
Expand All @@ -145,30 +146,39 @@ func (p *Processor) run() {
}
go func(c *Connection4) {
time.Sleep(1 * time.Second)
c.OnCloseWithoutClearBpfMap()
c.OnClose(true)
}(conn)
} else if event.ConnType == bpf.AgentConnTypeTKProtocolInfer {
// 协议推断
conn = p.connManager.FindConnection4Or(TgidFd, event.Ts+common.LaunchEpochTime)
// previousProtocol := conn.Protocol
if conn != nil && conn.Status != Closed {
conn.Protocol = event.ConnInfo.Protocol
} else {
continue
}
isProtocolInterested := conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnknown ||

if conn.Role == bpf.AgentEndpointRoleTKRoleUnknown && event.ConnInfo.Role != bpf.AgentEndpointRoleTKRoleUnknown {
conn.Role = event.ConnInfo.Role
}

isProtocolInterested := conn.Protocol == bpf.AgentTrafficProtocolTKProtocolUnset ||
conn.MessageFilter.FilterByProtocol(conn.Protocol)

if isProtocolInterested {
if conn.Protocol != bpf.AgentTrafficProtocolTKProtocolUnknown {
for _, sysEvent := range conn.TempSyscallEvents {
log.Debugf("%s process temp syscall events before infer\n", conn.ToString())
conn.OnSyscallEvent(sysEvent.Buf, sysEvent)
}
conn.UpdateConnectionTraceable(true)
}
conn.TempKernEvents = conn.TempKernEvents[0:0]
conn.TempConnEvents = conn.TempConnEvents[0:0]
} else {
log.Debugf("%s discarded due to not interested", conn.ToString())
conn.OnClose(true)
conn.UpdateConnectionTraceable(false)
// conn.OnClose(true)
}
}
eventType := "connect"
Expand Down
21 changes: 21 additions & 0 deletions agent/protocol/generic_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,27 @@ func (NoopFilter) Filter(ParsedMessage, ParsedMessage) bool {
return true
}

var _ ProtocolFilter = BaseFilter{}

type BaseFilter struct {
}

func (n BaseFilter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
return p != bpf.AgentTrafficProtocolTKProtocolUnknown
}

func (n BaseFilter) FilterByRequest() bool {
return false
}

func (n BaseFilter) FilterByResponse() bool {
return false
}

func (BaseFilter) Filter(ParsedMessage, ParsedMessage) bool {
return true
}

func IsNoopFilter(filter ProtocolFilter) bool {
_, ok := filter.(NoopFilter)
return ok
Expand Down
1 change: 1 addition & 0 deletions agent/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type MessageType int
const (
Request MessageType = iota
Response
Unknown
)

const (
Expand Down
Loading

0 comments on commit 6ca3200

Please sign in to comment.