diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index f44a8de353..561480b81a 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -55,6 +55,10 @@ func GetAlertManagerApiPrefix() string { return "http://alertmanager:9093/api/" } +var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720) + +var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360) + var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html") // Alert manager channel subpath @@ -232,6 +236,18 @@ func GetOrDefaultEnv(key string, fallback string) string { return v } +func GetOrDefaultEnvInt(key string, fallback int) int { + v := os.Getenv(key) + if len(v) == 0 { + return fallback + } + intVal, err := strconv.Atoi(v) + if err != nil { + return fallback + } + return intVal +} + const ( STRING = "String" UINT32 = "UInt32" diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 555d4a5d6c..e8675b3b90 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/go-co-op/gocron" "gopkg.in/segmentio/analytics-go.v3" "go.signoz.io/signoz/pkg/query-service/constants" @@ -85,19 +86,11 @@ const api_key = "9kRrJ7oPCGPEJLF6QjMPLt5bljFhRQBr" const IP_NOT_FOUND_PLACEHOLDER = "NA" const DEFAULT_NUMBER_OF_SERVICES = 6 -const HEART_BEAT_DURATION = 12 * time.Hour - -const ACTIVE_USER_DURATION = 6 * time.Hour - -// const HEART_BEAT_DURATION = 30 * time.Second -// const ACTIVE_USER_DURATION = 30 * time.Second +const SCHEDULE_START_TIME = "04:00" // 4 AM UTC const RATE_LIMIT_CHECK_DURATION = 1 * time.Minute const RATE_LIMIT_VALUE = 1 -// const RATE_LIMIT_CHECK_DURATION = 20 * time.Second -// const RATE_LIMIT_VALUE = 5 - var telemetry *Telemetry var once sync.Once @@ -213,8 +206,11 @@ func createTelemetry() { telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled()) - ticker := time.NewTicker(HEART_BEAT_DURATION) - activeUserTicker := time.NewTicker(ACTIVE_USER_DURATION) + // Create a new scheduler + s := gocron.NewScheduler(time.UTC) + + HEART_BEAT_DURATION := time.Duration(constants.TELEMETRY_HEART_BEAT_DURATION_MINUTES) * time.Minute + ACTIVE_USER_DURATION := time.Duration(constants.TELEMETRY_ACTIVE_USER_DURATION_MINUTES) * time.Minute rateLimitTicker := time.NewTicker(RATE_LIMIT_CHECK_DURATION) @@ -227,141 +223,171 @@ func createTelemetry() { } } }() - go func() { - for { - select { - case <-activeUserTicker.C: - if telemetry.activeUser["logs"] != 0 { - getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION) - if err != nil && getLogsInfoInLastHeartBeatInterval == 0 { - telemetry.activeUser["logs"] = 0 - } - } - if telemetry.activeUser["metrics"] != 0 { - getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION) - if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 { - telemetry.activeUser["metrics"] = 0 - } - } - if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) { - telemetry.activeUser["any"] = 1 - } - telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{ - "traces": telemetry.activeUser["traces"], - "metrics": telemetry.activeUser["metrics"], - "logs": telemetry.activeUser["logs"], - "any": telemetry.activeUser["any"]}, - "", true, false) - telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0} - - case <-ticker.C: + ctx := context.Background() + // Define heartbeat function + heartbeatFunc := func() { + tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION) - tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) + if len(tagsInfo.Env) != 0 { + telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false) + } - if len(tagsInfo.Env) != 0 { - telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false) - } + languages := []string{} + for language := range tagsInfo.Languages { + languages = append(languages, language) + } + if len(languages) > 0 { + telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": languages}, "", true, false) + } + services := []string{} + for service := range tagsInfo.Services { + services = append(services, service) + } + if len(services) > 0 { + telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false) + } + totalSpans, _ := telemetry.reader.GetTotalSpans(ctx) + totalLogs, _ := telemetry.reader.GetTotalLogs(ctx) + spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION) + getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION) + totalSamples, _ := telemetry.reader.GetTotalSamples(ctx) + tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(ctx) + + getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION) + + traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL}) + metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL}) + logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL}) + + data := map[string]interface{}{ + "totalSpans": totalSpans, + "spansInLastHeartBeatInterval": spansInLastHeartBeatInterval, + "totalSamples": totalSamples, + "getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval, + "totalLogs": totalLogs, + "getLogsInfoInLastHeartBeatInterval": getLogsInfoInLastHeartBeatInterval, + "countUsers": telemetry.countUsers, + "metricsTTLStatus": metricsTTL.Status, + "tracesTTLStatus": traceTTL.Status, + "logsTTLStatus": logsTTL.Status, + "patUser": telemetry.patTokenUser, + } + telemetry.patTokenUser = false + for key, value := range tsInfo { + data[key] = value + } - languages := []string{} - for language := range tagsInfo.Languages { - languages = append(languages, language) - } - if len(languages) > 0 { - telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": languages}, "", true, false) - } - services := []string{} - for service := range tagsInfo.Services { - services = append(services, service) - } - if len(services) > 0 { - telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false) - } - totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background()) - totalLogs, _ := telemetry.reader.GetTotalLogs(context.Background()) - spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) - getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) - totalSamples, _ := telemetry.reader.GetTotalSamples(context.Background()) - tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background()) - - getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION) - - traceTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.TraceTTL}) - metricsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.MetricsTTL}) - logsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.LogsTTL}) - - data := map[string]interface{}{ - "totalSpans": totalSpans, - "spansInLastHeartBeatInterval": spansInLastHeartBeatInterval, - "totalSamples": totalSamples, - "getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval, - "totalLogs": totalLogs, - "getLogsInfoInLastHeartBeatInterval": getLogsInfoInLastHeartBeatInterval, - "countUsers": telemetry.countUsers, - "metricsTTLStatus": metricsTTL.Status, - "tracesTTLStatus": traceTTL.Status, - "logsTTLStatus": logsTTL.Status, - "patUser": telemetry.patTokenUser, - } - telemetry.patTokenUser = false - for key, value := range tsInfo { - data[key] = value + users, apiErr := telemetry.reader.GetUsers(ctx) + if apiErr == nil { + for _, user := range users { + if user.Email == DEFAULT_CLOUD_EMAIL { + continue } + telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false) + } + } - users, apiErr := telemetry.reader.GetUsers(context.Background()) - if apiErr == nil { - for _, user := range users { - if user.Email == DEFAULT_CLOUD_EMAIL { - continue - } - telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false) - } - } - alertsInfo, err := telemetry.alertsInfoCallback(context.Background()) + alertsInfo, err := telemetry.alertsInfoCallback(ctx) + if err == nil { + dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(ctx) + if err == nil { + channels, err := telemetry.reader.GetChannels() if err == nil { - dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(context.Background()) + savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(ctx) if err == nil { - channels, err := telemetry.reader.GetChannels() - if err == nil { - savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(context.Background()) - if err == nil { - dashboardsAlertsData := map[string]interface{}{ - "totalDashboards": dashboardsInfo.TotalDashboards, - "totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName, - "logsBasedPanels": dashboardsInfo.LogsBasedPanels, - "metricBasedPanels": dashboardsInfo.MetricBasedPanels, - "tracesBasedPanels": dashboardsInfo.TracesBasedPanels, - "totalAlerts": alertsInfo.TotalAlerts, - "logsBasedAlerts": alertsInfo.LogsBasedAlerts, - "metricBasedAlerts": alertsInfo.MetricBasedAlerts, - "tracesBasedAlerts": alertsInfo.TracesBasedAlerts, - "totalChannels": len(*channels), - "totalSavedViews": savedViewsInfo.TotalSavedViews, - "logsSavedViews": savedViewsInfo.LogsSavedViews, - "tracesSavedViews": savedViewsInfo.TracesSavedViews, - } - // send event only if there are dashboards or alerts or channels - if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || len(*channels) > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil { - for _, user := range users { - if user.Email == DEFAULT_CLOUD_EMAIL { - continue - } - telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false) - } + dashboardsAlertsData := map[string]interface{}{ + "totalDashboards": dashboardsInfo.TotalDashboards, + "totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName, + "logsBasedPanels": dashboardsInfo.LogsBasedPanels, + "metricBasedPanels": dashboardsInfo.MetricBasedPanels, + "tracesBasedPanels": dashboardsInfo.TracesBasedPanels, + "totalAlerts": alertsInfo.TotalAlerts, + "logsBasedAlerts": alertsInfo.LogsBasedAlerts, + "metricBasedAlerts": alertsInfo.MetricBasedAlerts, + "tracesBasedAlerts": alertsInfo.TracesBasedAlerts, + "totalChannels": len(*channels), + "totalSavedViews": savedViewsInfo.TotalSavedViews, + "logsSavedViews": savedViewsInfo.LogsSavedViews, + "tracesSavedViews": savedViewsInfo.TracesSavedViews, + } + // send event only if there are dashboards or alerts or channels + if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || len(*channels) > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil { + for _, user := range users { + if user.Email == DEFAULT_CLOUD_EMAIL { + continue } + telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false) } } } } - if err != nil || apiErr != nil { - telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false) - } + } + } + if err != nil || apiErr != nil { + telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false) + } - getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background()) - telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false) + getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(ctx) + telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false) + } + + // Define active user function + activeUserFunc := func() { + if telemetry.activeUser["logs"] != 0 { + getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION) + if err != nil && getLogsInfoInLastHeartBeatInterval == 0 { + telemetry.activeUser["logs"] = 0 } } - }() + if telemetry.activeUser["metrics"] != 0 { + getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION) + if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 { + telemetry.activeUser["metrics"] = 0 + } + } + if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) { + telemetry.activeUser["any"] = 1 + } + telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{ + "traces": telemetry.activeUser["traces"], + "metrics": telemetry.activeUser["metrics"], + "logs": telemetry.activeUser["logs"], + "any": telemetry.activeUser["any"]}, + "", true, false) + telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0} + } + + // Calculate next run time based on duration and start time + calculateNextRun := func(duration time.Duration, startTimeStr string) time.Time { + now := time.Now().UTC() + startTime, _ := time.Parse("15:04", startTimeStr) + todayStartTime := time.Date(now.Year(), now.Month(), now.Day(), startTime.Hour(), startTime.Minute(), 0, 0, time.UTC) + + if now.Before(todayStartTime) { + todayStartTime = todayStartTime.Add(-24 * time.Hour) + } + + diff := now.Sub(todayStartTime) + intervalsPassed := int(diff / duration) + nextRun := todayStartTime.Add(time.Duration(intervalsPassed+1) * duration) + + return nextRun + } + + // Schedule next runs + scheduleNextRuns := func() { + nextHeartbeat := calculateNextRun(HEART_BEAT_DURATION, SCHEDULE_START_TIME) + nextActiveUser := calculateNextRun(ACTIVE_USER_DURATION, SCHEDULE_START_TIME) + + s.Every(HEART_BEAT_DURATION).StartAt(nextHeartbeat).Do(heartbeatFunc) + s.Every(ACTIVE_USER_DURATION).StartAt(nextActiveUser).Do(activeUserFunc) + } + + // Schedule immediate execution and subsequent runs + scheduleNextRuns() + // Start the scheduler in a separate goroutine + go s.StartBlocking() } // Get preferred outbound ip of this machine