-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile.go
105 lines (87 loc) · 1.94 KB
/
file.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"github.com/logpipe/logpipe/config"
"github.com/logpipe/logpipe/core"
"github.com/logpipe/logpipe/plugin"
"strings"
)
// STATE_FILE holds the file name "state.json". Nothing in this file reads it.
// NOTE(review): presumably the name of the file the state syncer persists
// to — confirm against the FileStateSyncer implementation.
const STATE_FILE = "state.json"
func Register() {
plugin.RegisterInputBuilder(&FileInputBuilder{})
}
// FileInputSpec is the configuration of a file input. Build populates it
// from the plugin's config value and Start consumes it.
type FileInputSpec struct {
// Path lists the paths to scan; Start splits it on ";" before handing
// the pieces to the file scanner.
Path string
// Delim is passed to every FileReader that Start creates.
// NOTE(review): presumably the record delimiter byte (e.g. '\n') —
// confirm in FileReader.
Delim byte
// Include and Exclude are passed unchanged to the file scanner.
// NOTE(review): presumably filter patterns; their syntax is defined by
// FileScanner, not here.
Include string
Exclude string
}
// FileInput is an input that turns lines read from files into events. Start
// launches a scanner plus a goroutine that creates one FileReader per file
// the scanner reports; Stop shuts all of them down.
type FileInput struct {
// NOTE(review): Context() and Codec(), used by Start and accept, are not
// defined in this file — presumably promoted from this embedded type.
core.BaseInput
// spec is the configuration given to NewFileInput.
spec FileInputSpec
// stop carries the shutdown signal: Stop sends on it and the goroutine
// started by Start receives from it. A nil channel here would make that
// send block forever.
stop chan struct{}
// consumer is the event callback stored by Start and invoked by accept.
consumer func(event core.Event)
// scanner is created and started by Start and stopped by Stop.
scanner *FileScanner
// syncer is created inside the goroutine started by Start and shared
// with every FileReader.
syncer *FileStateSyncer
// readers holds the started readers, keyed by the value received from
// the scanner's Files channel, so each file gets at most one reader.
readers map[string]*FileReader
}
// NewFileInput returns a FileInput configured by spec with an empty reader
// table and a ready-to-use stop channel.
//
// The stop channel must be created here: a send on a nil channel blocks
// forever, so without it Stop would never return and the goroutine started
// by Start could never observe the shutdown signal.
//
// name is accepted for the builder's benefit but is not used by this
// constructor. NOTE(review): confirm whether core.BaseInput should receive it.
func NewFileInput(name string, spec FileInputSpec) *FileInput {
	return &FileInput{
		spec: spec,
		// Unbuffered on purpose: Stop's send completes only once the
		// goroutine started by Start has received it.
		stop:    make(chan struct{}),
		readers: make(map[string]*FileReader),
	}
}
// Start stores consumer as the event sink, starts a file scanner over the
// ";"-separated paths in the spec, and launches a goroutine that starts one
// FileReader for every not-yet-seen file delivered on the scanner's Files
// channel. The goroutine exits when a value arrives on the stop channel.
// Start itself always returns nil.
//
// NOTE(review): the literals 10 (scanner) and 30 (state syncer) are passed
// through as-is; presumably intervals — confirm their units in
// NewFileScanner and NewFileStateSyncer.
func (i *FileInput) Start(consumer func(event core.Event)) error {
	i.consumer = consumer

	cfg := i.spec
	scanner := NewFileScanner(strings.Split(cfg.Path, ";"), 10, cfg.Include, cfg.Exclude)
	scanner.Start()
	i.scanner = scanner

	watch := func() {
		// The syncer is created on this goroutine, before any reader exists.
		statePath := i.Context().Path()
		i.syncer = NewFileStateSyncer(statePath, 30)

		for {
			select {
			case file := <-scanner.Files:
				if _, known := i.readers[file]; known {
					// A reader is already attached to this file.
					continue
				}
				r := NewFileReader(file, cfg.Delim, i.syncer, i.accept)
				r.Start()
				i.readers[file] = r
			case <-i.stop:
				return
			}
		}
	}
	go watch()

	return nil
}
// accept is the per-line callback given to each FileReader. It does nothing
// until a consumer has been set. With a codec configured, the line is decoded
// and the resulting event forwarded; lines that fail to decode are dropped
// silently. Without a codec, the raw line is wrapped with core.NewEvent.
func (i *FileInput) accept(line string) {
	if i.consumer == nil {
		return
	}

	var payload interface{} = line

	if i.Codec() == nil {
		i.consumer(core.NewEvent(payload))
		return
	}

	decoded, err := i.Codec().Decode(payload)
	if err != nil {
		// Decode failures are discarded; no event is emitted for this line.
		return
	}
	i.consumer(decoded)
}
// Stop signals the goroutine started by Start to exit, then stops the file
// scanner and every reader created so far. It always returns nil.
//
// Calling Stop on an input that was never started is a no-op. Without that
// guard the send below would block forever because nothing receives from the
// stop channel, and the scanner would be nil.
func (i *FileInput) Stop() error {
	// Start assigns i.scanner before it launches the goroutine, so a nil
	// scanner means there is no goroutine, scanner, or reader to shut down.
	if i.scanner == nil {
		return nil
	}

	// Blocking send: returns once the goroutine has taken the signal, after
	// which it no longer touches i.readers.
	i.stop <- struct{}{}
	i.scanner.Stop()
	for _, r := range i.readers {
		r.Stop()
	}
	return nil
}
// FileInputBuilder is the stateless builder that Register passes to the
// plugin package; its Build method creates FileInput values.
type FileInputBuilder struct{}

// Kind returns the identifier of the input type this builder produces,
// which is always "file".
func (b *FileInputBuilder) Kind() string {
	const kind = "file"
	return kind
}
func (b *FileInputBuilder) Build(name string, specValue config.Value) core.Input {
spec := FileInputSpec{}
specValue.Parse(&spec)
return NewFileInput(name, spec)
}