1
1
package main
2
2
3
3
import (
4
+ "bytes"
4
5
"database/sql"
6
+ "encoding/json"
5
7
"fmt"
6
8
"io/ioutil"
7
9
"log"
8
10
"net"
11
+ "net/http"
9
12
"os"
10
13
"regexp"
11
14
"strconv"
@@ -19,10 +22,14 @@ import (
19
22
var conn net.Conn
20
23
var db * sql.DB
21
24
25
+ // Only used if ENABLE_UNIQUE_METRIC_LIMIT is set to "true"
22
26
var uniqueMetricLimit int
23
27
24
28
const defaultMetricLimit int = 100
25
29
30
+ // Only used if LOGSHUTTLE_URL is set
31
+ var sentRejections map [string ]map [string ]int
32
+
26
33
func initDB () (err error ) {
27
34
var dberr error
28
35
db , dberr = sql .Open ("postgres" , os .Getenv ("DATABASE_URL" ))
@@ -132,6 +139,98 @@ func sendMetric(v []string, t map[string]string, message string, tags [][]string
132
139
fmt .Fprintf (conn , put )
133
140
}
134
141
142
+ // checkPreviousRejections - Go through the rejection cache and report if we have reached the rejection limit for the app and metric
143
+ func checkPreviousRejections (app string , metric string , metricType string ) bool {
144
+ limit , err := strconv .Atoi (os .Getenv ("REJECT_MESSAGE_LIMIT" ))
145
+ if err != nil {
146
+ limit = 1
147
+ }
148
+
149
+ if sentRejections == nil {
150
+ sentRejections = make (map [string ]map [string ]int )
151
+ }
152
+
153
+ if sentRejections [app ] == nil {
154
+ sentRejections [app ] = make (map [string ]int )
155
+ }
156
+
157
+ ok := (sentRejections [app ][metricType + metric ] < limit )
158
+ sentRejections [app ][metricType + metric ]++
159
+
160
+ return ok
161
+ }
162
+
163
+ func rejectMetric (app string , metric string , metricType string ) {
164
+ appParts := strings .SplitN (app , "-" , 2 )
165
+ appName := appParts [0 ]
166
+ appSpace := appParts [1 ]
167
+
168
+ // Replace all # characters with _ so that we don't send anything
169
+ // to the logshuttle that might be considered a new metric
170
+ metric = strings .Replace (metric , "#" , "_" , - 1 )
171
+
172
+ // Limit the number of times we report to the app logs
173
+ if os .Getenv ("REJECT_MESSAGE_LIMIT" ) != "" {
174
+ if ! checkPreviousRejections (app , metric , metricType ) {
175
+ if os .Getenv ("DEBUG" ) == "true" {
176
+ fmt .Println ("Reject message limit reached for " + app + ": [" + metricType + "] " + metric )
177
+ }
178
+ return
179
+ }
180
+ }
181
+
182
+ logMessage := "Unique metrics limit exceeded. Metric discarded: [" + metricType + "] " + metric
183
+ if os .Getenv ("UNIQUE_METRIC_LIMIT_HELP" ) != "" {
184
+ logMessage = "(" + os .Getenv ("UNIQUE_METRIC_LIMIT_HELP" ) + ") " + logMessage
185
+ }
186
+
187
+ rejectMessage := struct {
188
+ Log string `json:"log"`
189
+ Stream string `json:"stream"`
190
+ Time time.Time `json:"time"`
191
+ Kubernetes interface {} `json:"kubernetes"`
192
+ Topic string `json:"topic"`
193
+ }{
194
+ Log : logMessage ,
195
+ Stream : "stdout" ,
196
+ Time : time .Now (),
197
+ Kubernetes : struct {
198
+ PodName string `json:"pod_name"`
199
+ ContainerName string `json:"container_name"`
200
+ }{
201
+ PodName : "akkeris/metrics" ,
202
+ ContainerName : appName ,
203
+ },
204
+ Topic : appSpace ,
205
+ }
206
+
207
+ var logshuttleURL string
208
+ if os .Getenv ("LOGSHUTTLE_URL" ) != "" {
209
+ logshuttleURL = os .Getenv ("LOGSHUTTLE_URL" )
210
+ } else {
211
+ logshuttleURL = "http://logshuttle.akkeris-system.svc.cluster.local"
212
+ }
213
+
214
+ jsonb , err := json .Marshal (rejectMessage )
215
+ if err != nil {
216
+ fmt .Println ("Unable to send reject message to logshuttle" )
217
+ fmt .Println (err )
218
+ return
219
+ }
220
+
221
+ resp , err := http .Post (logshuttleURL + "/log-events" , "application/json" , bytes .NewBuffer (jsonb ))
222
+ if err != nil {
223
+ fmt .Println ("Unable to send reject message to logshuttle" )
224
+ fmt .Println (err )
225
+ return
226
+ }
227
+ defer resp .Body .Close ()
228
+
229
+ if resp .StatusCode < 200 || resp .StatusCode > 299 {
230
+ fmt .Println ("Error: " + strconv .Itoa (resp .StatusCode ) + " response returned from logshuttle" )
231
+ }
232
+ }
233
+
135
234
func main () {
136
235
var err error
137
236
conn , err = net .Dial ("tcp" , os .Getenv ("OPENTSDB_IP" ))
@@ -150,10 +249,12 @@ func main() {
150
249
151
250
server .Boot ()
152
251
153
- err = initDB ()
154
- if err != nil {
155
- fmt .Println ("Error establishing database connection: " + err .Error ())
156
- return
252
+ if os .Getenv ("ENABLE_UNIQUE_METRIC_LIMIT" ) == "true" {
253
+ err = initDB ()
254
+ if err != nil {
255
+ fmt .Println ("Error establishing database connection: " + err .Error ())
256
+ return
257
+ }
157
258
}
158
259
159
260
if os .Getenv ("UNIQUE_METRIC_LIMIT" ) != "" {
@@ -178,14 +279,28 @@ func main() {
178
279
t := make (map [string ]string )
179
280
t ["app" ] = logParts ["hostname" ].(string )
180
281
282
+ // Useful for testing:
283
+ // t["app"] = logParts["app_name"].(string)
284
+
181
285
measurements := re .FindAllStringSubmatch (message , - 1 )
182
286
tags := tagsRe .FindAllStringSubmatch (message , - 1 )
183
287
184
288
for _ , v := range measurements {
185
289
t ["metric" ] = v [2 ]
186
- ok := checkMetric (t ["app" ], t ["metric" ])
290
+
291
+ // Only check metric limit if ENABLE_UNIQUE_METRIC_LIMIT is set to "true"
292
+ ok := true
293
+ if os .Getenv ("ENABLE_UNIQUE_METRIC_LIMIT" ) == "true" {
294
+ ok = checkMetric (t ["app" ], t ["metric" ])
295
+ }
296
+
187
297
if ok {
188
298
sendMetric (v , t , message , tags )
299
+ } else {
300
+ // Only send rejection message if LOGSHUTTLE_URL is set
301
+ if os .Getenv ("LOGSHUTTLE_URL" ) != "" {
302
+ rejectMetric (t ["app" ], t ["metric" ], v [1 ])
303
+ }
189
304
}
190
305
}
191
306
}
0 commit comments