Skip to content

Commit

Permalink
feat: allow query restrictions for log queries (#4778)
Browse files Browse the repository at this point in the history
* feat: allow query restrictions for log queries

* fix: error check

* fix: set default only if not present

* chore: add error log for query restriction error

* fix: add limtations for traces

* fix: fix wrapper

---------

Co-authored-by: Srikanth Chekuri <[email protected]>
  • Loading branch information
nityanandagohain and srikanthccv committed Apr 10, 2024
1 parent 27e412d commit 389058b
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 17 deletions.
28 changes: 25 additions & 3 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,14 @@ func NewReaderFromClickhouseConnection(
os.Exit(1)
}

wrap := clickhouseConnWrapper{conn: db}
wrap := clickhouseConnWrapper{
conn: db,
settings: ClickhouseQuerySettings{
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
},
}

return &ClickHouseReader{
db: wrap,
Expand Down Expand Up @@ -4742,7 +4749,7 @@ func readRowsForTimeSeriesResult(rows driver.Rows, vars []interface{}, columnNam
series := v3.Series{Labels: seriesToAttrs[key], Points: points, GroupingSetsPoint: groupingSetsPoint, LabelsArray: labelsArray[key]}
seriesList = append(seriesList, &series)
}
return seriesList, nil
return seriesList, getPersonalisedError(rows.Err())
}

func logComment(ctx context.Context) string {
Expand Down Expand Up @@ -4833,8 +4840,23 @@ func (r *ClickHouseReader) GetListResultV3(ctx context.Context, query string) ([
rowList = append(rowList, &v3.Row{Timestamp: t, Data: row})
}

return rowList, nil
return rowList, getPersonalisedError(rows.Err())

}

func getPersonalisedError(err error) error {
if err == nil {
return nil
}
zap.L().Error("error while reading result", zap.Error(err))
if strings.Contains(err.Error(), "code: 307") {
return errors.New("query is consuming too much resources, please reach out to the team")
}

if strings.Contains(err.Error(), "code: 159") {
return errors.New("Query is taking too long to run, please reach out to the team")
}
return err
}

func removeDuplicateUnderscoreAttributes(row map[string]interface{}) {
Expand Down
63 changes: 49 additions & 14 deletions pkg/query-service/app/clickhouseReader/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,21 @@ package clickhouseReader
import (
"context"
"encoding/json"
"strings"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

type ClickhouseQuerySettings struct {
MaxExecutionTimeLeaf string
TimeoutBeforeCheckingExecutionSpeed string
MaxBytesToRead string
}

type clickhouseConnWrapper struct {
conn clickhouse.Conn
conn clickhouse.Conn
settings ClickhouseQuerySettings
}

func (c clickhouseConnWrapper) Close() error {
Expand All @@ -24,48 +32,75 @@ func (c clickhouseConnWrapper) Stats() driver.Stats {
return c.conn.Stats()
}

func (c clickhouseConnWrapper) logComment(ctx context.Context) context.Context {
func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context {
settings := clickhouse.Settings{}

logComment := c.getLogComment(ctx)
if logComment != "" {
settings["log_comment"] = logComment
}

// don't add resource restrictions traces
if strings.Contains(query, "signoz_traces") {
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}

if c.settings.MaxBytesToRead != "" {
settings["max_bytes_to_read"] = c.settings.MaxBytesToRead
}

if c.settings.MaxExecutionTimeLeaf != "" {
settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf
}

if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" {
settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed
}

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))
return ctx
}

func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string {
// Get the key-value pairs from context for log comment
kv := ctx.Value("log_comment")
if kv == nil {
return ctx
return ""
}

logCommentKVs, ok := kv.(map[string]string)
if !ok {
return ctx
return ""
}

logComment, _ := json.Marshal(logCommentKVs)

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"log_comment": logComment,
}))
return ctx
return string(logComment)
}

func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(c.logComment(ctx), query, args...)
return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...)
}

func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
return c.conn.QueryRow(c.logComment(ctx), query, args...)
return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...)
}

func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return c.conn.Select(c.logComment(ctx), dest, query, args...)
return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...)
}

func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error {
return c.conn.Exec(c.logComment(ctx), query, args...)
return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...)
}

func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error {
return c.conn.AsyncInsert(c.logComment(ctx), query, wait, args...)
return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...)
}

func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) {
return c.conn.PrepareBatch(c.logComment(ctx), query, opts...)
return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...)
}

func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/query-service/app/logs/v3/enrich_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu
return field
}

// enrich with default values if metadata is not found
if field.Type == "" {
field.Type = v3.AttributeKeyTypeTag
}
Expand Down

0 comments on commit 389058b

Please sign in to comment.