-
Notifications
You must be signed in to change notification settings - Fork 3
/
scheduler.go
83 lines (69 loc) · 1.73 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package poller
import (
"sync"
"time"
)
// Scheduler decides when checks become due for polling and hands the due
// checks to a consumer through the channel returned by Next.
type Scheduler interface {
// Schedule registers check for scheduling. The implementation in this file
// keys the registration by check.Key and waits check.Interval between runs.
Schedule(check *Check) error
// Stop cancels the scheduling of the check registered under key.
Stop(key string)
// StopAll cancels the scheduling of every registered check.
StopAll()
// Start runs the scheduling loop. The implementation in this file loops
// without returning, so callers are expected to run it in its own goroutine.
Start()
// Next returns the channel on which checks that are due for polling are
// delivered.
Next() <-chan *Check
}
// simpleScheduler is the Scheduler implementation returned by
// NewSimpleScheduler. Every scheduled check is driven by a timer running in its
// own goroutine; Start re-arms that timer each time it fires.
type simpleScheduler struct {
// stopSignals maps a check's Key to the channel used to signal its timer
// goroutine to abandon ship immediately.
stopSignals map[string]chan int // collection of channels which are used to signal a goroutine to abandon ship immediately
// toPoll is the channel handed out by Next.
toPoll chan *Check // checks which are due for polling
// toSchedule is fed by the timer goroutines and drained by Start.
toSchedule chan *Check // checks whose timer fired and which must be scheduled again
// mu is held by Schedule, Stop and StopAll while they touch stopSignals.
// NOTE(review): Start reads stopSignals without taking mu.
mu sync.Mutex
}
// NewSimpleScheduler returns a Scheduler whose scheduling strategy is
// deliberately basic: every scheduled check gets its own time.Timer, running in
// its own goroutine.
//
// The returned scheduler starts with no registered checks and with unbuffered
// hand-off channels.
func NewSimpleScheduler() Scheduler {
	s := new(simpleScheduler)
	s.stopSignals = map[string]chan int{}
	s.toPoll = make(chan *Check)
	s.toSchedule = make(chan *Check)
	return s
}
// schedule waits for check.Interval to elapse and then hands check over on
// toSchedule so that it gets polled and re-armed.
//
// It returns early, without handing the check over, as soon as deleteSignal
// delivers a value or is closed. The signal is honoured both while the timer is
// pending and while the goroutine is waiting for a receiver on toSchedule, so a
// stop request never has to wait for the hand-off to complete. A nil
// deleteSignal never fires, which leaves only the timer and the hand-off.
func (s *simpleScheduler) schedule(check *Check, deleteSignal <-chan int) {
	timer := time.NewTimer(check.Interval)
	// Release the timer when we leave before it has fired.
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-deleteSignal:
		return
	}

	// toSchedule is unbuffered: keep listening for the stop signal while
	// waiting for a receiver instead of blocking on the send alone.
	select {
	case s.toSchedule <- check:
	case <-deleteSignal:
	}
}
// Schedule registers check under check.Key and starts a timer goroutine that
// makes the check due once check.Interval has elapsed. check must be non-nil.
//
// If a check is already registered under the same key, its stop channel is
// closed first so that the timer goroutine waiting on it exits; the new
// registration then replaces the old one rather than running alongside it.
//
// The returned error is always nil.
func (s *simpleScheduler) Schedule(check *Check) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if prev, ok := s.stopSignals[check.Key]; ok {
		// Closing never blocks and wakes whichever goroutine is waiting on the
		// old channel.
		close(prev)
	}

	stopCh := make(chan int)
	s.stopSignals[check.Key] = stopCh
	go s.schedule(check, stopCh)
	return nil
}
// stop signals the timer goroutine registered under key to exit and removes
// the registration. The caller must hold s.mu.
//
// The send is a handshake: stop blocks until a goroutine has received the
// signal. The channel is closed afterwards so that any other goroutine still
// waiting on it is released as well.
//
// A key that is not registered is ignored. Without the guard, the lookup would
// yield a nil channel and the send would block forever with s.mu held.
func (s *simpleScheduler) stop(key string) {
	stopCh, ok := s.stopSignals[key]
	if !ok {
		return
	}

	stopCh <- 1
	close(stopCh)
	delete(s.stopSignals, key)
}
// Stop cancels the scheduling of the check registered under key.
//
// It takes s.mu for the whole call and delegates the actual work to stop.
func (s *simpleScheduler) Stop(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.stop(key)
}
// StopAll cancels the scheduling of every registered check.
//
// It takes s.mu for the whole call and hands each registered key to stop in
// turn.
func (s *simpleScheduler) StopAll() {
	s.mu.Lock()
	defer s.mu.Unlock()

	// stop deletes the key it is given; Go allows removing map entries while
	// ranging over the map.
	for key := range s.stopSignals {
		s.stop(key)
	}
}
// Start runs the scheduling loop: every check whose timer fired is re-armed
// with a fresh timer goroutine and then published on the channel returned by
// Next.
//
// The loop only ends if toSchedule is closed, which nothing in this file does,
// so Start is expected to run in its own goroutine. Publishing on toPoll
// blocks until a consumer receives the check.
//
// A check whose key is no longer registered is dropped. Re-arming it would hand
// the timer goroutine a nil stop channel, so it could never be cancelled again,
// and the check would keep being polled.
//
// NOTE(review): stopSignals is read here without holding s.mu, while Schedule,
// Stop and StopAll modify it under s.mu. Taking s.mu here is not a drop-in fix,
// because stop blocks on its handshake while the lock is held.
func (s *simpleScheduler) Start() {
	for check := range s.toSchedule {
		stopCh, registered := s.stopSignals[check.Key]
		if !registered {
			continue
		}

		go s.schedule(check, stopCh)
		s.toPoll <- check
	}
}
// Next returns the receive-only view of toPoll, the channel on which Start
// publishes checks that are due for polling. Every call returns the same
// channel.
func (s *simpleScheduler) Next() <-chan *Check {
	var due <-chan *Check = s.toPoll
	return due
}