-
Notifications
You must be signed in to change notification settings - Fork 0
/
consul_watcher.go
132 lines (113 loc) · 2.41 KB
/
consul_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// Copyright Piero de Salvia.
// All Rights Reserved
package pluginator
import (
"log"
"strconv"
"time"
"github.com/hashicorp/consul/api"
)
type consulWatcher struct {
prefix string
Events chan consulEvent
KVClient *api.KV
kvS map[string]*valueAndModified
terminate bool
}
type consulEvent struct {
Action consulAction
Key string
Value string
}
type consulAction string
const (
consulAddAction consulAction = "Add"
consulRemoveAction consulAction = "Remove"
consulUpdateAction consulAction = "Update"
)
type valueAndModified struct {
Value string
Modified uint64
}
func newConsulWatcher(host string, port int, keyPrefix string) (*consulWatcher, error) {
cw := consulWatcher{}
config := api.DefaultConfig()
(*config).Address = host + ":" + strconv.Itoa(port)
client, err := api.NewClient(config)
if err != nil {
return nil, err
}
kv := client.KV()
cw.prefix = keyPrefix
cw.Events = make(chan consulEvent)
cw.KVClient = kv
cw.kvS = make(map[string]*valueAndModified)
go func() {
for !cw.terminate {
time.Sleep(3 * time.Second)
cw.scan()
}
}()
return &cw, nil
}
func (cw *consulWatcher) Terminate() {
cw.terminate = true
log.Println("Terminating consul watcher...")
}
func (cw *consulWatcher) scan() {
kvList, _, err := cw.KVClient.List(cw.prefix, nil)
if err != nil {
log.Println(err)
return
}
for _, kvPair := range kvList {
if vM, exists := cw.kvS[kvPair.Key]; !exists {
vM := valueAndModified{
Value: string(kvPair.Value),
Modified: kvPair.ModifyIndex,
}
cw.kvS[kvPair.Key] = &vM
event := consulEvent{
Action: consulAddAction,
Key: kvPair.Key,
Value: string(kvPair.Value),
}
cw.Events <- event
} else {
if kvPair.ModifyIndex > vM.Modified {
vM := valueAndModified{
Value: string(kvPair.Value),
Modified: kvPair.ModifyIndex,
}
cw.kvS[kvPair.Key] = &vM
event := consulEvent{
Action: consulUpdateAction,
Key: kvPair.Key,
Value: string(kvPair.Value),
}
cw.Events <- event
}
}
}
for k, vm := range cw.kvS {
if !contains(kvList, k) {
event := consulEvent{
Action: consulRemoveAction,
Key: k,
Value: vm.Value,
}
cw.Events <- event
delete(cw.kvS, k)
}
}
}
func contains(slice []*api.KVPair, key string) bool {
found := false
for _, kvPair := range slice {
found = (*kvPair).Key == key
if found {
break
}
}
return found
}