|
1 | 1 | package main
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "bytes" |
4 | 5 | "database/sql"
|
| 6 | + "encoding/json" |
5 | 7 | "fmt"
|
6 | 8 | "io/ioutil"
|
7 | 9 | "log"
|
8 | 10 | "net"
|
| 11 | + "net/http" |
9 | 12 | "os"
|
10 | 13 | "regexp"
|
11 | 14 | "strconv"
|
@@ -132,6 +135,58 @@ func sendMetric(v []string, t map[string]string, message string, tags [][]string
|
132 | 135 | fmt.Fprintf(conn, put)
|
133 | 136 | }
|
134 | 137 |
|
| 138 | +func rejectMetric(app string, metric string, metricType string) { |
| 139 | + appParts := strings.SplitN(app, "-", 2) |
| 140 | + appName := appParts[0] |
| 141 | + appSpace := appParts[1] |
| 142 | + |
| 143 | + rejectMessage := struct { |
| 144 | + Log string `json:"log"` |
| 145 | + Stream string `json:"stream"` |
| 146 | + Time time.Time `json:"time"` |
| 147 | + Kubernetes interface{} `json:"kubernetes"` |
| 148 | + Topic string `json:"topic"` |
| 149 | + }{ |
| 150 | + "Unique metrics limit exceeded. Metric discarded: [" + metricType + "] " + metric, |
| 151 | + "stdout", |
| 152 | + time.Now(), |
| 153 | + struct { |
| 154 | + PodName string `json:"pod_name"` |
| 155 | + ContainerName string `json:"container_name"` |
| 156 | + }{ |
| 157 | + "akkeris-warning", |
| 158 | + appName, |
| 159 | + }, |
| 160 | + appSpace, |
| 161 | + } |
| 162 | + |
| 163 | + var logshuttleURL string |
| 164 | + if os.Getenv("LOGSHUTTLE_URL") != "" { |
| 165 | + logshuttleURL = os.Getenv("LOGSHUTTLE_URL") |
| 166 | + } else { |
| 167 | + logshuttleURL = "http://logshuttle.akkeris-system.svc.cluster.local" |
| 168 | + } |
| 169 | + |
| 170 | + jsonb, err := json.Marshal(rejectMessage) |
| 171 | + if err != nil { |
| 172 | + fmt.Println("Unable to send reject message to logshuttle") |
| 173 | + fmt.Println(err) |
| 174 | + return |
| 175 | + } |
| 176 | + |
| 177 | + resp, err := http.Post(logshuttleURL+"/log-events", "application/json", bytes.NewBuffer(jsonb)) |
| 178 | + if err != nil { |
| 179 | + fmt.Println("Unable to send reject message to logshuttle") |
| 180 | + fmt.Println(err) |
| 181 | + return |
| 182 | + } |
| 183 | + defer resp.Body.Close() |
| 184 | + |
| 185 | + if resp.StatusCode < 200 || resp.StatusCode > 299 { |
| 186 | + fmt.Println("Error: " + strconv.Itoa(resp.StatusCode) + " response returned from logshuttle") |
| 187 | + } |
| 188 | +} |
| 189 | + |
135 | 190 | func main() {
|
136 | 191 | var err error
|
137 | 192 | conn, err = net.Dial("tcp", os.Getenv("OPENTSDB_IP"))
|
@@ -186,6 +241,8 @@ func main() {
|
186 | 241 | ok := checkMetric(t["app"], t["metric"])
|
187 | 242 | if ok {
|
188 | 243 | sendMetric(v, t, message, tags)
|
| 244 | + } else { |
| 245 | + rejectMetric(t["app"], t["metric"], v[1]) |
189 | 246 | }
|
190 | 247 | }
|
191 | 248 | }
|
|
0 commit comments