-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnrt.go
342 lines (300 loc) · 7.61 KB
/
nrt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
package nrt
import (
"fmt"
"os"
"sync"
"github.com/gosuri/uiprogress"
splitter "github.com/nsip/dev-nrt-splitter"
"github.com/nsip/dev-nrt/helper"
"github.com/nsip/dev-nrt/repository"
)
// Transformer is the core nrt engine: it passes the streams of
// rrd data through pipelines of report processors,
// creating tabular and fixed-width reports from the
// xml data.
//
// NOTE: if the repository is nil then post-ingest processing cannot
// continue; if a repository already exists the intent is to go
// straight to reports. The logic around this is still an open
// question, and ingest currently stops the flow.
type Transformer struct {
// key-value store of ingested data; assigned in Run or via the Repository option
repository *repository.BadgerRepo
// folder containing the RRD xml data files; set via the InputFolder option
inputFolder string
// progress-bar container, created in NewTransformer
uip *uiprogress.Progress
// codeframe helper, built from the repository during Run
helper helper.CodeframeHelper
// object helper, built from the repository during Run
objecthelper helper.ObjectHelper
// report-selection flags, each set by the option of the matching name
qaReports bool
itemLevelReports bool
coreReports bool
wxReports bool
xmlReports bool
// create a fresh repository before ingesting (ForceIngest option)
forceIngest bool
// open the existing repository and skip ingest (SkipIngest option)
skipIngest bool
// halt Run once ingest has completed (StopAfterIngest option)
stopAfterIngest bool
// passed to the splitter's progress-bar switch in Run (ShowProgress option)
showProgress bool
// when true, Run skips report processing and only runs the splitter (Split option)
split bool
// data stats read from the repository in Run and printed to the console
stats repository.ObjectStats
// NOTE(review): not referenced in this part of the file - presumably
// coordinates the xml report writers; confirm against its users
xmlWaitGroup *sync.WaitGroup
}
// NewTransformer creates a Transformer configured with the package
// defaults (see defaultOptions) followed by any caller-supplied options.
// Caller options are applied after the defaults, so they take precedence.
//
// If any option fails to apply (for example InputFolder being unable to
// create the input directory) that error is returned together with a nil
// Transformer.
func NewTransformer(userOpts ...Option) (*Transformer, error) {
	// defaults first, then caller options so that callers can override
	opts := append(defaultOptions(), userOpts...)

	tr := Transformer{uip: uiprogress.New()}
	// surface option failures instead of handing back a
	// half-configured transformer
	if err := tr.setOptions(opts...); err != nil {
		return nil, err
	}
	return &tr, nil
}
// Run ingests and processes data from RRD files. The steps are:
//
//  1. create (forceIngest) or open (skipIngest) the data repository at
//     "./kv/"; if neither flag is set, use the repository supplied via
//     the Repository option
//  2. ingest the results data from the input folder, unless skipIngest
//  3. return here if stopAfterIngest is set
//  4. print the repository's data stats
//  5. unless running split-only, build the codeframe and object helpers
//     and run the report processing streams
//  6. run the splitter using "./config_split/config.toml"
//  7. remove the "./out/null/" folder
//
// Every repository used by Run is closed when Run returns, on both
// success and failure paths. The first error encountered is returned and
// halts the run; an error is also returned when no repository is
// available at all.
func (tr *Transformer) Run() error {
	// ======================================
	//
	// Create or open data repository
	//
	var r *repository.BadgerRepo
	var err error
	if tr.forceIngest {
		r, err = repository.NewBadgerRepo("./kv/")
		if err != nil {
			return err
		}
		// registered straight away so a failed ingest cannot leak the repo
		defer r.Close()
		tr.repository = r
	}
	if tr.skipIngest {
		fmt.Println("--- Skipping ingest, using existing data:")
		r, err = repository.OpenExistingBadgerRepo("./kv/")
		if err != nil {
			return err
		}
		defer r.Close()
		tr.repository = r
	}
	if r == nil {
		// neither flag created a repository: fall back to one supplied
		// through the Repository option rather than working on nil
		if tr.repository == nil {
			return fmt.Errorf("no data repository available: enable ForceIngest or SkipIngest, or supply one with Repository")
		}
		r = tr.repository
		defer r.Close()
	}

	if !tr.skipIngest {
		fmt.Println("--- Ingesting results data:")
		if err = ingestResults(tr.inputFolder, r); err != nil {
			return err
		}
	}

	if tr.stopAfterIngest {
		fmt.Println("--- Halting post-ingest.")
		return nil
	}

	// =======================================
	//
	// Get the data stats, and show to user
	//
	fmt.Println("--- Data stats:")
	fmt.Println()
	tr.stats = r.GetStats()
	for k, v := range tr.stats {
		fmt.Printf("\t%s: %d\n", k, v)
	}
	fmt.Println()

	// split-only runs skip straight to the splitter
	if !tr.split {
		var w sync.WaitGroup

		// ====================================
		//
		// Build the codeframe helper
		//
		w.Add(1)
		cfh, err := helper.NewCodeframeHelper(r, &w)
		if err != nil {
			return err
		}
		tr.helper = cfh

		// ====================================
		//
		// Build the object helper
		//
		w.Add(1)
		oh, err := helper.NewObjectHelper(r, &w)
		if err != nil {
			return err
		}
		tr.objecthelper = oh

		// both helpers must be ready before any report runs
		w.Wait()

		// ==================================
		//
		// run the report processing streams
		//
		if err := tr.streamResults(); err != nil {
			return err
		}
	}

	// =================================
	//
	// split
	splitter.EnableProgBar(tr.showProgress)
	if err = splitter.NrtSplit("./config_split/config.toml"); err != nil {
		return err
	}

	// =================================
	//
	// tidy up
	return os.RemoveAll("./out/null/")
}
// ==========================================
//
// Options
//

// defaultOptions returns the baseline option set for a transformer, in
// the order it is applied: input read from "./in/", only the core
// reports enabled, ingest forced, no pause after ingest, progress bars
// shown, and split-only mode off.
func defaultOptions() []Option {
	defaults := make([]Option, 0, 10)
	defaults = append(defaults,
		InputFolder("./in/"),
		QAReports(false),
		ItemLevelReports(false),
		CoreReports(true),
		WritingExtractReports(false),
		XmlExtractReports(false),
		ForceIngest(true),
		StopAfterIngest(false),
		ShowProgress(true),
		Split(false),
	)
	return defaults
}
// Option is a functional option: a function that applies one
// configuration setting to a Transformer, returning an error if the
// setting cannot be applied.
type Option func(*Transformer) error
// setOptions applies the supplied options to the transformer in the
// order given. It stops at the first option that reports an error and
// returns that error; later options are not applied. It returns nil
// when every option succeeds.
func (tr *Transformer) setOptions(options ...Option) error {
	for i := 0; i < len(options); i++ {
		err := options[i](tr)
		if err != nil {
			return err
		}
	}
	return nil
}
// ShowProgress controls whether progress bars are shown for report
// processing. (The progress bar for data ingest is always shown.)
// Defaults to true.
//
// Reasons to disable: to keep std.Out console messages clearly visible
// rather than mixed in with the progress bars, and to avoid noise when
// piping output to a file, where the bars are written out sequentially
// and produce a lot of unnecessary data.
func ShowProgress(sp bool) Option {
	apply := func(t *Transformer) error {
		t.showProgress = sp
		return nil
	}
	return apply
}
// StopAfterIngest makes the transformer stop once data ingest is
// complete, so that various report configurations can then be run
// independently without reloading the results data.
// Default is false: the transformer ingests data and moves directly
// on to report processing.
func StopAfterIngest(sai bool) Option {
	return Option(func(t *Transformer) error {
		t.stopAfterIngest = sai
		return nil
	})
}
// ForceIngest ensures that, even if a datastore was created in the
// past, the old data is removed and the ingest cycle is run again,
// reading all data files from the input folder.
// Default is true.
func ForceIngest(fi bool) Option {
	apply := func(t *Transformer) error {
		t.forceIngest = fi
		return nil
	}
	return apply
}
// SkipIngest tells NRT to go straight to the report processing
// activity, as data has already been ingested at an earlier point in
// time.
//
// Enabling it also switches off forceIngest, since the two are mutually
// exclusive. Passing false leaves forceIngest untouched, so
// SkipIngest(false) does not silently disable the default forced ingest.
func SkipIngest(si bool) Option {
	return func(tr *Transformer) error {
		tr.skipIngest = si
		if si {
			// skipping ingest only makes sense against existing data
			tr.forceIngest = false
		}
		return nil
	}
}
// WritingExtractReports indicates whether the writing-extract reports
// (input to downstream writing marking systems) will be included in
// this run of the transformer.
//
// Enabling them switches the core reports off, so that just the
// writing extract can be run; supply CoreReports(true) as a later
// option to re-instate the core reports.
func WritingExtractReports(wx bool) Option {
	apply := func(t *Transformer) error {
		if wx {
			t.coreReports = false
		}
		t.wxReports = wx
		return nil
	}
	return apply
}
// XmlExtractReports indicates whether the XML reports (output of the
// ingested files back as XML, with redaction) will be included in this
// run of the transformer.
//
// Enabling them switches the core reports off, so that just the xml
// reports can be run; supply CoreReports(true) as a later option to
// re-instate the core reports.
func XmlExtractReports(xml bool) Option {
	return Option(func(t *Transformer) error {
		t.xmlReports = xml
		// core reports survive only when xml reports are not requested
		t.coreReports = t.coreReports && !xml
		return nil
	})
}
// ItemLevelReports indicates whether the most heavyweight/detailed
// reports will be included in this run of the transformer. Including
// these has the largest effect on overall processing time.
func ItemLevelReports(ilevel bool) Option {
	apply := func(t *Transformer) error {
		t.itemLevelReports = ilevel
		return nil
	}
	return apply
}
// CoreReports indicates whether the most-used common reports will be
// included in this run of the transformer.
func CoreReports(core bool) Option {
	return Option(func(t *Transformer) error {
		t.coreReports = core
		return nil
	})
}
// QAReports indicates whether the qa reports will be included in this
// run of the transformer.
func QAReports(qa bool) Option {
	apply := func(t *Transformer) error {
		t.qaReports = qa
		return nil
	}
	return apply
}
// Split indicates whether the trimmer/splitter will be run on its own
// in this run of the transformer.
func Split(split bool) Option {
	return Option(func(t *Transformer) error {
		t.split = split
		return nil
	})
}
// InputFolder sets the folder containing the RRD xml data files for
// processing. When the option is applied the folder is created,
// together with any missing parent directories, if it does not already
// exist.
//
// Applying the option returns an error, and leaves the transformer's
// input folder unchanged, if the folder cannot be created or if path
// exists but is not a directory.
func InputFolder(path string) Option {
	return func(tr *Transformer) error {
		// MkdirAll succeeds without change on an existing directory, so
		// no prior Stat is needed (avoiding a check-then-create race),
		// and it handles nested paths that a plain Mkdir rejects
		if err := os.MkdirAll(path, os.ModePerm); err != nil {
			return err
		}
		tr.inputFolder = path
		return nil
	}
}
// Repository supplies the key-value store containing the ingested RRD
// data for the transformer to use.
func Repository(repo *repository.BadgerRepo) Option {
	assign := func(t *Transformer) error {
		t.repository = repo
		return nil
	}
	return assign
}