Skip to content

Commit 7c89623

Browse files
authored
feat(shard-distributor): add shard handover stats (#7495)
<!-- Describe what has changed in this PR --> **What changed?** * ShardHandoverStats have been added to AssignedState * Comments have been added to Store's methods and State structures <!-- Tell your future self why you made these changes --> **Why?** * ShardHandoverStats will be used to track Shard Assignment Distribution Latency and Shard Handover Latency, which will serve as health indicators of a cluster <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** * Unit tests <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Are there any documentation updates that should be made for config (https://cadenceworkflow.io/docs/operation-guide/setup/)? If so, please open a PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes**
1 parent eb79763 commit 7c89623

File tree

8 files changed

+731
-43
lines changed

8 files changed

+731
-43
lines changed

common/types/sharddistributor.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222

2323
package types
2424

25-
import "fmt"
25+
import (
26+
"fmt"
27+
)
2628

27-
//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go
29+
//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode,HandoverType -json -output sharddistributor_statuses_enumer_generated.go
2830

2931
type GetShardOwnerRequest struct {
3032
ShardKey string
@@ -198,6 +200,7 @@ func (v *ExecutorHeartbeatResponse) GetMigrationPhase() (o MigrationMode) {
198200
}
199201

200202
// ShardAssignment describes the assignment of a shard; it currently carries only the assignment status.
type ShardAssignment struct {
203+
// Status indicates the current assignment status of the shard.
201204
Status AssignmentStatus `json:"status"`
202205
}
203206

@@ -217,6 +220,30 @@ const (
217220
AssignmentStatusREADY AssignmentStatus = 1
218221
)
219222

223+
// HandoverType indicates the type of handover that occurred during shard reassignment.
224+
// The value is persisted to the DB via its string mapping.
225+
// Beware: renaming a value must stay backward compatible and should be done in two steps.
226+
type HandoverType int32
227+
228+
const (
229+
// HandoverTypeINVALID is the zero value of HandoverType.
HandoverTypeINVALID HandoverType = 0
230+
231+
// HandoverTypeGRACEFUL indicates a graceful handover:
232+
// the shard was transferred in a way that gave the previous owner
233+
// a chance to finish processing before the shard was reassigned.
234+
HandoverTypeGRACEFUL HandoverType = 1
235+
236+
// HandoverTypeEMERGENCY indicates an emergency handover:
237+
// the shard was transferred abruptly, without
238+
// allowing the previous owner to finish processing.
239+
HandoverTypeEMERGENCY HandoverType = 2
240+
)
241+
242+
// Ptr returns a pointer to a copy of the HandoverType value (the receiver is passed by value).
243+
func (t HandoverType) Ptr() *HandoverType {
244+
return &t
245+
}
246+
220247
type MigrationMode int32
221248

222249
const (

common/types/sharddistributor_statuses_enumer_generated.go

Lines changed: 92 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/sharddistributor/leader/process/processor.go

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package process
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"maps"
78
"math/rand"
@@ -337,7 +338,6 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) {
337338
}
338339

339340
func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) {
340-
341341
namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name)
342342
if err != nil {
343343
return fmt.Errorf("get state: %w", err)
@@ -378,8 +378,8 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo
378378
}
379379

380380
p.addAssignmentsToNamespaceState(namespaceState, currentAssignments)
381-
382381
p.logger.Info("Applying new shard distribution.")
382+
383383
// Use the leader guard for the assign and delete operation.
384384
err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{
385385
NewState: namespaceState,
@@ -473,27 +473,105 @@ func (*namespaceProcessor) updateAssignments(shardsToReassign []string, activeEx
473473
}
474474

475475
// addAssignmentsToNamespaceState rebuilds namespaceState.ShardAssignments from currentAssignments
// (executor ID -> shard IDs). Every listed shard is marked AssignmentStatusREADY, the executor's
// existing ModRevision is carried over (0 when the executor has no prior entry), LastUpdated is set
// to the current UTC time, and ShardHandoverStats is computed per executor.
// The map is replaced wholesale, so executors absent from currentAssignments are dropped.
func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *store.NamespaceState, currentAssignments map[string][]string) {
476-
newState := make(map[string]store.AssignedState)
476+
newState := make(map[string]store.AssignedState, len(currentAssignments))
477+
477478
for executorID, shards := range currentAssignments {
478479
assignedShardsMap := make(map[string]*types.ShardAssignment)
480+
479481
for _, shardID := range shards {
480482
assignedShardsMap[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY}
481483
}
484+
482485
modRevision := int64(0) // Should be 0 if we have not seen it yet
483486
if namespaceAssignments, ok := namespaceState.ShardAssignments[executorID]; ok {
484487
modRevision = namespaceAssignments.ModRevision
485488
}
486489

487490
newState[executorID] = store.AssignedState{
488-
AssignedShards: assignedShardsMap,
489-
LastUpdated: p.timeSource.Now().UTC(),
490-
ModRevision: modRevision,
491+
AssignedShards: assignedShardsMap,
492+
LastUpdated: p.timeSource.Now().UTC(),
493+
ModRevision: modRevision,
494+
ShardHandoverStats: p.addHandoverStatsToExecutorAssignedState(namespaceState, executorID, shards),
491495
}
492496
}
493497

494498
namespaceState.ShardAssignments = newState
495499
}
496500

501+
// addHandoverStatsToExecutorAssignedState builds the handover stats for the shards
// assigned to executorID, keyed by shard ID. Shards for which newHandoverStats
// returns nil are omitted. The returned map is never nil, but may be empty.
func (p *namespaceProcessor) addHandoverStatsToExecutorAssignedState(
502+
namespaceState *store.NamespaceState,
503+
executorID string, shardIDs []string,
504+
) map[string]store.ShardHandoverStats {
505+
var newStats = make(map[string]store.ShardHandoverStats)
506+
507+
// Prepare handover stats for each shard
508+
for _, shardID := range shardIDs {
509+
handoverStats := p.newHandoverStats(namespaceState, shardID, executorID)
510+
511+
// newHandoverStats returns nil when there is nothing to record (e.g. first assignment or unchanged owner); skip those shards
512+
if handoverStats != nil {
513+
newStats[shardID] = *handoverStats
514+
}
515+
}
516+
517+
return newStats
518+
}
519+
520+
// newHandoverStats creates shard handover statistics if a handover occurred.
// It returns nil (no stats) when:
//   - looking up the previous owner fails with an error other than store.ErrShardNotFound (logged as a warning),
//   - no previous owner is returned,
//   - the previous owner is newExecutorID, or
//   - the previous owner has no entry in namespaceState.Executors.
//
// Otherwise the result carries the previous executor's LastHeartbeat and a handover type of
// GRACEFUL when that executor's status is DRAINING or DRAINED, EMERGENCY in all other cases.
521+
func (p *namespaceProcessor) newHandoverStats(
522+
namespaceState *store.NamespaceState,
523+
shardID string,
524+
newExecutorID string,
525+
) *store.ShardHandoverStats {
526+
logger := p.logger.WithTags(
527+
tag.ShardNamespace(p.namespaceCfg.Name),
528+
tag.ShardKey(shardID),
529+
tag.ShardExecutor(newExecutorID),
530+
)
531+
532+
// Fetch previous shard owners from cache
// NOTE(review): this lookup uses context.Background(), so it is not bound to any caller
// cancellation or deadline; consider threading a ctx through if callers have one.
533+
prevExecutor, err := p.shardStore.GetShardOwner(context.Background(), p.namespaceCfg.Name, shardID)
534+
if err != nil && !errors.Is(err, store.ErrShardNotFound) {
535+
logger.Warn("failed to get shard owner for shard statistic", tag.Error(err))
536+
return nil
537+
}
538+
// previous executor is not found in cache
539+
// meaning this is the first assignment of the shard
540+
// so we skip updating handover stats
541+
if prevExecutor == nil {
542+
return nil
543+
}
544+
545+
// No change in assignment
546+
// meaning no handover occurred
547+
// skip updating handover stats
548+
if prevExecutor.ExecutorID == newExecutorID {
549+
return nil
550+
}
551+
552+
// previous executor heartbeat is not found in namespace state
553+
// meaning the executor has already been cleaned up
554+
// skip updating handover stats
555+
prevExecutorHeartbeat, ok := namespaceState.Executors[prevExecutor.ExecutorID]
556+
if !ok {
557+
logger.Info("previous executor heartbeat not found, skipping handover stats")
558+
return nil
559+
}
560+
561+
handoverType := types.HandoverTypeEMERGENCY
562+
563+
// Consider it a graceful handover if the previous executor was in DRAINING or DRAINED status
564+
// otherwise, it's an emergency handover
565+
if prevExecutorHeartbeat.Status == types.ExecutorStatusDRAINING || prevExecutorHeartbeat.Status == types.ExecutorStatusDRAINED {
566+
handoverType = types.HandoverTypeGRACEFUL
567+
}
568+
569+
return &store.ShardHandoverStats{
570+
HandoverType: handoverType,
571+
PreviousExecutorLastHeartbeatTime: prevExecutorHeartbeat.LastHeartbeat,
572+
}
573+
}
574+
497575
func (*namespaceProcessor) getActiveExecutors(namespaceState *store.NamespaceState, staleExecutors map[string]int64) []string {
498576
var activeExecutors []string
499577
for id, state := range namespaceState.Executors {

0 commit comments

Comments
 (0)