Skip to content

Commit

Permalink
chore: move channel management from CH reader to rule DB (#5934)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Sep 17, 2024
1 parent b78ade2 commit f8e97c9
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 369 deletions.
266 changes: 1 addition & 265 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package clickhouseReader

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"reflect"
"regexp"
Expand Down Expand Up @@ -173,7 +170,7 @@ func NewReaderFromClickhouseConnection(
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New("")
alertManager, err := am.New()
if err != nil {
zap.L().Error("failed to initialize alert manager", zap.Error(err))
zap.L().Error("check if the alert manager URL is correctly set and valid")
Expand Down Expand Up @@ -414,267 +411,6 @@ func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}

func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiError {

receiver := &am.Receiver{}
if err := json.Unmarshal([]byte(channel.Data), receiver); err != nil { // Parse []byte to go struct pointer
return &model.ApiError{Typ: model.ErrorBadData, Err: err}
}

response, err := http.Post(constants.GetAlertManagerApiPrefix()+"v1/receivers", "application/json", bytes.NewBuffer([]byte(channel.Data)))

if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager/v1/receivers", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 299 {
responseData, _ := io.ReadAll(response.Body)

err := fmt.Errorf("error in getting 2xx response in API call to alertmanager/v1/receivers")
zap.L().Error("Error in getting 2xx response in API call to alertmanager/v1/receivers", zap.String("Status", response.Status), zap.String("Data", string(responseData)))

return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return nil
}

func (r *ClickHouseReader) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {

idInt, _ := strconv.Atoi(id)
channel := model.ChannelItem{}

query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=? "

stmt, err := r.localDB.Preparex(query)

if err != nil {
zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

err = stmt.Get(&channel, idInt)

if err != nil {
zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return &channel, nil

}

func (r *ClickHouseReader) DeleteChannel(id string) *model.ApiError {

idInt, _ := strconv.Atoi(id)

channelToDelete, apiErrorObj := r.GetChannel(id)

if apiErrorObj != nil {
return apiErrorObj
}

tx, err := r.localDB.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

{
stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.DeleteRoute(channelToDelete.Name)
if apiError != nil {
tx.Rollback()
return apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return nil

}

func (r *ClickHouseReader) GetChannels() (*[]model.ChannelItem, *model.ApiError) {

channels := []model.ChannelItem{}

query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"

err := r.localDB.Select(&channels, query)

zap.L().Info(query)

if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return &channels, nil

}

func getChannelType(receiver *am.Receiver) string {

if receiver.EmailConfigs != nil {
return "email"
}
if receiver.OpsGenieConfigs != nil {
return "opsgenie"
}
if receiver.PagerdutyConfigs != nil {
return "pagerduty"
}
if receiver.PushoverConfigs != nil {
return "pushover"
}
if receiver.SNSConfigs != nil {
return "sns"
}
if receiver.SlackConfigs != nil {
return "slack"
}
if receiver.VictorOpsConfigs != nil {
return "victorops"
}
if receiver.WebhookConfigs != nil {
return "webhook"
}
if receiver.WechatConfigs != nil {
return "wechat"
}
if receiver.MSTeamsConfigs != nil {
return "msteams"
}
return ""
}

func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {

idInt, _ := strconv.Atoi(id)

channel, apiErrObj := r.GetChannel(id)

if apiErrObj != nil {
return nil, apiErrObj
}
if channel.Name != receiver.Name {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
}

tx, err := r.localDB.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

channel_type := getChannelType(receiver)

// check if channel type is supported in the current user plan
if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil {
zap.L().Warn("an unsupported feature was blocked", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")}
}

receiverString, _ := json.Marshal(receiver)

{
stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`)

if err != nil {
zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.EditRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return receiver, nil

}

func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {

channel_type := getChannelType(receiver)

// check if channel type is supported in the current user plan
if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil {
zap.L().Warn("an unsupported feature was blocked", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")}
}

receiverString, _ := json.Marshal(receiver)

tx, err := r.localDB.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

{
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.AddRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return receiver, nil

}

func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type APIHandlerOpts struct {
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {

alertManager, err := am.New("")
alertManager, err := am.New()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {

func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
channel, apiErrorObj := aH.reader.GetChannel(id)
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand All @@ -1192,7 +1192,7 @@ func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {

func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := aH.reader.DeleteChannel(id)
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand All @@ -1201,7 +1201,7 @@ func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
}

func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
channels, apiErrorObj := aH.reader.GetChannels()
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand Down Expand Up @@ -1254,7 +1254,7 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
return
}

_, apiErrorObj := aH.reader.EditChannel(receiver, id)
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)

if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
return
}

_, apiErrorObj := aH.reader.CreateChannel(receiver)
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)

if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
Expand Down
Loading

0 comments on commit f8e97c9

Please sign in to comment.