Add health signal recording to interceptor #7773
Conversation
Pull Request Overview
This PR introduces health signal recording for history service RPCs to mirror persistence health aggregator functionality. Key changes include adding a new health signal aggregator implementation, integrating it into the service FX providers, and updating dynamic configuration values for better RPC health monitoring.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
service/history/health_signal_aggregator.go | Added a new aggregator for recording and emitting RPC health metrics. |
service/history/fx.go | Integrated health signal aggregator into dependency injection. |
service/history/configs/config.go | Updated configuration to include RPC latency failure thresholds. |
common/dynamicconfig/constants.go | Added new dynamic config constant for history RPC latency failure. |
Comments suppressed due to low confidence (1)
service/history/fx.go:308
- The dynamic config parameter 'ShardPerNsRPSWarnPercent' is used as the 'perMethodErrorWarnLimit' value, which may be inconsistent with its intended purpose. Please verify that this mapping reflects the desired error ratio warning threshold.
perMethodErrorWarnLimit: dynamicconfig.ShardPerNsRPSWarnPercent.Get(dynamicCollection),
	metricsHandler          metrics.Handler
	emitMetricsTimer        *time.Ticker
	perMethodRPSWarnLimit   dynamicconfig.IntPropertyFn
	perMethodErrorWarnLimit dynamicconfig.FloatPropertyFn
The field 'perMethodErrorWarnLimit' is defined but never used within the aggregator. Consider either removing it or incorporating it into the metrics/warning logic if it is intended to monitor error ratios.
	perMethodErrorWarnLimit dynamicconfig.FloatPropertyFn
(force-pushed from b9e0f97 to cacda23)
service/history/handler.go (outdated)
@@ -123,6 +125,7 @@ type (
	TaskQueueManager       persistence.HistoryTaskQueueManager
	TaskCategoryRegistry   tasks.TaskCategoryRegistry
	DLQMetricsEmitter      *persistence.DLQMetricsEmitter
	HealthSignalAggregator HealthSignalAggregator
Now we have PersistenceHealthSignal and HealthSignalAggregator in this struct.
How about we name them to make it clear what they aggregate? Something like PersistenceHealthSignal and RpcHealthSignal?
Also, we can use the same persistence.HealthSignalAggregator type for both of these, if possible.
We can absolutely name it something more descriptive. I would like to push back on reusing the same type, though, since this isn't persistence. We could talk about moving this to a shared package in the future, or I could do it in this PR if you have a good idea of where it should live.
	s.emitMetricsTimer.Stop()
}

func (s *HealthSignalAggregatorImpl) Record(method string, latency time.Duration, err error) {
I see that we are tracking latency for each method. Do we need that?
For persistence, we emit per-shard latency because it is not possible to attach a shard-id tag to metrics; that would increase the metric's cardinality beyond what we can handle, so we log it every few seconds instead.
We already emit per-method metrics, so we might not need to run emitMetricsLoop() for history RPC health tracking.
It would be better to reuse persistence.HealthSignalAggregator here. We can pass a boolean to its constructor to disable emitMetricsLoop(), and if we don't care about shard-id and namespace, we can pass empty values when calling Record().
It looks like we're tracking latency per method, but the actual aggregation, and the subsequent Latency number produced, looks at all of them regardless of method.
I've converted emitMetricsLoop() to a noop -- you're right that we don't need to emit the metrics, because we already do this elsewhere where we can consume them.
I would prefer not to reuse the persistence HealthSignalAggregator, because the naming/package placement would be a bit confusing. Will keep them separate for now until we find a good shared spot for this.
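For context, here is a minimal self-contained sketch of the flag-gated constructor the reviewer proposed (disable the emit loop via a boolean) alongside a Record that ignores shard/namespace detail. All names, fields, and signatures below are illustrative assumptions, not the merged code.

```go
package health

import (
	"sync/atomic"
	"time"
)

// healthSignalAggregator records per-call outcomes; the background
// metrics loop is optional so callers that already emit per-method
// metrics elsewhere can skip it instead of running a noop loop.
type healthSignalAggregator struct {
	totalCalls  atomic.Int64
	totalErrors atomic.Int64
	done        chan struct{}
}

// newHealthSignalAggregator starts emitMetricsLoop only when emitMetrics
// is true, mirroring the suggested boolean constructor argument.
func newHealthSignalAggregator(emitMetrics bool, interval time.Duration) *healthSignalAggregator {
	s := &healthSignalAggregator{done: make(chan struct{})}
	if emitMetrics {
		go s.emitMetricsLoop(interval)
	}
	return s
}

// Record counts a call and whether it failed; method and latency would
// feed richer aggregation in a real implementation.
func (s *healthSignalAggregator) Record(method string, latency time.Duration, err error) {
	s.totalCalls.Add(1)
	if err != nil {
		s.totalErrors.Add(1)
	}
}

func (s *healthSignalAggregator) emitMetricsLoop(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-s.done:
			return
		case <-ticker.C:
			// emit aggregated metrics here
		}
	}
}
```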
service/history/handler.go (outdated)
@@ -264,6 +286,11 @@ func (h *Handler) IsActivityTaskValid(ctx context.Context, request *historyservi
		return nil, h.convertError(err)
	}

	startTime := h.timeSource.Now()
	defer func() {
We can combine this with the metrics.CapturePanic(h.logger, h.metricsHandler, &retError) above in a single deferred function. It is better to record the start time at the beginning of the function; we can treat the total time spent in the function as the request latency.
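A sketch of the combined deferred function being suggested, written as a fragment against the identifiers that appear in this PR; the exact handler body and return values are assumptions.

```go
func (h *Handler) IsActivityTaskValid(
	ctx context.Context,
	request *historyservice.IsActivityTaskValidRequest,
) (_ *historyservice.IsActivityTaskValidResponse, retError error) {
	// Record the start time first so the deferred closure measures the
	// total time spent in this function as the request latency.
	startTime := h.timeSource.Now()
	defer func() {
		// CapturePanic recovers and stores any panic into retError
		// before the health signal is recorded with that error.
		metrics.CapturePanic(h.logger, h.metricsHandler, &retError)
		h.healthSignalAggregator.Record("IsActivityTaskValid", h.timeSource.Now().Sub(startTime), retError)
	}()
	// ... handler body ...
	return nil, nil
}
```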
service/history/handler.go (outdated)
	startTime := h.timeSource.Now()
	defer func() {
		h.healthSignalAggregator.Record("IsWorkflowTaskValid", time.Since(startTime), retError)
	}()
Is it possible to have it as an interceptor?
We would have to pull from Prometheus, and we risk it being down. Let me know if I'm misunderstanding the suggestion.
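Later revisions of this PR do take the interceptor route (the HealthCheckInterceptor in common/rpc/interceptor described in the review summaries below), and no metrics backend needs to be queried: the aggregator is updated in process and read from memory. A minimal self-contained sketch of the idea; names and signatures here are assumptions, not the merged code.

```go
package interceptor

import (
	"context"
	"time"

	"google.golang.org/grpc"
)

// HealthSignalRecorder is the minimal surface the interceptor needs;
// the history health signal aggregator would satisfy it.
type HealthSignalRecorder interface {
	Record(method string, latency time.Duration, err error)
}

// NewHealthCheckInterceptor returns a unary server interceptor that
// records latency and error outcome for every RPC, replacing the
// per-handler startTime/defer boilerplate. Signals stay in process
// memory; no metrics backend is queried.
func NewHealthCheckInterceptor(rec HealthSignalRecorder) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req any,
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (any, error) {
		start := time.Now()
		resp, err := handler(ctx, req)
		rec.Record(info.FullMethod, time.Since(start), err)
		return resp, err
	}
}
```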
	for method, count := range requestCounts {
		methodRPS := int64(float64(count) / emitMetricsInterval.Seconds())
		s.metricsHandler.Histogram(HistoryServiceRPS.Name(), HistoryServiceRPS.Unit()).Record(methodRPS)
		if methodRPS > int64(s.perMethodRPSWarnLimit()) {
			s.logger.Warn("Per method RPS warn limit exceeded",
				tag.NewStringTag("method", method),
				tag.NewInt64("rps", methodRPS))
		}
What is the reason to emit those metrics?
Discussed with @prathyushpv and there's no need as we're already doing this. Moved to noop.
Pull Request Overview
This PR adds RPC health signal recording for the history service by introducing a gRPC interceptor and integrating its signals into the deep health check.
- Introduce HealthCheckInterceptor and HealthSignalAggregator in common/rpc/interceptor
- Wire up an RPC health signal provider and interceptor in the FX module
- Add dynamic config flags and DeepHealthCheck logic for RPC latency/error thresholds
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
service/history/handler.go | Injects historyHealthSignal and calls checkHistoryHealthSignals in DeepHealthCheck |
service/history/fx.go | Adds HealthSignalAggregatorProvider and HealthCheckInterceptorProvider to FX |
service/history/configs/config.go | Introduces HealthRPCLatencyFailure and HealthRPCErrorRatio in config |
common/rpc/interceptor/health_check.go | Implements HealthCheckInterceptor and HealthSignalAggregator |
common/rpc/interceptor/health.go | Adds a comment on HealthInterceptor |
common/persistence/health_signal_aggregator.go | Documents emitMetricsLoop |
common/dynamicconfig/constants.go | Defines new dynamicconfig keys for history RPC health signals |
Comments suppressed due to low confidence (2)
common/dynamicconfig/constants.go:206
- [nitpick] The dynamic config key prefix is inconsistent: this uses "system.history...", while the RPC thresholds use "history...". Consider unifying all history health-signal keys under the same namespace.
HistoryHealthSignalMetricsEnabled = NewGlobalBoolSetting(
service/history/handler.go:229
- [nitpick] The comment above this function refers to checkHistoryHealthSignal (singular) but the function is named checkHistoryHealthSignals (plural). Rename one for consistency.
func (h *Handler) checkHistoryHealthSignals() *historyservice.DeepHealthCheckResponse {
service/history/handler.go (outdated)
func (h *Handler) checkHistoryHealthSignals() *historyservice.DeepHealthCheckResponse {
	// Check that the RPC latency doesn't exceed the threshold.
	if _, ok := h.historyHealthSignal.(*interceptor.NoopSignalAggregator); ok {
		h.logger.Warn("health signal aggregator is using noop implementation")
[nitpick] This warn log runs on every deep health check when using the noop aggregator, which could spam logs. Consider logging it only once or at a debug level.
Suggested change:
-	h.logger.Warn("health signal aggregator is using noop implementation")
+	if !h.noopAggregatorLogged {
+		h.logger.Debug("health signal aggregator is using noop implementation")
+		h.noopAggregatorLogged = true
+	}
Yes. I don't think we need this.
Pull Request Overview
This PR introduces health signal recording into the history service interceptor, enabling the recording of RPC latency and error ratios to determine service health. Key changes include the addition of a new health check path in the handler, the wiring-up of health signal and check interceptors, and the dynamic configuration of corresponding thresholds.
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
service/history/handler.go | Added health signal recording and a new deep health check method |
service/history/fx.go | Provided new dependency providers for the health aggregator and interceptor |
service/history/configs/config.go | Updated configuration to support new RPC health thresholds |
common/rpc/interceptor/health_check.go | Introduced the health check interceptor to record RPC metrics |
common/rpc/interceptor/health.go | Added comment documentation for the health interceptor |
common/persistence/health_signal_aggregator.go | Extended aggregator with metric emission functionality |
common/dynamicconfig/constants.go | Added dynamic config settings for RPC latency and error thresholds |
service/history/handler.go (outdated)
@@ -217,6 +225,25 @@ func (h *Handler) DeepHealthCheck(
	return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_SERVING}, nil
}

// checkHistoryHealthSignal checks the history health signal that is captured by the interceptor.
[nitpick] Consider updating the comment to match the function name (checkHistoryHealthSignals) to avoid any potential confusion.
@@ -10,6 +10,7 @@ import (
	"google.golang.org/grpc"
)

// HealthInterceptor rejects frontend requests if the host has not been marked as healthy.
This is in the common pkg; it is not specific to the frontend.
If you read the code within the interceptor, it returns a frontend error (notHealthyErr) and is only used in the frontend. https://github.com/temporalio/temporal/blob/lanie/history-rpc-health-aggregator/common/rpc/interceptor/health.go#L39
	return resp, err
}

type (
Is it possible to make this its own file and share between the interceptor and persistence?
Where would be a good shared package? I mentioned in an earlier comment that I skipped this on purpose to keep the PR simple: #7773 (comment)
I am thinking a new package under common/health.
	}

	// HealthSignalAggregatorImpl implements HealthSignalAggregator
	HealthSignalAggregatorImpl struct {
nit: healthSignalAggregatorImpl
It needs to stay exported so that fx has access to it in the history package: https://github.com/temporalio/temporal/blob/lanie/history-rpc-health-aggregator/service/history/fx.go#L190
Let me know if you think fx will keep a pointer to the same aggregator impl if it's passed in as an interface -- fx is a bit of a black box to me.
I see. I think it works as long as the interface is accessible. You might need to declare a new interface for the persistence and history health signal aggregators if you are going to have a shared package.
Changed this to be unexported along with the other impl -- it needed an interface, not a struct.
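A sketch of the pattern that resolved this thread: keep the struct unexported and have the FX provider return an exported interface, so fx keys the single shared instance under the interface type and injects the same pointer everywhere. The provider and type names follow this PR's review summaries, but the bodies are assumptions.

```go
package history

import (
	"time"

	"go.uber.org/fx"
)

// HealthSignalAggregator is the exported interface that consumers
// depend on; the concrete implementation can stay unexported.
type HealthSignalAggregator interface {
	Record(method string, latency time.Duration, err error)
}

type healthSignalAggregatorImpl struct{}

func (a *healthSignalAggregatorImpl) Record(method string, latency time.Duration, err error) {
	// aggregate the signal here
}

// HealthSignalAggregatorProvider returns the interface type, so fx
// registers the one shared pointer under HealthSignalAggregator and
// every consumer receives the same instance.
func HealthSignalAggregatorProvider() HealthSignalAggregator {
	return &healthSignalAggregatorImpl{}
}

var Module = fx.Options(
	fx.Provide(HealthSignalAggregatorProvider),
)
```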
		metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_NOT_SERVING))
		return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}
	}
	return nil
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_SERVING}
I return nil because the caller looks for a non-nil response to determine whether to fail early; otherwise, it runs the other health checks.
It makes more sense to return the response and check the health state, returning if the health state is not serving.
But we have more health states to check -- this is just a helper to encapsulate functionality. We don't know yet whether it's serving or not.
In this case, could you return an error instead?
It isn't an error, though -- no error occurred, just a lack of any signal that something is wrong.
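A sketch of the contract being described, where a nil return means "this check found nothing wrong, keep checking" rather than "serving". The threshold accessor names (AverageLatency, HealthRPCLatencyFailure) are assumptions based on the config names discussed in this PR.

```go
// checkHistoryHealthSignals returns a NOT_SERVING response when a
// threshold is breached, and nil when this check passes, so the caller
// can fail early or continue with its remaining health checks.
func (h *Handler) checkHistoryHealthSignals() *historyservice.DeepHealthCheckResponse {
	// AverageLatency is an assumed accessor on the aggregator.
	if h.historyHealthSignal.AverageLatency() > h.config.HealthRPCLatencyFailure() {
		return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}
	}
	return nil // no signal that anything is wrong; not a claim of SERVING
}

// Caller side in DeepHealthCheck:
if resp := h.checkHistoryHealthSignals(); resp != nil {
	return resp, nil // fail early
}
// ... run the remaining checks before reporting SERVING ...
```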
Pull Request Overview
Adds a reusable gRPC interceptor to capture and expose both persistence and RPC health signals in the history service’s DeepHealthCheck endpoint.
- Introduces a HealthCheckInterceptor to log per-RPC latency and error ratios
- Wires a history-specific HealthSignalAggregator into the handler and FX module
- Defines new dynamic-config entries for RPC health thresholds
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
service/history/handler.go | Injects historyHealthSignal and checks RPC metrics in DeepHealthCheck via checkHistoryHealthSignals |
service/history/fx.go | Provides HealthSignalAggregatorProvider and HealthCheckInterceptorProvider in FX module |
service/history/configs/config.go | Adds HealthRPCLatencyFailure and HealthRPCErrorRatio config properties |
common/rpc/interceptor/health_check.go | Implements HealthCheckInterceptor to record and aggregate gRPC call health signals |
common/rpc/interceptor/health.go | Adds documentation for HealthInterceptor |
common/persistence/health_signal_aggregator.go | Renames persistence constructor to NewHealthSignalAggregator and adds doc for emitMetricsLoop |
common/persistence/client/fx.go | Updates persistence HealthSignalAggregatorProvider to call renamed constructor |
common/dynamicconfig/constants.go | Defines new dynamic-config keys for history RPC health settings |
Comments suppressed due to low confidence (1)
service/history/handler.go:229
- The new checkHistoryHealthSignals logic for RPC latency and error thresholds isn't currently covered by unit tests. Consider adding tests to validate each threshold condition.
func (h *Handler) checkHistoryHealthSignals() *historyservice.DeepHealthCheckResponse {
Pull Request Overview
This PR introduces a health signal recording mechanism by adding an interceptor that monitors RPC latency and error ratios from both persistence and history operations. Key changes include:
- Adding a health signal aggregator for the history service.
- Integrating new health check interceptor providers in the fx module.
- Updating configuration and dynamic settings to support RPC-level health metrics.
Reviewed Changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
service/history/handler.go | Introduces history health signal recording and health check logic in health checks |
service/history/fx.go | Adds providers for health signal aggregator and health check interceptor |
service/history/configs/config.go | Adds new RPC health check configuration settings |
common/rpc/interceptor/health_check.go | Implements gRPC interceptor for recording health metrics |
common/rpc/interceptor/health.go | Adds documentation for HealthInterceptor |
common/persistence/health_signal_aggregator.go | Renames constructor for HealthSignalAggregator implementation |
common/persistence/client/fx.go | Updates provider call to the HealthSignalAggregator constructor |
common/dynamicconfig/constants.go | Introduces new dynamic config settings for RPC health thresholds |
Comments suppressed due to low confidence (2)
common/persistence/health_signal_aggregator.go:50
- The constructor has been renamed from NewHealthSignalAggregatorImpl to NewHealthSignalAggregator; verify that this naming change is consistently reflected across the codebase and documentation.
func NewHealthSignalAggregator(
common/dynamicconfig/constants.go:2441
- [nitpick] Review the default threshold of 500 for HealthRPCLatencyFailure to ensure it aligns with the expected service behavior and document the rationale if necessary.
HealthRPCLatencyFailure = NewGlobalFloatSetting(
		return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}, nil
	}
	metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_SERVING))
	return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_SERVING}, nil
}

// checkHistoryHealthSignals checks the history health signal that is captured by the interceptor.
[nitpick] Consider expanding the inline comment for checkHistoryHealthSignals to explain the selection of latency and error ratio thresholds, which would aid future maintainability.
This adds an interceptor to the handler service that records latency and errors from handlers, and it can be reused in other services.
It wires the interceptor into the history service; the interceptor takes a health signal aggregator provider. That provider is also injected into the handler so the DeepHealthCheck handler within the history service can read the recorded values.
A future PR will add this check to the other services.
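Roughly, the wiring described above looks like this in FX terms. The provider names are taken from the review summaries in this thread; the parameter lists and bodies are assumptions, not the merged code.

```go
// One aggregator instance is provided; the interceptor writes to it on
// every RPC and the handler reads from it in DeepHealthCheck.
func HealthSignalAggregatorProvider(dc *dynamicconfig.Collection) HealthSignalAggregator {
	return newHealthSignalAggregator(dc)
}

func HealthCheckInterceptorProvider(agg HealthSignalAggregator) *interceptor.HealthCheckInterceptor {
	return interceptor.NewHealthCheckInterceptor(agg)
}

var Module = fx.Options(
	fx.Provide(HealthSignalAggregatorProvider),
	fx.Provide(HealthCheckInterceptorProvider),
)
```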