@@ -17,166 +17,20 @@ limitations under the License.
17
17
package main
18
18
19
19
import (
20
- "errors"
21
- "fmt"
22
- "os"
23
- "sort"
24
- "strconv"
25
- "strings"
26
- "sync"
27
20
"time"
28
21
29
22
"github.com/NVIDIA/go-nvml/pkg/nvml"
30
- corev1 "k8s.io/api/core/v1"
31
23
"k8s.io/klog/v2"
32
24
)
33
25
34
- var cgroupDriver int
35
-
36
- type hostGPUPid struct {
37
- hostGPUPid int
38
- mtime uint64
39
- }
40
-
41
26
type UtilizationPerDevice []int
42
27
43
- var mutex sync.Mutex
44
28
var srPodList map [string ]podusage
45
29
46
30
func init () {
47
31
srPodList = make (map [string ]podusage )
48
32
}
49
33
50
// setcGgroupDriver reads the kubelet configuration mounted at
// /hostvar/lib/kubelet/config.yaml and reports which cgroup driver it selects.
//
// It returns 1 for cgroupfs, 2 for systemd, and 0 when the file cannot be
// read, has no cgroupDriver entry, or names an unrecognised driver.
func setcGgroupDriver() int {
	kubeletconfig, err := os.ReadFile("/hostvar/lib/kubelet/config.yaml")
	if err != nil {
		return 0
	}
	return parseCgroupDriver(string(kubeletconfig))
}

// parseCgroupDriver extracts the value of the last "cgroupDriver:" entry in
// content and maps it to 1 (cgroupfs), 2 (systemd) or 0 (absent/unknown).
//
// Only the text on the same line as the key is inspected, so unrelated
// occurrences of "systemd" elsewhere in the file (for example a resolvConf
// path under /run/systemd) no longer influence the result.
func parseCgroupDriver(content string) int {
	const key = "cgroupDriver:"
	pos := strings.LastIndex(content, key)
	if pos < 0 {
		return 0
	}
	value := content[pos+len(key):]
	if end := strings.IndexByte(value, '\n'); end >= 0 {
		value = value[:end]
	}
	// Drop a trailing YAML comment, then surrounding blanks and quotes.
	if hash := strings.IndexByte(value, '#'); hash >= 0 {
		value = value[:hash]
	}
	value = strings.Trim(strings.TrimSpace(value), `"'`)

	switch value {
	case "cgroupfs":
		return 1
	case "systemd":
		return 2
	}
	return 0
}
69
-
70
- func getUsedGPUPid () ([]uint , nvml.Return ) {
71
- tmp := []nvml.ProcessInfo {}
72
- count , err := nvml .DeviceGetCount ()
73
- if err != nvml .SUCCESS {
74
- return []uint {}, err
75
- }
76
- for i := 0 ; i < count ; i ++ {
77
- device , err := nvml .DeviceGetHandleByIndex (i )
78
- if err != nvml .SUCCESS {
79
- return []uint {}, err
80
- }
81
- ids , err := device .GetComputeRunningProcesses ()
82
- if err != nvml .SUCCESS {
83
- return []uint {}, err
84
- }
85
- tmp = append (tmp , ids ... )
86
- }
87
- result := make ([]uint , 0 )
88
- m := make (map [uint ]bool )
89
- for _ , v := range tmp {
90
- if _ , ok := m [uint (v .Pid )]; ! ok {
91
- result = append (result , uint (v .Pid ))
92
- m [uint (v .Pid )] = true
93
- }
94
- }
95
- sort .Slice (tmp , func (i , j int ) bool { return tmp [i ].Pid > tmp [j ].Pid })
96
- return result , nvml .SUCCESS
97
- }
98
-
99
- func setHostPid (pod corev1.Pod , ctr corev1.ContainerStatus , sr * podusage ) error {
100
- var pids []string
101
- mutex .Lock ()
102
- defer mutex .Unlock ()
103
-
104
- if cgroupDriver == 0 {
105
- cgroupDriver = setcGgroupDriver ()
106
- }
107
- if cgroupDriver == 0 {
108
- return errors .New ("can not identify cgroup driver" )
109
- }
110
- usedGPUArray , err := getUsedGPUPid ()
111
- if err != nvml .SUCCESS {
112
- return errors .New ("get usedGPUID failed, ret:" + nvml .ErrorString (err ))
113
- }
114
- if len (usedGPUArray ) == 0 {
115
- return nil
116
- }
117
- qos := strings .ToLower (string (pod .Status .QOSClass ))
118
- var filename string
119
- if cgroupDriver == 1 {
120
- /* Cgroupfs */
121
- filename = fmt .Sprintf ("/sysinfo/fs/cgroup/memory/kubepods/%s/pod%s/%s/tasks" , qos , pod .UID , strings .TrimPrefix (ctr .ContainerID , "docker://" ))
122
- }
123
- if cgroupDriver == 2 {
124
- /* Systemd */
125
- cgroupuid := strings .ReplaceAll (string (pod .UID ), "-" , "_" )
126
- filename = fmt .Sprintf ("/sysinfo/fs/cgroup/systemd/kubepods.slice/kubepods-%s.slice/kubepods-%s-pod%s.slice/docker-%s.scope/tasks" , qos , qos , cgroupuid , strings .TrimPrefix (ctr .ContainerID , "docker://" ))
127
- }
128
- fmt .Println ("filename=" , filename )
129
- content , ferr := os .ReadFile (filename )
130
- if ferr != nil {
131
- return ferr
132
- }
133
- pids = strings .Split (string (content ), "\n " )
134
- hostPidArray := []hostGPUPid {}
135
- for _ , val := range pids {
136
- tmp , _ := strconv .Atoi (val )
137
- if tmp != 0 {
138
- var stat os.FileInfo
139
- var err error
140
- if stat , err = os .Lstat (fmt .Sprintf ("/proc/%v" , tmp )); err != nil {
141
- return err
142
- }
143
- mtime := stat .ModTime ().Unix ()
144
- hostPidArray = append (hostPidArray , hostGPUPid {
145
- hostGPUPid : tmp ,
146
- mtime : uint64 (mtime ),
147
- })
148
- }
149
- }
150
- usedGPUHostArray := []hostGPUPid {}
151
- for _ , val := range usedGPUArray {
152
- for _ , hostpid := range hostPidArray {
153
- if uint (hostpid .hostGPUPid ) == val {
154
- usedGPUHostArray = append (usedGPUHostArray , hostpid )
155
- }
156
- }
157
- }
158
- //fmt.Println("usedHostGPUArray=", usedGPUHostArray)
159
- sort .Slice (usedGPUHostArray , func (i , j int ) bool { return usedGPUHostArray [i ].mtime > usedGPUHostArray [j ].mtime })
160
- if sr == nil || sr .sr == nil {
161
- return nil
162
- }
163
- for idx , val := range sr .sr .procs {
164
- //fmt.Println("pid=", val.pid)
165
- if val .pid == 0 {
166
- break
167
- }
168
- if idx < len (usedGPUHostArray ) {
169
- if val .hostpid == 0 || val .hostpid != int32 (usedGPUHostArray [idx ].hostGPUPid ) {
170
- fmt .Println ("Assign host pid to pid instead" , usedGPUHostArray [idx ].hostGPUPid , val .pid , val .hostpid )
171
- sr .sr .procs [idx ].hostpid = int32 (usedGPUHostArray [idx ].hostGPUPid )
172
- fmt .Println ("val=" , val .hostpid , sr .sr .procs [idx ].hostpid )
173
- }
174
- }
175
- }
176
- return nil
177
-
178
- }
179
-
180
34
func CheckBlocking (utSwitchOn map [string ]UtilizationPerDevice , p int , pu podusage ) bool {
181
35
for _ , devuuid := range pu .sr .uuids {
182
36
_ , ok := utSwitchOn [string (devuuid .uuid [:])]
0 commit comments