Skip to content

Commit

Permalink
chore: update heartbeat interval logic (#5507)
Browse files Browse the repository at this point in the history
* chore: update heartbeat interval logic

* chore: address review comment
  • Loading branch information
makeavish authored Jul 17, 2024
1 parent 77eba9a commit d3b83f5
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 130 deletions.
16 changes: 16 additions & 0 deletions pkg/query-service/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func GetAlertManagerApiPrefix() string {
return "http://alertmanager:9093/api/"
}

var TELEMETRY_HEART_BEAT_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_HEART_BEAT_DURATION_MINUTES", 720)

var TELEMETRY_ACTIVE_USER_DURATION_MINUTES = GetOrDefaultEnvInt("TELEMETRY_ACTIVE_USER_DURATION_MINUTES", 360)

var InviteEmailTemplate = GetOrDefaultEnv("INVITE_EMAIL_TEMPLATE", "/root/templates/invitation_email_template.html")

// Alert manager channel subpath
Expand Down Expand Up @@ -232,6 +236,18 @@ func GetOrDefaultEnv(key string, fallback string) string {
return v
}

func GetOrDefaultEnvInt(key string, fallback int) int {
v := os.Getenv(key)
if len(v) == 0 {
return fallback
}
intVal, err := strconv.Atoi(v)
if err != nil {
return fallback
}
return intVal
}

const (
STRING = "String"
UINT32 = "UInt32"
Expand Down
286 changes: 156 additions & 130 deletions pkg/query-service/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/go-co-op/gocron"
"gopkg.in/segmentio/analytics-go.v3"

"go.signoz.io/signoz/pkg/query-service/constants"
Expand Down Expand Up @@ -85,19 +86,11 @@ const api_key = "9kRrJ7oPCGPEJLF6QjMPLt5bljFhRQBr"
const IP_NOT_FOUND_PLACEHOLDER = "NA"
const DEFAULT_NUMBER_OF_SERVICES = 6

const HEART_BEAT_DURATION = 12 * time.Hour

const ACTIVE_USER_DURATION = 6 * time.Hour

// const HEART_BEAT_DURATION = 30 * time.Second
// const ACTIVE_USER_DURATION = 30 * time.Second
const SCHEDULE_START_TIME = "04:00" // 4 AM UTC

const RATE_LIMIT_CHECK_DURATION = 1 * time.Minute
const RATE_LIMIT_VALUE = 1

// const RATE_LIMIT_CHECK_DURATION = 20 * time.Second
// const RATE_LIMIT_VALUE = 5

var telemetry *Telemetry
var once sync.Once

Expand Down Expand Up @@ -213,8 +206,11 @@ func createTelemetry() {

telemetry.SetTelemetryEnabled(constants.IsTelemetryEnabled())

ticker := time.NewTicker(HEART_BEAT_DURATION)
activeUserTicker := time.NewTicker(ACTIVE_USER_DURATION)
// Create a new scheduler
s := gocron.NewScheduler(time.UTC)

HEART_BEAT_DURATION := time.Duration(constants.TELEMETRY_HEART_BEAT_DURATION_MINUTES) * time.Minute
ACTIVE_USER_DURATION := time.Duration(constants.TELEMETRY_ACTIVE_USER_DURATION_MINUTES) * time.Minute

rateLimitTicker := time.NewTicker(RATE_LIMIT_CHECK_DURATION)

Expand All @@ -227,141 +223,171 @@ func createTelemetry() {
}
}
}()
go func() {
for {
select {
case <-activeUserTicker.C:
if telemetry.activeUser["logs"] != 0 {
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getLogsInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["logs"] = 0
}
}
if telemetry.activeUser["metrics"] != 0 {
getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), ACTIVE_USER_DURATION)
if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["metrics"] = 0
}
}
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{
"traces": telemetry.activeUser["traces"],
"metrics": telemetry.activeUser["metrics"],
"logs": telemetry.activeUser["logs"],
"any": telemetry.activeUser["any"]},
"", true, false)
telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0}

case <-ticker.C:
ctx := context.Background()
// Define heartbeat function
heartbeatFunc := func() {
tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)

tagsInfo, _ := telemetry.reader.GetTagsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
if len(tagsInfo.Env) != 0 {
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false)
}

if len(tagsInfo.Env) != 0 {
telemetry.SendEvent(TELEMETRY_EVENT_ENVIRONMENT, map[string]interface{}{"value": tagsInfo.Env}, "", true, false)
}
languages := []string{}
for language := range tagsInfo.Languages {
languages = append(languages, language)
}
if len(languages) > 0 {
telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": languages}, "", true, false)
}
services := []string{}
for service := range tagsInfo.Services {
services = append(services, service)
}
if len(services) > 0 {
telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false)
}
totalSpans, _ := telemetry.reader.GetTotalSpans(ctx)
totalLogs, _ := telemetry.reader.GetTotalLogs(ctx)
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)
totalSamples, _ := telemetry.reader.GetTotalSamples(ctx)
tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(ctx)

getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, HEART_BEAT_DURATION)

traceTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.TraceTTL})
metricsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.MetricsTTL})
logsTTL, _ := telemetry.reader.GetTTL(ctx, &model.GetTTLParams{Type: constants.LogsTTL})

data := map[string]interface{}{
"totalSpans": totalSpans,
"spansInLastHeartBeatInterval": spansInLastHeartBeatInterval,
"totalSamples": totalSamples,
"getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval,
"totalLogs": totalLogs,
"getLogsInfoInLastHeartBeatInterval": getLogsInfoInLastHeartBeatInterval,
"countUsers": telemetry.countUsers,
"metricsTTLStatus": metricsTTL.Status,
"tracesTTLStatus": traceTTL.Status,
"logsTTLStatus": logsTTL.Status,
"patUser": telemetry.patTokenUser,
}
telemetry.patTokenUser = false
for key, value := range tsInfo {
data[key] = value
}

languages := []string{}
for language := range tagsInfo.Languages {
languages = append(languages, language)
}
if len(languages) > 0 {
telemetry.SendEvent(TELEMETRY_EVENT_LANGUAGE, map[string]interface{}{"language": languages}, "", true, false)
}
services := []string{}
for service := range tagsInfo.Services {
services = append(services, service)
}
if len(services) > 0 {
telemetry.SendEvent(TELEMETRY_EVENT_SERVICE, map[string]interface{}{"serviceName": services}, "", true, false)
}
totalSpans, _ := telemetry.reader.GetTotalSpans(context.Background())
totalLogs, _ := telemetry.reader.GetTotalLogs(context.Background())
spansInLastHeartBeatInterval, _ := telemetry.reader.GetSpansInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
getSamplesInfoInLastHeartBeatInterval, _ := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)
totalSamples, _ := telemetry.reader.GetTotalSamples(context.Background())
tsInfo, _ := telemetry.reader.GetTimeSeriesInfo(context.Background())

getLogsInfoInLastHeartBeatInterval, _ := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(context.Background(), HEART_BEAT_DURATION)

traceTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.TraceTTL})
metricsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.MetricsTTL})
logsTTL, _ := telemetry.reader.GetTTL(context.Background(), &model.GetTTLParams{Type: constants.LogsTTL})

data := map[string]interface{}{
"totalSpans": totalSpans,
"spansInLastHeartBeatInterval": spansInLastHeartBeatInterval,
"totalSamples": totalSamples,
"getSamplesInfoInLastHeartBeatInterval": getSamplesInfoInLastHeartBeatInterval,
"totalLogs": totalLogs,
"getLogsInfoInLastHeartBeatInterval": getLogsInfoInLastHeartBeatInterval,
"countUsers": telemetry.countUsers,
"metricsTTLStatus": metricsTTL.Status,
"tracesTTLStatus": traceTTL.Status,
"logsTTLStatus": logsTTL.Status,
"patUser": telemetry.patTokenUser,
}
telemetry.patTokenUser = false
for key, value := range tsInfo {
data[key] = value
users, apiErr := telemetry.reader.GetUsers(ctx)
if apiErr == nil {
for _, user := range users {
if user.Email == DEFAULT_CLOUD_EMAIL {
continue
}
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false)
}
}

users, apiErr := telemetry.reader.GetUsers(context.Background())
if apiErr == nil {
for _, user := range users {
if user.Email == DEFAULT_CLOUD_EMAIL {
continue
}
telemetry.SendEvent(TELEMETRY_EVENT_HEART_BEAT, data, user.Email, false, false)
}
}
alertsInfo, err := telemetry.alertsInfoCallback(context.Background())
alertsInfo, err := telemetry.alertsInfoCallback(ctx)
if err == nil {
dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(ctx)
if err == nil {
channels, err := telemetry.reader.GetChannels()
if err == nil {
dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(context.Background())
savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(ctx)
if err == nil {
channels, err := telemetry.reader.GetChannels()
if err == nil {
savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(context.Background())
if err == nil {
dashboardsAlertsData := map[string]interface{}{
"totalDashboards": dashboardsInfo.TotalDashboards,
"totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName,
"logsBasedPanels": dashboardsInfo.LogsBasedPanels,
"metricBasedPanels": dashboardsInfo.MetricBasedPanels,
"tracesBasedPanels": dashboardsInfo.TracesBasedPanels,
"totalAlerts": alertsInfo.TotalAlerts,
"logsBasedAlerts": alertsInfo.LogsBasedAlerts,
"metricBasedAlerts": alertsInfo.MetricBasedAlerts,
"tracesBasedAlerts": alertsInfo.TracesBasedAlerts,
"totalChannels": len(*channels),
"totalSavedViews": savedViewsInfo.TotalSavedViews,
"logsSavedViews": savedViewsInfo.LogsSavedViews,
"tracesSavedViews": savedViewsInfo.TracesSavedViews,
}
// send event only if there are dashboards or alerts or channels
if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || len(*channels) > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil {
for _, user := range users {
if user.Email == DEFAULT_CLOUD_EMAIL {
continue
}
telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false)
}
dashboardsAlertsData := map[string]interface{}{
"totalDashboards": dashboardsInfo.TotalDashboards,
"totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName,
"logsBasedPanels": dashboardsInfo.LogsBasedPanels,
"metricBasedPanels": dashboardsInfo.MetricBasedPanels,
"tracesBasedPanels": dashboardsInfo.TracesBasedPanels,
"totalAlerts": alertsInfo.TotalAlerts,
"logsBasedAlerts": alertsInfo.LogsBasedAlerts,
"metricBasedAlerts": alertsInfo.MetricBasedAlerts,
"tracesBasedAlerts": alertsInfo.TracesBasedAlerts,
"totalChannels": len(*channels),
"totalSavedViews": savedViewsInfo.TotalSavedViews,
"logsSavedViews": savedViewsInfo.LogsSavedViews,
"tracesSavedViews": savedViewsInfo.TracesSavedViews,
}
// send event only if there are dashboards or alerts or channels
if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || len(*channels) > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil {
for _, user := range users {
if user.Email == DEFAULT_CLOUD_EMAIL {
continue
}
telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false)
}
}
}
}
if err != nil || apiErr != nil {
telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false)
}
}
}
if err != nil || apiErr != nil {
telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, map[string]interface{}{"error": err.Error()}, "", true, false)
}

getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(context.Background())
telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false)
getDistributedInfoInLastHeartBeatInterval, _ := telemetry.reader.GetDistributedInfoInLastHeartBeatInterval(ctx)
telemetry.SendEvent(TELEMETRY_EVENT_DISTRIBUTED, getDistributedInfoInLastHeartBeatInterval, "", true, false)
}

// Define active user function
activeUserFunc := func() {
if telemetry.activeUser["logs"] != 0 {
getLogsInfoInLastHeartBeatInterval, err := telemetry.reader.GetLogsInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION)
if err != nil && getLogsInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["logs"] = 0
}
}
}()
if telemetry.activeUser["metrics"] != 0 {
getSamplesInfoInLastHeartBeatInterval, err := telemetry.reader.GetSamplesInfoInLastHeartBeatInterval(ctx, ACTIVE_USER_DURATION)
if err != nil && getSamplesInfoInLastHeartBeatInterval == 0 {
telemetry.activeUser["metrics"] = 0
}
}
if (telemetry.activeUser["traces"] != 0) || (telemetry.activeUser["metrics"] != 0) || (telemetry.activeUser["logs"] != 0) {
telemetry.activeUser["any"] = 1
}
telemetry.SendEvent(TELEMETRY_EVENT_ACTIVE_USER, map[string]interface{}{
"traces": telemetry.activeUser["traces"],
"metrics": telemetry.activeUser["metrics"],
"logs": telemetry.activeUser["logs"],
"any": telemetry.activeUser["any"]},
"", true, false)
telemetry.activeUser = map[string]int8{"traces": 0, "metrics": 0, "logs": 0, "any": 0}
}

// Calculate next run time based on duration and start time
calculateNextRun := func(duration time.Duration, startTimeStr string) time.Time {
now := time.Now().UTC()
startTime, _ := time.Parse("15:04", startTimeStr)
todayStartTime := time.Date(now.Year(), now.Month(), now.Day(), startTime.Hour(), startTime.Minute(), 0, 0, time.UTC)

if now.Before(todayStartTime) {
todayStartTime = todayStartTime.Add(-24 * time.Hour)
}

diff := now.Sub(todayStartTime)
intervalsPassed := int(diff / duration)
nextRun := todayStartTime.Add(time.Duration(intervalsPassed+1) * duration)

return nextRun
}

// Schedule next runs
scheduleNextRuns := func() {
nextHeartbeat := calculateNextRun(HEART_BEAT_DURATION, SCHEDULE_START_TIME)
nextActiveUser := calculateNextRun(ACTIVE_USER_DURATION, SCHEDULE_START_TIME)

s.Every(HEART_BEAT_DURATION).StartAt(nextHeartbeat).Do(heartbeatFunc)
s.Every(ACTIVE_USER_DURATION).StartAt(nextActiveUser).Do(activeUserFunc)
}

// Schedule immediate execution and subsequent runs
scheduleNextRuns()

// Start the scheduler in a separate goroutine
go s.StartBlocking()
}

// Get preferred outbound ip of this machine
Expand Down

0 comments on commit d3b83f5

Please sign in to comment.