-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathmonitor.go
119 lines (100 loc) · 3.35 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"fmt"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
cacheddiscovery "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/informers"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/controller-manager/pkg/clientbuilder"
"go-learning/practise/k8s-practise/app"
)
const ResourceResyncTime time.Duration = 0
// monitor runs a Controller with a local stop channel.
type monitor struct {
controller cache.Controller
store cache.Store
stopCh chan struct{}
}
func (m *monitor) Run() {
// 启动监控
m.controller.Run(m.stopCh)
}
var (
sharedInformers informers.SharedInformerFactory
restMapper *restmapper.DeferredDiscoveryRESTMapper
)
func main() {
config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(homedir.HomeDir(), ".kube", "config"))
if err != nil {
panic(err)
}
rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{ClientConfig: config}
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers = informers.NewSharedInformerFactory(versionedClient, ResourceResyncTime)
if err := app.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
panic(fmt.Errorf("failed to wait for apiserver being healthy: %v", err))
}
stopCh := make(chan struct{})
// Use a discovery client capable of being refreshed.
discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
restMapper = restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
go wait.Until(func() {
restMapper.Reset()
}, 30*time.Second, stopCh)
gvrs := []schema.GroupVersionResource{
{Group: "apps", Version: "v1", Resource: "deployments"},
{Group: "", Version: "v1", Resource: "pods"},
}
monitors := make(map[schema.GroupVersionResource]*monitor)
for _, gvr := range gvrs {
kind, err := restMapper.KindFor(gvr)
if err != nil {
panic(err)
}
c, s, err := controllerFor(gvr, kind)
if err != nil {
panic(err)
}
monitors[gvr] = &monitor{store: s, controller: c}
}
for kind, monitor := range monitors {
if monitor.stopCh == nil {
monitor.stopCh = make(chan struct{})
sharedInformers.Start(stopCh)
fmt.Println("start monitor for ", kind)
go monitor.Run()
}
}
select {}
}
func controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
handlers := cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
fmt.Println("add", kind.String(), mObj.GetName(), mObj.GetFinalizers())
},
UpdateFunc: func(oldObj, newObj interface{}) {
mObj := newObj.(v1.Object)
fmt.Println("update", kind.String(), mObj.GetName(), mObj.GetFinalizers())
},
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
fmt.Println("delete", kind.String(), mObj.GetName(), mObj.GetFinalizers())
},
}
shared, err := sharedInformers.ForResource(resource)
if err != nil {
panic(err)
}
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
return shared.Informer().GetController(), shared.Informer().GetStore(), nil
}