diff --git a/backend/go.mod b/backend/go.mod index 7498c8541..1181071c9 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -7,6 +7,7 @@ require ( cloud.google.com/go/secretmanager v1.11.5 firebase.google.com/go/v4 v4.14.1 github.com/ClickHouse/clickhouse-go/v2 v2.17.1 + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/Gurpartap/storekit-go v0.0.0-20201205024111-36b6cd5c6a21 github.com/Tangui-Bitfly/ethsimtracer v0.0.0-20241031103622-e76546c3d9c1 github.com/alexedwards/scs/redisstore v0.0.0-20240316134038-7e11d57e8885 diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index da0b896da..8bed700e6 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -494,189 +494,189 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da return result, &paging, nil } -func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Context, dashboardId t.VDBId, groupId int64, period enums.TimePeriod, protocolModes t.VDBProtocolModes) (*t.VDBGroupSummaryData, error) { - // TODO: implement data retrieval for the following new field - // Fetch validator list for user dashboard from the dashboard table when querying the past sync committees as the rolling table might miss exited validators - // TotalMissedRewards - // @DATA-ACCESS incorporate protocolModes - // @DATA-ACCESS implement data retrieval for Rocket Pool stats (if present) +func (d *DataAccessService) getLastScheduledBlockAndSyncDate(ctx context.Context, dashboardId t.VDBId, groupId int64) (time.Time, time.Time, error) { + // we need to go to the all time table for last scheduled block/sync committee epoch + clickhouseTotalTable, _, err := d.getTablesForPeriod(enums.AllTime) + if err != nil { + return time.Time{}, time.Time{}, err + } - var err error - ret := &t.VDBGroupSummaryData{} + ds := goqu.Dialect("postgres"). + Select( + goqu.L("MAX(last_scheduled_block_epoch) as last_scheduled_block_epoch"), + goqu.L("MAX(last_scheduled_sync_epoch) as last_scheduled_sync_epoch")). + From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTotalTable))) - if dashboardId.AggregateGroups { - // If we are aggregating groups then ignore the group id and sum up everything - groupId = t.AllGroups + if dashboardId.Validators == nil { + ds = ds. + With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ? AND (group_id = ? OR ?::smallint = -1))", dashboardId.Id, groupId, groupId)). + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("validator_index IN (SELECT validator_index FROM validators)")) + } else { + ds = ds. + Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } - // Get the current and next sync committee validators - latestEpoch := cache.LatestEpoch.Get() - currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, err := d.getCurrentAndUpcomingSyncCommittees(ctx, latestEpoch) + query, args, err := ds.Prepared(true).ToSQL() if err != nil { - return nil, err + return time.Time{}, time.Time{}, err } - // Get the table names based on the period - clickhouseTable, hours, err := d.getTablesForPeriod(period) + var row struct { + LastScheduledBlockEpoch *int64 `db:"last_scheduled_block_epoch"` + LastSyncEpoch *int64 `db:"last_scheduled_sync_epoch"` + } + err = d.clickhouseReader.GetContext(ctx, &row, query, args...) 
if err != nil { - return nil, err + return time.Time{}, time.Time{}, err } - getMinMaxEpochs := func() (uint64, uint64, error) { - ds := goqu.Dialect("postgres"). - Select( - goqu.L("MIN(epoch_start) as min_epoch_start"), - goqu.L("MAX(epoch_end) as max_epoch_end")). - From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))) + if row.LastScheduledBlockEpoch == nil || row.LastSyncEpoch == nil { + return time.Time{}, time.Time{}, nil + } - if dashboardId.Validators == nil { - ds = ds. - With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ? AND (group_id = ? OR ?::smallint = -1))", dashboardId.Id, groupId, groupId)). - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("validator_index IN (SELECT validator_index FROM validators)")) - } else { - ds = ds. - Where(goqu.L("validator_index IN ?", dashboardId.Validators)) - } + return utils.EpochToTime(uint64(*row.LastScheduledBlockEpoch)), + utils.EpochToTime(uint64(*row.LastSyncEpoch)), + nil +} - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return 0, 0, err - } +func (d *DataAccessService) getMinMaxEpochs(ctx context.Context, dashboardId t.VDBId, groupId int64, clickhouseTable string) (uint64, uint64, error) { + ds := goqu.Dialect("postgres"). + Select( + goqu.L("MIN(epoch_start) as min_epoch_start"), + goqu.L("MAX(epoch_end) as max_epoch_end")). + From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))) - var epoch_row struct { - MinEpochStart *uint64 `db:"min_epoch_start"` - MaxEpochEnd *uint64 `db:"max_epoch_end"` - } - err = d.clickhouseReader.GetContext(ctx, &epoch_row, query, args...) - if err != nil { - return 0, 0, err - } - return *epoch_row.MinEpochStart, *epoch_row.MaxEpochEnd, nil + if dashboardId.Validators == nil { + ds = ds. + With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ? AND (group_id = ? OR ?::smallint = -1))", dashboardId.Id, groupId, groupId)). + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("validator_index IN (SELECT validator_index FROM validators)")) + } else { + ds = ds. + Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } - getMissedELRewards := func(epochStart, epochEnd uint64) (float64, error) { - // Initialize the result variable - var totalMissedRewardsEl float64 - - // Define the `targets` CTE - targets := goqu.Dialect("postgres"). - From("blocks"). - Select(goqu.I("blocks.slot").As("slot")). - Where( - goqu.I("blocks.status").Neq("1"), - goqu.I("epoch").Gte(epochStart), - goqu.I("epoch").Lte(epochEnd), - ) + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return 0, 0, err + } - if dashboardId.Validators == nil { - targets = targets. - Join( - goqu.T("users_val_dashboards_validators").As("uvdv"), - goqu.On(goqu.I("blocks.proposer").Eq(goqu.I("uvdv.validator_index"))), - ). - Where( - goqu.And( - goqu.I("uvdv.dashboard_id").Eq(dashboardId.Id), - goqu.Or( - goqu.I("uvdv.group_id").Eq(groupId), - goqu.L("?::smallint = -1", groupId), - ), - ), - ) - } else { - targets = targets. - Where( - goqu.I("blocks.proposer").In(dashboardId.Validators), - ) - } - - slots := utils.Config.Chain.ClConfig.SlotsPerEpoch / 2 - - // Define the `res` CTE - res := goqu. - From("targets"). 
- LeftJoin( - goqu.T("execution_rewards_finalized").As("b"), - goqu.On( - goqu.L(fmt.Sprintf( - `"b"."slot" >= "targets"."slot" - %d AND "b"."slot" < "targets"."slot" + %d`, - slots, slots, - )), - ), - ). - Select( - goqu.I("targets.slot"), - goqu.L("percentile_cont(0.5) WITHIN GROUP (ORDER BY b.value)::numeric(76,0)").As("v"), - ). - GroupBy(goqu.I("targets.slot")) + var epoch_row struct { + MinEpochStart *uint64 `db:"min_epoch_start"` + MaxEpochEnd *uint64 `db:"max_epoch_end"` + } + err = d.clickhouseReader.GetContext(ctx, &epoch_row, query, args...) + if err != nil { + return 0, 0, err + } + return *epoch_row.MinEpochStart, *epoch_row.MaxEpochEnd, nil +} - // Build the final query - query := goqu.From("res"). - With("targets", targets). - With("res", res). - Select(goqu.L("COALESCE(SUM(v), 0)")) +func (d *DataAccessService) getMissedELRewards(ctx context.Context, dashboardId t.VDBId, groupId int64, epochStart, epochEnd uint64) (float64, error) { + // Initialize the result variable + var totalMissedRewardsEl float64 - // Generate SQL and arguments - sql, args, err := query.Prepared(true).ToSQL() - if err != nil { - return 0, fmt.Errorf("failed to generate SQL: %w", err) - } + // Define the `targets` CTE + targets := goqu.Dialect("postgres"). + From("blocks"). + Select(goqu.I("blocks.slot").As("slot")). + Where( + goqu.I("blocks.status").Neq("1"), + goqu.I("epoch").Gte(epochStart), + goqu.I("epoch").Lte(epochEnd), + ) - // Execute the query with the generated SQL and arguments - err = d.readerDb.GetContext(ctx, &totalMissedRewardsEl, sql, args...) - if err != nil { - return 0, fmt.Errorf("failed to execute query: %w", err) - } + if dashboardId.Validators == nil { + targets = targets. + Join( + goqu.T("users_val_dashboards_validators").As("uvdv"), + goqu.On(goqu.I("blocks.proposer").Eq(goqu.I("uvdv.validator_index"))), + ). + Where( + goqu.And( + goqu.I("uvdv.dashboard_id").Eq(dashboardId.Id), + goqu.Or( + goqu.I("uvdv.group_id").Eq(groupId), + goqu.L("?::smallint = -1", groupId), + ), + ), + ) + } else { + targets = targets. + Where( + goqu.I("blocks.proposer").In(dashboardId.Validators), + ) + } - // Return the computed total median rewards - return totalMissedRewardsEl, nil + slots := utils.Config.Chain.ClConfig.SlotsPerEpoch / 2 + + // Define the `res` CTE + res := goqu. + From("targets"). + LeftJoin( + goqu.T("execution_rewards_finalized").As("b"), + goqu.On( + goqu.L(fmt.Sprintf( + `"b"."slot" >= "targets"."slot" - %d AND "b"."slot" < "targets"."slot" + %d`, + slots, slots, + )), + ), + ). + Select( + goqu.I("targets.slot"), + goqu.L("percentile_cont(0.5) WITHIN GROUP (ORDER BY b.value)::numeric(76,0)").As("v"), + ). + GroupBy(goqu.I("targets.slot")) + + // Build the final query + query := goqu.From("res"). + With("targets", targets). + With("res", res). + Select(goqu.L("COALESCE(SUM(v), 0)")) + + // Generate SQL and arguments + sql, args, err := query.Prepared(true).ToSQL() + if err != nil { + return 0, fmt.Errorf("failed to generate SQL: %w", err) } - getLastScheduledBlockAndSyncDate := func() (time.Time, time.Time, error) { - // we need to go to the all time table for last scheduled block/sync committee epoch - clickhouseTotalTable, _, err := d.getTablesForPeriod(enums.AllTime) - if err != nil { - return time.Time{}, time.Time{}, err - } + // Execute the query with the generated SQL and arguments + err = d.readerDb.GetContext(ctx, &totalMissedRewardsEl, sql, args...) + if err != nil { + return 0, fmt.Errorf("failed to execute query: %w", err) + } - ds := goqu.Dialect("postgres"). 
- Select( - goqu.L("MAX(last_scheduled_block_epoch) as last_scheduled_block_epoch"), - goqu.L("MAX(last_scheduled_sync_epoch) as last_scheduled_sync_epoch")). - From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTotalTable))) + // Return the computed total median rewards + return totalMissedRewardsEl, nil +} - if dashboardId.Validators == nil { - ds = ds. - With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ? AND (group_id = ? OR ?::smallint = -1))", dashboardId.Id, groupId, groupId)). - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("validator_index IN (SELECT validator_index FROM validators)")) - } else { - ds = ds. - Where(goqu.L("validator_index IN ?", dashboardId.Validators)) - } +func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Context, dashboardId t.VDBId, groupId int64, period enums.TimePeriod, protocolModes t.VDBProtocolModes) (*t.VDBGroupSummaryData, error) { + // TODO: implement data retrieval for the following new field + // Fetch validator list for user dashboard from the dashboard table when querying the past sync committees as the rolling table might miss exited validators + // TotalMissedRewards + // @DATA-ACCESS incorporate protocolModes + // @DATA-ACCESS implement data retrieval for Rocket Pool stats (if present) - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return time.Time{}, time.Time{}, err - } + var err error + ret := &t.VDBGroupSummaryData{} - var row struct { - LastScheduledBlockEpoch *int64 `db:"last_scheduled_block_epoch"` - LastSyncEpoch *int64 `db:"last_scheduled_sync_epoch"` - } - err = d.clickhouseReader.GetContext(ctx, &row, query, args...) - if err != nil { - return time.Time{}, time.Time{}, err - } + if dashboardId.AggregateGroups { + // If we are aggregating groups then ignore the group id and sum up everything + groupId = t.AllGroups + } - if row.LastScheduledBlockEpoch == nil || row.LastSyncEpoch == nil { - return time.Time{}, time.Time{}, nil - } + // Get the current and next sync committee validators + latestEpoch := cache.LatestEpoch.Get() + currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, err := d.getCurrentAndUpcomingSyncCommittees(ctx, latestEpoch) + if err != nil { + return nil, err + } - return utils.EpochToTime(uint64(*row.LastScheduledBlockEpoch)), - utils.EpochToTime(uint64(*row.LastSyncEpoch)), - nil + // Get the table names based on the period + clickhouseTable, hours, err := d.getTablesForPeriod(period) + if err != nil { + return nil, err } ds := goqu.Dialect("postgres"). 
@@ -769,14 +769,14 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex var lastBlockTs, lastSyncTs time.Time errGroup.Go(func() error { var err error - lastBlockTs, lastSyncTs, err = getLastScheduledBlockAndSyncDate() + lastBlockTs, lastSyncTs, err = d.getLastScheduledBlockAndSyncDate(ctx, dashboardId, groupId) return err }) var minEpochStart, maxEpochEnd uint64 errGroup.Go(func() error { var err error - minEpochStart, maxEpochEnd, err = getMinMaxEpochs() + minEpochStart, maxEpochEnd, err = d.getMinMaxEpochs(ctx, dashboardId, groupId, clickhouseTable) return err }) @@ -875,7 +875,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } } - totalMissedRewardsEl, err := getMissedELRewards(minEpochStart, maxEpochEnd) + totalMissedRewardsEl, err := d.getMissedELRewards(ctx, dashboardId, groupId, minEpochStart, maxEpochEnd) if err != nil { return nil, err } @@ -1000,26 +1000,6 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex return ret, nil } - // log.Infof("retrieving data between %v and %v for aggregation %v", time.Unix(int64(afterTs), 0), time.Unix(int64(beforeTs), 0), aggregation) - dataTable := "" - dateColumn := "" - switch aggregation { - case enums.IntervalEpoch: - dataTable = "validator_dashboard_data_epoch" - dateColumn = "epoch_timestamp" - case enums.IntervalHourly: - dataTable = "validator_dashboard_data_hourly" - dateColumn = "t" - case enums.IntervalDaily: - dataTable = "validator_dashboard_data_daily" - dateColumn = "t" - case enums.IntervalWeekly: - dataTable = "validator_dashboard_data_weekly" - dateColumn = "t" - default: - return nil, fmt.Errorf("unexpected aggregation type: %v", aggregation) - } - var queryResults []*t.VDBValidatorSummaryChartRow containsGroups := false @@ -1038,6 +1018,11 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex return ret, nil } + dataTable, dateColumn, err := d.getTableAndDateColumn(aggregation) + if err != nil { + return nil, err + } + totalLineRequested := requestedGroupsMap[t.AllGroups] || dashboardId.AggregateGroups averageNetworkLineRequested := requestedGroupsMap[t.NetworkAverage] @@ -1198,28 +1183,14 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex } func (d *DataAccessService) GetLatestExportedChartTs(ctx context.Context, aggregation enums.ChartAggregation) (uint64, error) { - var table string - var dateColumn string - switch aggregation { - case enums.IntervalEpoch: - table = "view_validator_dashboard_data_epoch_max_ts" - dateColumn = "t" - case enums.IntervalHourly: - table = "view_validator_dashboard_data_hourly_max_ts" - dateColumn = "t" - case enums.IntervalDaily: - table = "view_validator_dashboard_data_daily_max_ts" - dateColumn = "t" - case enums.IntervalWeekly: - table = "view_validator_dashboard_data_weekly_max_ts" - dateColumn = "t" - default: - return 0, fmt.Errorf("unexpected aggregation type: %v", aggregation) + table, dateColumn, err := d.getViewAndDateColumn(aggregation) + if err != nil { + return 0, err } query := fmt.Sprintf(`SELECT max(%s) FROM %s`, dateColumn, table) var ts time.Time - err := d.clickhouseReader.GetContext(ctx, &ts, query) + err = d.clickhouseReader.GetContext(ctx, &ts, query) if err != nil { return 0, fmt.Errorf("error retrieving latest exported chart timestamp: %w", err) } @@ -1347,27 +1318,19 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte var resultMutex = &sync.RWMutex{} wg := errgroup.Group{} - // 
Get the table name based on the period - clickhouseTable, _, err := d.getTablesForPeriod(period) - if err != nil { - return nil, err - } - // Get the validator indices var groupIds []uint64 if !dashboardId.AggregateGroups && groupId != t.AllGroups { groupIds = append(groupIds, uint64(groupId)) } - validatorIndices, err := d.getDashboardValidators(ctx, dashboardId, groupIds) if err != nil { return nil, err } // Get the current and next sync committee validators - latestEpoch := cache.LatestEpoch.Get() wg.Go(func() error { - currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, err := d.getCurrentAndUpcomingSyncCommittees(ctx, latestEpoch) + currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, err := d.getCurrentAndUpcomingSyncCommittees(ctx, cache.LatestEpoch.Get()) if err != nil { return err } @@ -1388,48 +1351,14 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte // Get the past sync committee validators wg.Go(func() error { - // Get the cutoff period for past sync committees - ds := goqu.Dialect("postgres"). - Select( - goqu.L("epoch_start")). - From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). - Order(goqu.L("epoch_start").Asc()). - Limit(1) - - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %w", err) - } - - var epochStart uint64 - err = d.clickhouseReader.GetContext(ctx, &epochStart, query, args...) + epochStart, err := d.getEpochStart(ctx, period) if err != nil { - return fmt.Errorf("error retrieving cutoff epoch for past sync committees: %w", err) - } - pastSyncPeriodCutoff := utils.SyncPeriodOfEpoch(epochStart) - - // Get the past sync committee validators - currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch) - ds = goqu.Dialect("postgres"). - Select( - goqu.L("sc.validatorindex")). - From(goqu.L("sync_committees sc")). - Where(goqu.L("period >= ? AND period < ? AND validatorindex = ANY(?)", pastSyncPeriodCutoff, currentSyncPeriod, pq.Array(validatorIndices))) - - query, args, err = ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %w", err) + return err } - var validatorIndices []uint64 - err = d.alloyReader.SelectContext(ctx, &validatorIndices, query, args...) + validatorCountMap, err := d.getPastSyncCommittees(ctx, validatorIndices, epochStart, cache.LatestEpoch.Get()) if err != nil { - return fmt.Errorf("error retrieving data for past sync committees: %w", err) - } - - validatorCountMap := make(map[uint64]uint64) - for _, validatorIndex := range validatorIndices { - validatorCountMap[validatorIndex]++ + return err } resultMutex.Lock() @@ -1829,29 +1758,131 @@ func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Cont return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, nil } +func (d *DataAccessService) getEpochStart(ctx context.Context, period enums.TimePeriod) (uint64, error) { + clickhouseTable, _, err := d.getTablesForPeriod(period) + if err != nil { + return 0, err + } + + ds := goqu.Dialect("postgres"). + Select(goqu.L("epoch_start")). + From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). + Order(goqu.L("epoch_start").Asc()). + Limit(1) + + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return 0, fmt.Errorf("error preparing query: %w", err) + } + + var epochStart uint64 + err = d.clickhouseReader.GetContext(ctx, &epochStart, query, args...) 
+	if err != nil {
+		return 0, fmt.Errorf("error retrieving cutoff epoch for past sync committees: %w", err)
+	}
+
+	return epochStart, nil
+}
+
+func (d *DataAccessService) getPastSyncCommittees(ctx context.Context, indices []uint64, epochStart uint64, latestEpoch uint64) (map[uint64]uint64, error) {
+	pastSyncPeriodCutoff := utils.SyncPeriodOfEpoch(epochStart)
+	currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch)
+
+	// Get the past sync committee validators
+	ds := goqu.Dialect("postgres").
+		Select(
+			goqu.L("sc.validatorindex")).
+		From(goqu.L("sync_committees sc")).
+		Where(goqu.L("period >= ? AND period < ? AND validatorindex = ANY(?)", pastSyncPeriodCutoff, currentSyncPeriod, pq.Array(indices)))
+
+	query, args, err := ds.Prepared(true).ToSQL()
+	if err != nil {
+		return nil, fmt.Errorf("error preparing query: %w", err)
+	}
+
+	var validatorIndices []uint64
+	err = d.alloyReader.SelectContext(ctx, &validatorIndices, query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("error retrieving data for past sync committees: %w", err)
+	}
+
+	validatorCountMap := make(map[uint64]uint64)
+	for _, validatorIndex := range validatorIndices {
+		validatorCountMap[validatorIndex]++
+	}
+
+	return validatorCountMap, nil
+}
+
 func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, int, error) {
-	clickhouseTable := ""
+	table := ""
 	hours := 0
 	switch period {
 	case enums.TimePeriods.Last1h:
-		clickhouseTable = "validator_dashboard_data_rolling_1h"
+		table = "validator_dashboard_data_rolling_1h"
 		hours = 1
 	case enums.TimePeriods.Last24h:
-		clickhouseTable = "validator_dashboard_data_rolling_24h"
+		table = "validator_dashboard_data_rolling_24h"
 		hours = 24
 	case enums.TimePeriods.Last7d:
-		clickhouseTable = "validator_dashboard_data_rolling_7d"
+		table = "validator_dashboard_data_rolling_7d"
 		hours = 7 * 24
 	case enums.TimePeriods.Last30d:
-		clickhouseTable = "validator_dashboard_data_rolling_30d"
+		table = "validator_dashboard_data_rolling_30d"
 		hours = 30 * 24
 	case enums.TimePeriods.AllTime:
-		clickhouseTable = "validator_dashboard_data_rolling_total"
+		table = "validator_dashboard_data_rolling_total"
 		hours = -1
 	default:
 		return "", 0, fmt.Errorf("not-implemented time period: %v", period)
 	}
-	return clickhouseTable, hours, nil
+	return table, hours, nil
+}
+
+func (d *DataAccessService) getTableAndDateColumn(aggregation enums.ChartAggregation) (string, string, error) {
+	var table, dateColumn string
+
+	switch aggregation {
+	case enums.IntervalEpoch:
+		table = "validator_dashboard_data_epoch"
+		dateColumn = "epoch_timestamp"
+	case enums.IntervalHourly:
+		table = "validator_dashboard_data_hourly"
+		dateColumn = "t"
+	case enums.IntervalDaily:
+		table = "validator_dashboard_data_daily"
+		dateColumn = "t"
+	case enums.IntervalWeekly:
+		table = "validator_dashboard_data_weekly"
+		dateColumn = "t"
+	default:
+		return "", "", fmt.Errorf("unexpected aggregation type: %v", aggregation)
+	}
+
+	return table, dateColumn, nil
+}
+
+func (d *DataAccessService) getViewAndDateColumn(aggregation enums.ChartAggregation) (string, string, error) {
+	var view, dateColumn string
+
+	switch aggregation {
+	case enums.IntervalEpoch:
+		view = "view_validator_dashboard_data_epoch_max_ts"
+		dateColumn = "t"
+	case enums.IntervalHourly:
+		view = "view_validator_dashboard_data_hourly_max_ts"
+		dateColumn = "t"
+	case enums.IntervalDaily:
+		view = "view_validator_dashboard_data_daily_max_ts"
+		dateColumn = "t"
+	case enums.IntervalWeekly:
+		view = "view_validator_dashboard_data_weekly_max_ts"
+		
dateColumn = "t" + default: + return "", "", fmt.Errorf("unexpected aggregation type: %v", aggregation) + } + + return view, dateColumn, nil } diff --git a/backend/pkg/api/data_access/vdb_summary_test.go b/backend/pkg/api/data_access/vdb_summary_test.go new file mode 100644 index 000000000..c0830e2b8 --- /dev/null +++ b/backend/pkg/api/data_access/vdb_summary_test.go @@ -0,0 +1,473 @@ +package dataaccess + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gobitfly/beaconchain/pkg/api/enums" + "github.com/gobitfly/beaconchain/pkg/commons/types" + "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestGetCurrentAndUpcomingSyncCommittees(t *testing.T) { + mockDB, mock, err := sqlmock.New() + assert.NoError(t, err) + defer mockDB.Close() + + // Wrap the mock DB with sqlx + sqlxDB := sqlx.NewDb(mockDB, "sqlmock") + + cfg := types.Config{ + Chain: types.Chain{ + ClConfig: types.ClChainConfig{ + EpochsPerSyncCommitteePeriod: 32, + AltairForkEpoch: 0, + }, + }, + } + utils.Config = &cfg + + // Create the DataAccessService with the mock DB + dataService := &DataAccessService{ + readerDb: sqlxDB, + } + + latestEpoch := uint64(128) + currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch) + + tests := []struct { + name string + mockRows *sqlmock.Rows + mockError error + expectedCurrent map[uint64]bool + expectedUpcoming map[uint64]bool + expectedError string + }{ + { + name: "Valid data", + mockRows: sqlmock.NewRows([]string{"validatorindex", "period"}). + AddRow(1, currentSyncPeriod). + AddRow(2, currentSyncPeriod). + AddRow(3, currentSyncPeriod+1). + AddRow(4, currentSyncPeriod+1), + mockError: nil, + expectedCurrent: map[uint64]bool{ + 1: true, + 2: true, + }, + expectedUpcoming: map[uint64]bool{ + 3: true, + 4: true, + }, + expectedError: "", + }, + { + name: "Database error", + mockRows: nil, + mockError: errors.New("db query failed"), + expectedCurrent: nil, + expectedUpcoming: nil, + expectedError: "error retrieving sync committee current and next period data: db query failed", + }, + { + name: "Empty result", + mockRows: sqlmock.NewRows([]string{"validatorindex", "period"}), + mockError: nil, + expectedCurrent: map[uint64]bool{}, + expectedUpcoming: map[uint64]bool{}, + expectedError: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := `SELECT validatorindex, period FROM "sync_committees" WHERE period IN \(\$1, \$2\)` + if tt.mockError != nil { + mock.ExpectQuery(query).WillReturnError(tt.mockError) + } else { + mock.ExpectQuery(query).WillReturnRows(tt.mockRows) + } + + ctx := context.Background() + current, upcoming, err := dataService.getCurrentAndUpcomingSyncCommittees(ctx, latestEpoch) + + if tt.expectedError == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tt.expectedError) + } + + assert.Equal(t, tt.expectedCurrent, current) + assert.Equal(t, tt.expectedUpcoming, upcoming) + + assert.NoError(t, mock.ExpectationsWereMet()) + }) + } +} + +func TestGetLatestExportedChartTs(t *testing.T) { + mockDB, mock, err := sqlmock.New() + assert.NoError(t, err) + defer mockDB.Close() + + sqlxDB := sqlx.NewDb(mockDB, "sqlmock") + dataService := &DataAccessService{ + clickhouseReader: sqlxDB, + } + + // Define the test cases + tests := []struct { + name string + aggregation enums.ChartAggregation + mockRows *sqlmock.Rows + mockError error + expectedTs uint64 + expectedError string + 
}{
+		{
+			name:        "Valid data",
+			aggregation: enums.IntervalDaily,
+			mockRows: sqlmock.NewRows([]string{"max"}).
+				AddRow(time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)),
+			mockError:     nil,
+			expectedTs:    uint64(time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC).Unix()),
+			expectedError: "",
+		},
+		{
+			name:        "Invalid data type",
+			aggregation: enums.IntervalDaily,
+			mockRows: sqlmock.NewRows([]string{"max"}).
+				AddRow("invalid_timestamp"),
+			mockError:     nil,
+			expectedTs:    0,
+			expectedError: "error retrieving latest exported chart timestamp: sql: Scan error on column index 0, name \"max\": unsupported Scan, storing driver.Value type string into type *time.Time",
+		},
+		{
+			name:          "Empty result",
+			aggregation:   enums.IntervalDaily,
+			mockRows:      sqlmock.NewRows([]string{"max"}),
+			mockError:     nil,
+			expectedTs:    0,
+			expectedError: "error retrieving latest exported chart timestamp: sql: no rows in result set",
+		},
+		{
+			name:          "Query error",
+			aggregation:   enums.IntervalDaily,
+			mockRows:      nil,
+			mockError:     fmt.Errorf("db query failed"),
+			expectedTs:    0,
+			expectedError: "error retrieving latest exported chart timestamp: db query failed",
+		},
+	}
+
+	// Run the test cases
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			view, dateColumn, err := dataService.getViewAndDateColumn(tt.aggregation)
+			assert.NoError(t, err)
+
+			// Mock the query
+			query := fmt.Sprintf("SELECT max\\(%s\\) FROM %s", dateColumn, view)
+			if tt.mockError != nil {
+				mock.ExpectQuery(query).WillReturnError(tt.mockError)
+			} else {
+				mock.ExpectQuery(query).WillReturnRows(tt.mockRows)
+			}
+
+			// Run the function under test with the case's aggregation
+			ctx := context.Background()
+			ts, err := dataService.GetLatestExportedChartTs(ctx, tt.aggregation)
+
+			// Validate the result
+			if tt.expectedError == "" {
+				assert.NoError(t, err)
+			} else {
+				assert.EqualError(t, err, tt.expectedError)
+			}
+			assert.Equal(t, tt.expectedTs, ts)
+
+			// Ensure all expectations are met
+			assert.NoError(t, mock.ExpectationsWereMet())
+		})
+	}
+}
+
+func TestGetEpochStart(t *testing.T) {
+	mockDB, mock, err := sqlmock.New()
+	assert.NoError(t, err)
+	defer mockDB.Close()
+
+	sqlxDB := sqlx.NewDb(mockDB, "sqlmock")
+
+	dataService := &DataAccessService{
+		clickhouseReader: sqlxDB,
+	}
+
+	tests := []struct {
+		name          string
+		period        enums.TimePeriod
+		mockRows      *sqlmock.Rows
+		mockError     error
+		expectedVal   uint64
+		expectedError string
+	}{
+		{
+			name:   "Valid data",
+			period: enums.Last24h,
+			mockRows: sqlmock.NewRows([]string{"epoch_start"}).
+				AddRow(12345),
+			mockError:     nil,
+			expectedVal:   12345,
+			expectedError: "",
+		},
+		{
+			name:   "Invalid data type",
+			period: enums.Last24h,
+			mockRows: sqlmock.NewRows([]string{"epoch_start"}).
+ AddRow("invalid_value"), + mockError: nil, + expectedVal: 0, + expectedError: "error retrieving cutoff epoch for past sync committees: sql: Scan error on column index 0, name \"epoch_start\": converting driver.Value type string (\"invalid_value\") to a uint64: invalid syntax", + }, + { + name: "Empty result", + period: enums.Last24h, + mockRows: sqlmock.NewRows([]string{"epoch_start"}), + mockError: nil, + expectedVal: 0, + expectedError: "error retrieving cutoff epoch for past sync committees: sql: no rows in result set", + }, + { + name: "Query error", + period: enums.Last24h, + mockRows: nil, + mockError: fmt.Errorf("db query failed"), + expectedVal: 0, + expectedError: "error retrieving cutoff epoch for past sync committees: db query failed", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + table, _, err := dataService.getTablesForPeriod(tt.period) + assert.NoError(t, err) + + query := fmt.Sprintf("SELECT epoch_start FROM %s FINAL ORDER BY epoch_start ASC LIMIT \\$1", table) + if tt.mockError != nil { + mock.ExpectQuery(query).WillReturnError(tt.mockError) + } else { + mock.ExpectQuery(query).WithArgs(1).WillReturnRows(tt.mockRows) + } + + ctx := context.Background() + val, err := dataService.getEpochStart(ctx, tt.period) + + if tt.expectedError == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tt.expectedError) + } + assert.Equal(t, tt.expectedVal, val) + assert.NoError(t, mock.ExpectationsWereMet()) + }) + } +} + +func TestGetTablesForPeriod(t *testing.T) { + d := &DataAccessService{} + + tests := []struct { + name string + input enums.TimePeriod + expectedTable string + expectedHours int + expectedError error + }{ + { + name: "Last 1 hour", + input: enums.TimePeriods.Last1h, + expectedTable: "validator_dashboard_data_rolling_1h", + expectedHours: 1, + expectedError: nil, + }, + { + name: "Last 24 hours", + input: enums.TimePeriods.Last24h, + expectedTable: "validator_dashboard_data_rolling_24h", + expectedHours: 24, + expectedError: nil, + }, + { + name: "Last 7 days", + input: enums.TimePeriods.Last7d, + expectedTable: "validator_dashboard_data_rolling_7d", + expectedHours: 7 * 24, + expectedError: nil, + }, + { + name: "Last 30 days", + input: enums.TimePeriods.Last30d, + expectedTable: "validator_dashboard_data_rolling_30d", + expectedHours: 30 * 24, + expectedError: nil, + }, + { + name: "All time", + input: enums.TimePeriods.AllTime, + expectedTable: "validator_dashboard_data_rolling_total", + expectedHours: -1, + expectedError: nil, + }, + { + name: "Invalid time period", + input: enums.TimePeriod(999), + expectedTable: "", + expectedHours: 0, + expectedError: fmt.Errorf("not-implemented time period: %v", enums.TimePeriod(999)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + table, hours, err := d.getTablesForPeriod(tt.input) + + assert.Equal(t, tt.expectedTable, table) + assert.Equal(t, tt.expectedHours, hours) + if tt.expectedError != nil { + assert.EqualError(t, err, tt.expectedError.Error()) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestGetTableAndDateColumn(t *testing.T) { + tests := []struct { + name string + aggregation enums.ChartAggregation + expectedTable string + expectedDateColumn string + expectError error + }{ + { + name: "Epoch aggregation", + aggregation: enums.IntervalEpoch, + expectedTable: "validator_dashboard_data_epoch", + expectedDateColumn: "epoch_timestamp", + expectError: nil, + }, + { + name: "Hourly aggregation", + aggregation: 
enums.IntervalHourly, + expectedTable: "validator_dashboard_data_hourly", + expectedDateColumn: "t", + expectError: nil, + }, + { + name: "Daily aggregation", + aggregation: enums.IntervalDaily, + expectedTable: "validator_dashboard_data_daily", + expectedDateColumn: "t", + expectError: nil, + }, + { + name: "Weekly aggregation", + aggregation: enums.IntervalWeekly, + expectedTable: "validator_dashboard_data_weekly", + expectedDateColumn: "t", + expectError: nil, + }, + { + name: "Invalid aggregation", + aggregation: enums.ChartAggregation(999), // An invalid aggregation value + expectedTable: "", + expectedDateColumn: "", + expectError: fmt.Errorf("unexpected aggregation type: %v", enums.ChartAggregation(999)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := &DataAccessService{} + table, dateColumn, err := d.getTableAndDateColumn(tt.aggregation) + + if tt.expectError != nil { + assert.Error(t, err) + assert.Contains(t, err.Error(), "unexpected aggregation type") + } else { + assert.NoError(t, err) + } + + assert.Equal(t, tt.expectedTable, table) + assert.Equal(t, tt.expectedDateColumn, dateColumn) + }) + } +} + +func TestGetViewAndDateColumn(t *testing.T) { + tests := []struct { + name string + aggregation enums.ChartAggregation + expectedView string + expectedColumn string + expectedError error + }{ + { + name: "Epoch aggregation", + aggregation: enums.IntervalEpoch, + expectedView: "view_validator_dashboard_data_epoch_max_ts", + expectedColumn: "t", + expectedError: nil, + }, + { + name: "Hourly aggregation", + aggregation: enums.IntervalHourly, + expectedView: "view_validator_dashboard_data_hourly_max_ts", + expectedColumn: "t", + expectedError: nil, + }, + { + name: "Daily aggregation", + aggregation: enums.IntervalDaily, + expectedView: "view_validator_dashboard_data_daily_max_ts", + expectedColumn: "t", + expectedError: nil, + }, + { + name: "Weekly aggregation", + aggregation: enums.IntervalWeekly, + expectedView: "view_validator_dashboard_data_weekly_max_ts", + expectedColumn: "t", + expectedError: nil, + }, + { + name: "Invalid aggregation", + aggregation: enums.ChartAggregation(999), + expectedView: "", + expectedColumn: "", + expectedError: fmt.Errorf("unexpected aggregation type: %v", enums.ChartAggregation(999)), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := &DataAccessService{} + view, column, err := d.getViewAndDateColumn(tt.aggregation) + + assert.Equal(t, tt.expectedView, view) + assert.Equal(t, tt.expectedColumn, column) + if tt.expectedError != nil { + assert.EqualError(t, err, tt.expectedError.Error()) + } else { + assert.NoError(t, err) + } + }) + } +}
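
The new test file covers the extracted helpers except getPastSyncCommittees. Below is a minimal sketch of a companion test following the same sqlmock pattern, reusing the imports of vdb_summary_test.go; it assumes alloyReader accepts the sqlx mock like the other reader handles, that 32 epochs per sync committee period is an acceptable test config, and matches only the stable prefix of the goqu-generated SQL.

func TestGetPastSyncCommittees(t *testing.T) {
	mockDB, mock, err := sqlmock.New()
	assert.NoError(t, err)
	defer mockDB.Close()

	sqlxDB := sqlx.NewDb(mockDB, "sqlmock")
	// Assumption: alloyReader is an sqlx-compatible handle, like readerDb above.
	dataService := &DataAccessService{alloyReader: sqlxDB}

	// utils.SyncPeriodOfEpoch reads the chain config, mirroring the setup in
	// TestGetCurrentAndUpcomingSyncCommittees; the values here are assumptions.
	utils.Config = &types.Config{
		Chain: types.Chain{
			ClConfig: types.ClChainConfig{
				EpochsPerSyncCommitteePeriod: 32,
				AltairForkEpoch:              0,
			},
		},
	}

	// Validator 1 served in two past committees, validator 2 in one.
	rows := sqlmock.NewRows([]string{"validatorindex"}).
		AddRow(1).
		AddRow(1).
		AddRow(2)
	// Match only the stable prefix; the full WHERE clause is generated by goqu.
	mock.ExpectQuery(`SELECT sc.validatorindex FROM sync_committees sc`).
		WillReturnRows(rows)

	counts, err := dataService.getPastSyncCommittees(context.Background(), []uint64{1, 2}, 64, 128)
	assert.NoError(t, err)
	assert.Equal(t, map[uint64]uint64{1: 2, 2: 1}, counts)
	assert.NoError(t, mock.ExpectationsWereMet())
}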