-
Notifications
You must be signed in to change notification settings - Fork 0
/
visitworker.go
207 lines (173 loc) · 5.59 KB
/
visitworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Copyright 2024 Factorial GmbH. All rights reserved.
package main
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
const (
// MaxJobRetries is the maximum number of times a single job is retried.
// Jobs are retried if they fail with an error that indicates a temporary
// issue, i.e. a 503, when a host is down for maintenance.
MaxJobRetries = 3
)
// CreateVisitWorkersPool initializes a worker pool and fills it with a number
// of VisitWorker. The returned WaitGroup can be used to wait for all workers
// to exit; workers stop when ctx is cancelled.
func CreateVisitWorkersPool(
	ctx context.Context,
	num int,
	runs *RunManager,
	q VisitWorkQueue,
	progress Progress,
	hooks *WebhookDispatcher,
) *sync.WaitGroup {
	var wg sync.WaitGroup

	slog.Debug("Visitor: Starting workers...", "num", num)
	for i := 0; i < num; i++ {
		wg.Add(1)

		go func(id int) {
			// Release the WaitGroup even if the worker panics, so callers
			// waiting on the group are not deadlocked.
			defer wg.Done()

			if err := VisitWorker(ctx, id, runs, q, progress, hooks); err != nil {
				slog.Error("Visitor: Worker exited with error.", "worker.id", id, "error", err)
			} else {
				slog.Debug("Visitor: Worker exited cleanly.", "worker.id", id)
			}
		}(i)
	}
	return &wg
}
// VisitWorker fetches a resource from a given URL, consumed from the work queue.
// It runs until ctx is cancelled or consuming from the queue fails. Temporary
// failures (429, 503, request timeouts) cause the job to be republished to the
// queue, up to MaxJobRetries attempts.
func VisitWorker(
	ctx context.Context,
	id int,
	runs *RunManager,
	q VisitWorkQueue,
	progress Progress,
	hooks *WebhookDispatcher,
) error {
	wlogger := slog.With("worker.id", id)
	wlogger.Debug("Visitor: Starting...")

	jobs, errs := q.Consume(ctx)
	for {
		var job *VisitJob

		wlogger.Debug("Visitor: Waiting for job...")
		select {
		// This allows to stop a worker gracefully.
		case <-ctx.Done():
			wlogger.Debug("Visitor: Context cancelled, stopping worker.")
			return nil
		case err := <-errs:
			_, span := tracer.Start(ctx, "handle.visit.queue.worker.error")
			wlogger.Error("Visitor: Failed to consume from queue.", "error", err)

			span.RecordError(err)
			span.End()
			return err
		case j := <-jobs:
			job = j
		}

		jlogger := wlogger.With("run", job.Run, "url", job.URL, "job.id", job.ID)
		p := progress.With(job.Run, job.URL)

		jctx, span := tracer.Start(job.Context, "process_visit_job")
		span.SetAttributes(attribute.String("Url", job.URL))
		t := trace.WithAttributes(attribute.String("Url", job.URL))

		jlogger.Debug("Visitor: Received job.")

		if _, err := job.Validate(); err != nil {
			jlogger.Error(err.Error())
			// Record the validation failure on the span before discarding the job.
			span.RecordError(err)
			span.End()
			continue
		}

		// If this tobey instance is also the instance that received the run request,
		// we already have a Collector locally available. If this instance has retrieved
		// a VisitMessage that was put in the queue by another tobey instance, we don't
		// yet have a collector available via the Manager. Please note that Collectors
		// are not shared by the Manager across tobey instances.
		r, err := runs.Get(ctx, job.Run)
		if err != nil || r == nil {
			// Without a run we cannot construct a Collector; fail the job
			// instead of dereferencing a nil run below.
			jlogger.Error("Visitor: Failed to get run for job.", "error", err)
			p.Update(jctx, ProgressStateErrored)
			if err != nil {
				span.RecordError(err)
			}
			span.End()
			continue
		}
		c := r.GetCollector(ctx, q, progress, hooks)

		p.Update(jctx, ProgressStateCrawling)
		res, err := c.Visit(jctx, job.URL)

		if UseMetrics {
			PromVisitTxns.Inc()
		}
		if UsePulse {
			atomic.AddInt32(&PulseVisitTxns, 1)
		}

		// Feed rate-limiting/sampling information back to the queue, even on error.
		if res != nil {
			q.TakeRateLimitHeaders(jctx, job.URL, res.Headers)
			q.TakeSample(jctx, job.URL, res.StatusCode, err, res.Took)
		} else {
			q.TakeSample(jctx, job.URL, 0, err, 0)
		}

		if err == nil {
			// Happy path: the URL was fetched successfully.
			p.Update(jctx, ProgressStateCrawled)

			jlogger.Info("Visitor: Visited URL.", "took.lifetime", time.Since(job.Created), "took.fetch", res.Took)
			span.AddEvent("Visitor: Visited URL.", t)
			// TODO: Notify the webhook.
			span.End()
			continue
		}

		if res != nil {
			// We have response information, use it to determine the correct error handling in detail.
			switch res.StatusCode {
			case 302: // Redirect
				// When a redirect is encountered, the visit errors out. This is in fact
				// not an actual error, but just a skip.
				p.Update(jctx, ProgressStateCancelled)

				jlogger.Info("Visitor: Skipped URL, got redirected.")
				span.AddEvent("Cancelled visiting URL", t)
				span.End()
				continue
			case 404:
				// FIXME: Probably want to lower the log level to Info for 404s here.
			case 429: // Too Many Requests
				fallthrough
			case 503: // Service Unavailable
				// Additionally we want to retrieve the Retry-After header here and wait
				// for that amount of time. If we just have to wait, then don't error out
				// but reschedule the job and wait. In order to not do that infinitely,
				// we have a maximum number of retries.

				// Handling of the Retry-After header is optional, so a missing or
				// unparsable value is not critical and simply skips the pause.
				if v := res.Headers.Get("Retry-After"); v != "" {
					// Retry-After carries delay-seconds (e.g. "120"); appending
					// "s" turns it into a parseable Go duration. HTTP-date values
					// will fail to parse and are ignored here.
					if d, perr := time.ParseDuration(v + "s"); perr == nil {
						q.Pause(jctx, res.Request.URL.String(), d)
					}
				}

				if job.Retries < MaxJobRetries {
					if err := q.Republish(jctx, job); err != nil {
						jlogger.Warn("Visitor: Republish failed, stopping retrying.", "error", err)
					} else {
						// Leave job in "Crawling" state.
						span.End()
						continue
					}
				} else {
					jlogger.Warn("Visitor: Maximum number of retries reached.")
				}
			default:
				// Noop, fallthrough to generic error handling.
			}
		} else if errors.Is(err, context.DeadlineExceeded) {
			// We react to timeouts as a temporary issue and retry the job, similarly
			// to 429 and 503 errors.
			if job.Retries < MaxJobRetries {
				if err := q.Republish(jctx, job); err != nil {
					jlogger.Warn("Visitor: Republish failed, stopping retrying.", "error", err)
				} else {
					// Leave job in "Crawling" state.
					span.End()
					continue
				}
			} else {
				jlogger.Warn("Visitor: Maximum number of retries reached.")
			}
		}

		// Generic error handling: mark the job as failed.
		p.Update(jctx, ProgressStateErrored)
		jlogger.Error("Visitor: Error visiting URL.", "error", err)
		span.RecordError(err)
		span.End()
		continue
	}
}