Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move channel management from CH reader to rule DB #5934

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 1 addition & 265 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package clickhouseReader

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"os"
"reflect"
"regexp"
Expand Down Expand Up @@ -172,7 +169,7 @@ func NewReaderFromClickhouseConnection(
cluster string,
useLogsNewSchema bool,
) *ClickHouseReader {
alertManager, err := am.New("")
alertManager, err := am.New()
if err != nil {
zap.L().Error("failed to initialize alert manager", zap.Error(err))
zap.L().Error("check if the alert manager URL is correctly set and valid")
Expand Down Expand Up @@ -410,267 +407,6 @@ func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}

func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiError {

receiver := &am.Receiver{}
if err := json.Unmarshal([]byte(channel.Data), receiver); err != nil { // Parse []byte to go struct pointer
return &model.ApiError{Typ: model.ErrorBadData, Err: err}
}

response, err := http.Post(constants.GetAlertManagerApiPrefix()+"v1/receivers", "application/json", bytes.NewBuffer([]byte(channel.Data)))

if err != nil {
zap.L().Error("Error in getting response of API call to alertmanager/v1/receivers", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
if response.StatusCode > 299 {
responseData, _ := io.ReadAll(response.Body)

err := fmt.Errorf("error in getting 2xx response in API call to alertmanager/v1/receivers")
zap.L().Error("Error in getting 2xx response in API call to alertmanager/v1/receivers", zap.String("Status", response.Status), zap.String("Data", string(responseData)))

return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return nil
}

func (r *ClickHouseReader) GetChannel(id string) (*model.ChannelItem, *model.ApiError) {

idInt, _ := strconv.Atoi(id)
channel := model.ChannelItem{}

query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=? "

stmt, err := r.localDB.Preparex(query)

if err != nil {
zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

err = stmt.Get(&channel, idInt)

if err != nil {
zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return &channel, nil

}

func (r *ClickHouseReader) DeleteChannel(id string) *model.ApiError {

idInt, _ := strconv.Atoi(id)

channelToDelete, apiErrorObj := r.GetChannel(id)

if apiErrorObj != nil {
return apiErrorObj
}

tx, err := r.localDB.Begin()
if err != nil {
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

{
stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.DeleteRoute(channelToDelete.Name)
if apiError != nil {
tx.Rollback()
return apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err))
return &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return nil

}

func (r *ClickHouseReader) GetChannels() (*[]model.ChannelItem, *model.ApiError) {

channels := []model.ChannelItem{}

query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels"

err := r.localDB.Select(&channels, query)

zap.L().Info(query)

if err != nil {
zap.L().Error("Error in processing sql query", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return &channels, nil

}

func getChannelType(receiver *am.Receiver) string {

if receiver.EmailConfigs != nil {
return "email"
}
if receiver.OpsGenieConfigs != nil {
return "opsgenie"
}
if receiver.PagerdutyConfigs != nil {
return "pagerduty"
}
if receiver.PushoverConfigs != nil {
return "pushover"
}
if receiver.SNSConfigs != nil {
return "sns"
}
if receiver.SlackConfigs != nil {
return "slack"
}
if receiver.VictorOpsConfigs != nil {
return "victorops"
}
if receiver.WebhookConfigs != nil {
return "webhook"
}
if receiver.WechatConfigs != nil {
return "wechat"
}
if receiver.MSTeamsConfigs != nil {
return "msteams"
}
return ""
}

func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) {

idInt, _ := strconv.Atoi(id)

channel, apiErrObj := r.GetChannel(id)

if apiErrObj != nil {
return nil, apiErrObj
}
if channel.Name != receiver.Name {
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")}
}

tx, err := r.localDB.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

channel_type := getChannelType(receiver)

// check if channel type is supported in the current user plan
if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil {
zap.L().Warn("an unsupported feature was blocked", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")}
}

receiverString, _ := json.Marshal(receiver)

{
stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`)

if err != nil {
zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil {
zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.EditRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return receiver, nil

}

func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) {

channel_type := getChannelType(receiver)

// check if channel type is supported in the current user plan
if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil {
zap.L().Warn("an unsupported feature was blocked", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")}
}

receiverString, _ := json.Marshal(receiver)

tx, err := r.localDB.Begin()
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

{
stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`)
if err != nil {
zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback()
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
defer stmt.Close()

if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil {
zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err))
tx.Rollback() // return an error too, we may want to wrap them
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}
}

apiError := r.alertManager.AddRoute(receiver)
if apiError != nil {
tx.Rollback()
return nil, apiError
}

err = tx.Commit()
if err != nil {
zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err))
return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
}

return receiver, nil

}

func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ type APIHandlerOpts struct {
// NewAPIHandler returns an APIHandler
func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) {

alertManager, err := am.New("")
alertManager, err := am.New()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1182,7 +1182,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) {

func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
channel, apiErrorObj := aH.reader.GetChannel(id)
channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand All @@ -1192,7 +1192,7 @@ func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) {

func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
apiErrorObj := aH.reader.DeleteChannel(id)
apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id)
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand All @@ -1201,7 +1201,7 @@ func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) {
}

func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) {
channels, apiErrorObj := aH.reader.GetChannels()
channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels()
if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
return
Expand Down Expand Up @@ -1254,7 +1254,7 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) {
return
}

_, apiErrorObj := aH.reader.EditChannel(receiver, id)
_, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id)

if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
Expand Down Expand Up @@ -1282,7 +1282,7 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) {
return
}

_, apiErrorObj := aH.reader.CreateChannel(receiver)
_, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver)

if apiErrorObj != nil {
RespondError(w, apiErrorObj, nil)
Expand Down
Loading
Loading