
Commit

Merge pull request #404 from kubescape/bil
improve memory management
amitschendel authored Nov 10, 2024
2 parents 0b0d68f + 5267186 commit 291f4c9
Showing 4 changed files with 99 additions and 118 deletions.
133 changes: 58 additions & 75 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
@@ -33,12 +33,11 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"istio.io/pkg/cache"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const OpenDynamicThreshold = 50

var procRegex = regexp.MustCompile(`^/proc/\d+`)

type ApplicationProfileManager struct {
@@ -48,11 +47,11 @@ type ApplicationProfileManager struct {
containerMutexes storageUtils.MapMutex[string] // key is k8sContainerID
trackedContainers mapset.Set[string] // key is k8sContainerID
removedContainers mapset.Set[string] // key is k8sContainerID
droppedEvents maps.SafeMap[string, bool] // key is k8sContainerID
savedCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
savedEndpoints maps.SafeMap[string, *maps.SafeMap[string, *v1beta1.HTTPEndpoint]] // key is k8sContainerID
savedExecs maps.SafeMap[string, *maps.SafeMap[string, []string]] // key is k8sContainerID
savedOpens maps.SafeMap[string, *maps.SafeMap[string, mapset.Set[string]]] // key is k8sContainerID
droppedEventsContainers mapset.Set[string] // key is k8sContainerID
savedCapabilities maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedEndpoints maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedExecs maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedOpens maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedSyscalls maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
toSaveCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
toSaveEndpoints maps.SafeMap[string, *maps.SafeMap[string, *v1beta1.HTTPEndpoint]] // key is k8sContainerID
@@ -71,17 +70,18 @@ var _ applicationprofilemanager.ApplicationProfileManagerClient = (*ApplicationP

func CreateApplicationProfileManager(ctx context.Context, cfg config.Config, clusterName string, k8sClient k8sclient.K8sClientInterface, storageClient storage.StorageClient, preRunningContainerIDs mapset.Set[string], k8sObjectCache objectcache.K8sObjectCache, seccompManager seccompmanager.SeccompManagerClient) (*ApplicationProfileManager, error) {
return &ApplicationProfileManager{
cfg: cfg,
clusterName: clusterName,
ctx: ctx,
k8sClient: k8sClient,
k8sObjectCache: k8sObjectCache,
storageClient: storageClient,
containerMutexes: storageUtils.NewMapMutex[string](),
trackedContainers: mapset.NewSet[string](),
removedContainers: mapset.NewSet[string](),
preRunningContainerIDs: preRunningContainerIDs,
seccompManager: seccompManager,
cfg: cfg,
clusterName: clusterName,
ctx: ctx,
k8sClient: k8sClient,
k8sObjectCache: k8sObjectCache,
storageClient: storageClient,
containerMutexes: storageUtils.NewMapMutex[string](),
trackedContainers: mapset.NewSet[string](),
removedContainers: mapset.NewSet[string](),
droppedEventsContainers: mapset.NewSet[string](),
preRunningContainerIDs: preRunningContainerIDs,
seccompManager: seccompManager,
}, nil
}

@@ -140,16 +140,19 @@ func (am *ApplicationProfileManager) deleteResources(watchedContainer *utils.Wat
// delete resources
watchedContainer.UpdateDataTicker.Stop()
am.trackedContainers.Remove(watchedContainer.K8sContainerID)
am.droppedEventsContainers.Remove(watchedContainer.K8sContainerID)
am.savedCapabilities.Delete(watchedContainer.K8sContainerID)
am.savedEndpoints.Delete(watchedContainer.K8sContainerID)
am.savedExecs.Delete(watchedContainer.K8sContainerID)
am.droppedEvents.Delete(watchedContainer.K8sContainerID)
am.savedOpens.Delete(watchedContainer.K8sContainerID)
am.savedSyscalls.Delete(watchedContainer.K8sContainerID)
am.toSaveCapabilities.Delete(watchedContainer.K8sContainerID)
am.toSaveEndpoints.Delete(watchedContainer.K8sContainerID)
am.toSaveExecs.Delete(watchedContainer.K8sContainerID)
am.toSaveOpens.Delete(watchedContainer.K8sContainerID)
am.watchedContainerChannels.Delete(watchedContainer.ContainerID)
}

func (am *ApplicationProfileManager) ContainerReachedMaxTime(containerID string) {
if channel := am.watchedContainerChannels.Get(containerID); channel != nil {
channel <- utils.ContainerReachedMaxTime
@@ -243,7 +246,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
// sleep for container index second to desynchronize the profiles saving
time.Sleep(time.Duration(watchedContainer.ContainerIndex) * time.Second)

if droppedEvents := am.droppedEvents.Get(watchedContainer.K8sContainerID); droppedEvents {
if am.droppedEventsContainers.ContainsOne(watchedContainer.K8sContainerID) {
watchedContainer.SetStatus(utils.WatchedContainerStatusMissingRuntime)
}

@@ -529,50 +532,32 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
// record saved syscalls
am.savedSyscalls.Get(watchedContainer.K8sContainerID).Append(toSaveSyscalls...)
// record saved capabilities
am.savedCapabilities.Get(watchedContainer.K8sContainerID).Append(capabilities...)
savedCapabilities := am.savedCapabilities.Get(watchedContainer.K8sContainerID)
for _, capability := range capabilities {
savedCapabilities.Set(capability, nil)
}
// record saved endpoints
savedEndpoints := am.savedEndpoints.Get(watchedContainer.K8sContainerID)
toSaveEndpoints.Range(func(path string, endpoint *v1beta1.HTTPEndpoint) bool {
if !am.savedEndpoints.Get(watchedContainer.K8sContainerID).Has(path) {
am.savedEndpoints.Get(watchedContainer.K8sContainerID).Set(path, endpoint)
}
savedEndpoints.Set(path, endpoint)
return true
})
// record saved execs
savedExecs := am.savedExecs.Get(watchedContainer.K8sContainerID)
toSaveExecs.Range(func(uniqueExecIdentifier string, v []string) bool {
if !am.savedExecs.Get(watchedContainer.K8sContainerID).Has(uniqueExecIdentifier) {
am.savedExecs.Get(watchedContainer.K8sContainerID).Set(uniqueExecIdentifier, v)
}
savedExecs.Set(uniqueExecIdentifier, v)
return true
})
// record saved opens
savedOpens := am.savedOpens.Get(watchedContainer.K8sContainerID)
toSaveOpens.Range(utils.SetInMap(savedOpens))
// use a dynamic path detector to compress opens
analyzer := dynamicpathdetector.NewPathAnalyzer(OpenDynamicThreshold)
keys := savedOpens.Keys()
// first pass to learn the opens
for _, path := range keys {
_, _ = dynamicpathdetector.AnalyzeOpen(path, analyzer)
}
// second pass to compress the opens
for _, path := range keys {
result, err := dynamicpathdetector.AnalyzeOpen(path, analyzer)
if err != nil {
continue
}
if result != path {
// path becomes compressed
// we avoid a lock by using Pop to remove path and retrieve its flags
pathFlags := savedOpens.Pop(path)
if savedOpens.Has(result) {
// merge flags
savedOpens.Get(result).Append(pathFlags.ToSlice()...)
} else {
// create new entry
savedOpens.Set(result, pathFlags)
}
toSaveOpens.Range(func(path string, newOpens mapset.Set[string]) bool {
if oldOpens, ok := savedOpens.Get(path); ok {
oldOpens.(mapset.Set[string]).Append(newOpens.ToSlice()...)
} else {
savedOpens.Set(path, newOpens)
}
}
return true
})
logger.L().Debug("ApplicationProfileManager - saved application profile",
helpers.Int("capabilities", len(capabilities)),
helpers.Int("endpoints", toSaveEndpoints.Len()),
@@ -648,11 +633,10 @@ func (am *ApplicationProfileManager) ContainerCallback(notif containercollection
if am.watchedContainerChannels.Has(notif.Container.Runtime.ContainerID) {
return
}
am.savedCapabilities.Set(k8sContainerID, mapset.NewSet[string]())
am.droppedEvents.Set(k8sContainerID, false)
am.savedEndpoints.Set(k8sContainerID, new(maps.SafeMap[string, *v1beta1.HTTPEndpoint]))
am.savedExecs.Set(k8sContainerID, new(maps.SafeMap[string, []string]))
am.savedOpens.Set(k8sContainerID, new(maps.SafeMap[string, mapset.Set[string]]))
am.savedCapabilities.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedEndpoints.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedExecs.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedOpens.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedSyscalls.Set(k8sContainerID, mapset.NewSet[string]())
am.toSaveCapabilities.Set(k8sContainerID, mapset.NewSet[string]())
am.toSaveEndpoints.Set(k8sContainerID, new(maps.SafeMap[string, *v1beta1.HTTPEndpoint]))
@@ -678,9 +662,11 @@ func (am *ApplicationProfileManager) ReportCapability(k8sContainerID, capability
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
if am.savedCapabilities.Get(k8sContainerID).ContainsOne(capability) {
// check if we already have this capability
if _, ok := am.savedCapabilities.Get(k8sContainerID).Get(capability); ok {
return
}
// add to capability map
am.toSaveCapabilities.Get(k8sContainerID).Add(capability)
}

@@ -694,15 +680,12 @@ func (am *ApplicationProfileManager) ReportFileExec(k8sContainerID, path string,
}
// check if we already have this exec
// we use a SHA256 hash of the exec to identify it uniquely (path + args, in the order they were provided)
savedExecs := am.savedExecs.Get(k8sContainerID)
execIdentifier := utils.CalculateSHA256FileExecHash(path, args)
if savedExecs.Has(execIdentifier) {
if _, ok := am.savedExecs.Get(k8sContainerID).Get(execIdentifier); ok {
return
}

// add to exec map, first element is the path, the rest are the args
execMap := am.toSaveExecs.Get(k8sContainerID)
execMap.Set(execIdentifier, append([]string{path}, args...))
am.toSaveExecs.Get(k8sContainerID).Set(execIdentifier, append([]string{path}, args...))
}

func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string, flags []string) {
@@ -715,8 +698,7 @@ func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string,
path = procRegex.ReplaceAllString(path, "/proc/"+dynamicpathdetector.DynamicIdentifier)
}
// check if we already have this open
savedOpens := am.savedOpens.Get(k8sContainerID)
if savedOpens.Has(path) && savedOpens.Get(path).Contains(flags...) {
if opens, ok := am.savedOpens.Get(k8sContainerID).Get(path); ok && opens.(mapset.Set[string]).Contains(flags...) {
return
}
// add to open map
@@ -729,28 +711,29 @@
}

func (am *ApplicationProfileManager) ReportDroppedEvent(k8sContainerID string) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
am.droppedEvents.Set(k8sContainerID, true)
am.droppedEventsContainers.Add(k8sContainerID)
}

func (am *ApplicationProfileManager) ReportHTTPEvent(k8sContainerID string, event *tracerhttptype.Event) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}

// get endpoint from event
endpointIdentifier, err := am.GetEndpointIdentifier(event)
if err != nil {
logger.L().Ctx(am.ctx).Warning("ApplicationProfileManager - failed to get endpoint identifier", helpers.Error(err))
return
}

endpoint, err := GetNewEndpoint(event, endpointIdentifier)
if err != nil {
logger.L().Ctx(am.ctx).Warning("ApplicationProfileManager - failed to get new endpoint", helpers.Error(err))
return
}

endpointMap := am.toSaveEndpoints.Get(k8sContainerID)
// check if we already have this endpoint
endpointHash := CalculateHTTPEndpointHash(endpoint)
endpointMap.Set(endpointHash, endpoint)
if _, ok := am.savedEndpoints.Get(k8sContainerID).Get(endpointHash); ok {
return
}
// add to endpoint map
am.toSaveEndpoints.Get(k8sContainerID).Set(endpointHash, endpoint)
}
@@ -29,7 +29,7 @@ func TestApplicationProfileManager(t *testing.T) {
cfg := config.Config{
InitialDelay: 1 * time.Second,
MaxSniffingTime: 5 * time.Minute,
UpdateDataPeriod: 1 * time.Second,
UpdateDataPeriod: 5 * time.Second,
}
ctx := context.TODO()
k8sClient := &k8sclient.K8sClientMock{}
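
For context, the test bumps UpdateDataPeriod from 1s to 5s; with the new ContainerCallback wiring above, that value also sizes the dedup caches (TTL of 5×UpdateDataPeriod, eviction every UpdateDataPeriod), presumably so cached entries comfortably outlive a save cycle during the test. A quick worked example of the resulting parameters, using the test's values (illustrative arithmetic only, not code from this PR):

// Mirrors cache.NewTTL(5*cfg.UpdateDataPeriod, cfg.UpdateDataPeriod) with the test config.
package main

import (
	"fmt"
	"time"
)

func main() {
	updateDataPeriod := 5 * time.Second  // new test value (was 1s)
	ttl := 5 * updateDataPeriod          // 25s: how long a dedup entry survives
	evictionInterval := updateDataPeriod // 5s: how often expired entries are purged
	fmt.Println(ttl, evictionInterval)   // 25s 5s
}
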
4 changes: 2 additions & 2 deletions pkg/containerwatcher/v1/container_watcher_private.go
@@ -438,8 +438,8 @@ func (ch *IGContainerWatcher) unregisterContainer(container *containercollection
}

func (ch *IGContainerWatcher) ignoreContainer(namespace, name string) bool {
// do not trace the node-agent pod
if name == ch.podName && namespace == ch.namespace {
// do not trace any of our pods
if namespace == ch.namespace {
return true
}
// do not trace the node-agent pods if MULTIPLY is set
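
The container_watcher change widens the trace-exclusion predicate from "this exact node-agent pod" to "any pod in the node-agent's namespace". A hypothetical standalone version of the before/after check is shown below; agentNamespace and agentPodName stand in for ch.namespace and ch.podName, and the MULTIPLY branch that follows in the original file is unchanged and omitted here.

// Illustrative standalone sketch; names and example values are hypothetical.
package main

import "fmt"

// Before: only the node-agent pod itself was excluded from tracing.
func ignoreContainerOld(namespace, name, agentNamespace, agentPodName string) bool {
	return name == agentPodName && namespace == agentNamespace
}

// After: every pod in the node-agent's namespace is excluded.
func ignoreContainerNew(namespace, name, agentNamespace string) bool {
	return namespace == agentNamespace
}

func main() {
	fmt.Println(ignoreContainerOld("kubescape", "storage-xyz", "kubescape", "node-agent-abc")) // false: was traced
	fmt.Println(ignoreContainerNew("kubescape", "storage-xyz", "kubescape"))                   // true: now ignored
}
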