Client watcher changes #2
base: master
Summary of Changes
Hello @shivaspeaks, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly enhances the gRPC xDS client's resource watching and error handling capabilities by implementing the new API changes defined in gRFC A88. It also adds support for fine-grained control over ORCA backend metric propagation as per gRFC A85. Additionally, it includes internal improvements to future management within the Binder transport and a specific configuration for UDS channel builders.
Code Review
This pull request implements two significant gRFCs: A88, which refactors the XdsClient watcher API for better error handling, and another one (likely A85) for ORCA metrics propagation. The changes are extensive, touching core xDS client logic, load balancers, and their configurations. The new watcher API with onResourceChanged(StatusOr<T>) and onAmbientError(Status) is adopted correctly across the codebase, improving the distinction between definitive resource errors and transient ambient errors. The ORCA metrics propagation feature is also well-integrated. Overall, the changes are well-structured and align with the gRFCs. However, I've found a critical issue in the retry logic for the xDS control plane client that needs to be addressed.
| rpcRetryTimer = | ||
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); | ||
|
|
||
| xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); | ||
| // Notify the handler of the stream closure before cleaning up the stream state. | ||
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); | ||
| close(status.asException()); |
There appears to be a critical issue in the retry logic. The close(status.asException()) call at the end of this block cancels the rpcRetryTimer that was scheduled just a few lines above. This means that the retry for a closed ADS stream will never actually happen, as it's cancelled immediately after being scheduled. The close() call should be moved before scheduling the retry timer to ensure that the stream is cleaned up and then a new retry is correctly scheduled.
| rpcRetryTimer = | |
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); | |
| xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived); | |
| // Notify the handler of the stream closure before cleaning up the stream state. | |
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); | |
| close(status.asException()); | |
| close(status.asException()); | |
| rpcRetryTimer = | |
| syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService); | |
| // Notify the handler of the stream closure before cleaning up the stream state. | |
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); |
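The ordering concern raised in this comment can be reproduced in isolation. The following is a minimal sketch, assuming (as the comment states) that close() cancels any pending rpcRetryTimer. The class RetryOrderingSketch and its methods are hypothetical stand-ins built on java.util.concurrent, not grpc-java's ADS stream code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a stream that owns a retry timer. */
final class RetryOrderingSketch {
  private final ScheduledExecutorService timeService;
  private ScheduledFuture<?> rpcRetryTimer;

  RetryOrderingSketch(ScheduledExecutorService timeService) {
    this.timeService = timeService;
  }

  /** Assumption: cleanup cancels whatever retry is currently pending. */
  void close() {
    if (rpcRetryTimer != null) {
      rpcRetryTimer.cancel(false);
    }
  }

  /** Schedules a no-op retry far enough in the future that it never fires here. */
  void scheduleRetry() {
    rpcRetryTimer = timeService.schedule(() -> { }, 1, TimeUnit.HOURS);
  }

  /** Returns true if a retry is still pending after running both steps in the given order. */
  static boolean retrySurvives(boolean closeFirst) {
    ScheduledExecutorService timeService = Executors.newSingleThreadScheduledExecutor();
    try {
      RetryOrderingSketch stream = new RetryOrderingSketch(timeService);
      if (closeFirst) {
        stream.close();
        stream.scheduleRetry();
      } else {
        stream.scheduleRetry();
        stream.close();
      }
      return stream.rpcRetryTimer != null && !stream.rpcRetryTimer.isCancelled();
    } finally {
      timeService.shutdownNow();
    }
  }
}
```

Under that assumption, retrySurvives(false) models the order in the diff and retrySurvives(true) models the suggested order. Whether the real close() actually cancels the timer should be confirmed against the implementation before reordering.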
/gemini review
Code Review
This pull request implements the new XdsClient watcher API as specified in gRFC A88, which is a significant and welcome improvement to the error handling logic. The changes from onChanged/onError/onResourceDoesNotExist to onResourceChanged(StatusOr<T>) and onAmbientError(Status) have been propagated consistently throughout the codebase. Additionally, this PR introduces support for ORCA LRS metric propagation, which is a valuable new feature. The refactoring in the binder transport to better manage future cancellations is also a good enhancement for maintainability. Overall, the changes are well-implemented, though I found one critical issue that will break the build.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request implements the new xDS client watcher API as specified in gRFC A88, which is a significant and positive change for error handling. The old onChanged, onError, and onResourceDoesNotExist methods are replaced by onResourceChanged(StatusOr<T>) and onAmbientError(Status). This provides a clearer distinction between definitive resource updates/errors and transient ambient errors. The implementation of this new API throughout the xDS client, dependency manager, and server wrapper appears correct and consistent with the gRFC.
Additionally, this PR introduces support for ORCA metrics propagation as per gRFC A85, including a new BackendMetricPropagation configuration object and related logic in the load statistics manager. This is also a valuable feature addition.
The refactoring in the binder transport to centralize future cancellation is a nice improvement for maintainability.
My main concerns are several commented-out assertions in the test files, marked as "flaky". While the implementation logic seems sound, these flaky tests could be hiding underlying race conditions or other bugs. It's important to investigate and fix these to ensure the reliability of the test suite and the correctness of the implementation. I've left specific comments on these occurrences.
| // okshiva: the changes introduced flakyness for the below assert | ||
| // assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); |
This assertion is commented out due to flakiness. This might be caused by other tests modifying the GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING system property, which affects the default value of ignoreResourceDeletion. To ensure test isolation, it's a good practice for tests that modify global state like system properties to save and restore them, for example using a JUnit rule.
@gemini-code-assist can you help me with solving this?
Yes, I can help with that! The flakiness you're observing is indeed due to the GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING system property being modified by other tests, as it's a global state.
To ensure test isolation and fix the flakiness, you should save the original value of this system property before each test and restore it afterwards. This can be achieved by adding @Before and @After methods to your GrpcBootstrapperImplTest class. This will ensure that the ignoreResourceDeletion() method behaves predictably for your tests.
Here's how you can modify your test class:
import org.junit.After;
import org.junit.Before;
// ... other imports

public class GrpcBootstrapperImplTest {
  private String originalXdsDataErrorHandling;

  @Before
  public void setUp() {
    originalXdsDataErrorHandling = System.getProperty("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING");
    System.setProperty("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING", "false");
  }

  @After
  public void tearDown() {
    if (originalXdsDataErrorHandling != null) {
      System.setProperty("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING", originalXdsDataErrorHandling);
    } else {
      System.clearProperty("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING");
    }
  }

  // ... your existing test methods

  @Test
  public void useV2ProtocolByDefault() throws XdsInitializationException {
    // ... existing test code
    assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); // This line can now be uncommented
  }

  @Test
  public void useV3ProtocolIfV3FeaturePresent() throws XdsInitializationException {
    // ... existing test code
    assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); // This line can now be uncommented
  }
}
Once you've added the setUp and tearDown methods, you can safely uncomment the assertion in your test method. I've provided a suggestion below to uncomment the specific line.
| // okshiva: the changes introduced flakyness for the below assert | |
| // assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); | |
| assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); |
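For tests that only need the property overridden inside a single method, the same save-and-restore idea could be packaged as a try-with-resources helper. This is a minimal sketch under stated assumptions: SystemPropertyGuard is a hypothetical class, not part of grpc-java or JUnit, and it only helps if the code under test reads the property at call time rather than caching it in a static initializer.

```java
/** Hypothetical helper: overrides a system property and restores the prior value on close. */
final class SystemPropertyGuard implements AutoCloseable {
  private final String key;
  private final String previous; // null means the property was not set before

  SystemPropertyGuard(String key, String value) {
    this.key = key;
    this.previous = System.getProperty(key);
    System.setProperty(key, value);
  }

  @Override
  public void close() {
    if (previous != null) {
      System.setProperty(key, previous);
    } else {
      System.clearProperty(key);
    }
  }
}
```

A test body would then wrap the code that depends on the flag in `try (SystemPropertyGuard guard = new SystemPropertyGuard("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING", "false")) { ... }`, so the original value is restored even if an assertion throws.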
| // okshiva: the changes introduced flakyness for the below assert | ||
| // assertThat(serverInfo.ignoreResourceDeletion()).isFalse(); |
@gemini-code-assist can you help me with solving this?
| // okshiva: flaky | ||
| // assertThat(cdsUpdateCaptor.getValue().getStatus().isOk()).isTrue(); | ||
| // okshiva: I'm skeptical about this behaviour(commented in next line) | ||
| // assertThat(getLrsServerInfo("localhost:" + fallbackServer.getServer().getPort())).isNull(); |
These assertions are commented out, with notes indicating flakiness and skepticism about the behavior. This suggests there might be an underlying issue, either with the test's timing assumptions or the implementation itself. It's important to resolve this uncertainty. If the behavior is incorrect, the implementation should be fixed. If the test is flaky, it should be stabilized. Leaving assertions commented out weakens the test suite.
| // okshiva: flaky | ||
| // verify(mockWatcher, timeout(20000)).onAmbientError(errorCaptor.capture()); | ||
| // assertThat(errorCaptor.getValue().getCode()).isEqualTo(Status.Code.NOT_FOUND); |
These lines seem to be commented out due to flakiness. Flaky tests can hide real regressions. It would be better to investigate the root cause of the flakiness and fix it, or if that's not immediately possible, use a test framework annotation to mark the test as flaky (e.g., @FlakyTest) and file a ticket to track fixing it. Simply commenting out assertions makes the test less effective.
/gemini review
Code Review
This pull request implements changes to the XdsClient watcher API as per gRFC A88, replacing the existing methods with OnResourceChanged and OnAmbientError for improved error handling. The changes include modifications to several files to accommodate the new API and backend metric propagation. The review focuses on identifying potential issues related to correctness, efficiency, and maintainability, particularly concerning the new error handling mechanisms and the integration of backend metric propagation.
/gemini review
Code Review
This pull request implements changes to the XdsClient watcher API as defined in gRFC A88, focusing on improved error handling and backend metric propagation. The changes involve replacing the existing watcher methods with OnResourceChanged and OnAmbientError, and adding backend metric propagation to LRS. The review focuses on correctness and maintainability, with a particular emphasis on the new watcher APIs and the integration of backend metric propagation.
| (lrsServerInfo == null) | ||
| ? null | ||
| : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, | ||
| edsServiceName, locality); | ||
| edsServiceName, locality, backendMetricPropagation); |
| ambientError = error.withDescription( | ||
| String.format("Ambient error for %s: %s. Details: %s%s", | ||
| toContextString(), | ||
| error.getCode(), | ||
| error.getDescription(), | ||
| nodeInfo())); |
The onAmbientError method now handles transient errors. Ensure that the error.getDescription() is properly sanitized to prevent potential injection attacks, as this description is included in log messages.
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
sanitize(error.getDescription()),
nodeInfo()));
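The sanitize call in the suggestion above is not defined anywhere in the visible diff. One possible shape for it is sketched below, assuming the concern is log forging through embedded line breaks or other control characters. LogSanitizerSketch and its sanitize method are hypothetical names, not an existing grpc-java utility.

```java
/** Hypothetical helper for making a status description safe to embed in a single log line. */
final class LogSanitizerSketch {
  private LogSanitizerSketch() {}

  /** Replaces every ISO control character (including CR and LF) with a space. */
  static String sanitize(String description) {
    if (description == null) {
      return "";
    }
    StringBuilder cleaned = new StringBuilder(description.length());
    for (int i = 0; i < description.length(); i++) {
      char c = description.charAt(i);
      cleaned.append(Character.isISOControl(c) ? ' ' : c);
    }
    return cleaned.toString();
  }
}
```

Returning an empty string for null also avoids the literal text "null" appearing in the formatted message.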
| ListenableFuture<Status> preAuthResultFuture = | ||
| register(checkServerAuthorizationAsync(serviceInfo.applicationInfo.uid)); |
The register method is used here to ensure that the preAuthResultFuture is properly managed for cancellation. This is a good practice to prevent resource leaks, especially in long-lived transports. However, it's crucial to ensure that the register method itself is thread-safe and handles exceptions appropriately.
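The thread-safety point can be made concrete with a small sketch. It assumes register() simply tracks futures so that a later shutdown can cancel them. FutureRegistrySketch is a hypothetical class using java.util.concurrent.Future, whereas the transport in this PR uses ListenableFuture, so this illustrates the pattern rather than the PR's actual code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;

/** Hypothetical owner of in-flight futures that cancels them all on shutdown. */
final class FutureRegistrySketch {
  private final Set<Future<?>> owned = new HashSet<>(); // guarded by this
  private boolean shutdown;                             // guarded by this

  /** Tracks the future and returns it; a future registered after shutdown is cancelled at once. */
  synchronized <F extends Future<?>> F register(F future) {
    if (shutdown) {
      future.cancel(false);
    } else {
      owned.add(future);
    }
    return future;
  }

  /** Cancels every tracked future; cancelling one that already completed has no effect. */
  synchronized void shutdown() {
    shutdown = true;
    for (Future<?> future : owned) {
      future.cancel(false);
    }
    owned.clear();
  }
}
```

Holding one lock in both methods closes the race where a future is registered while shutdown is iterating. This sketch keeps completed futures in the set until shutdown; a longer-lived owner would probably want to drop them on completion.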
| ListenableFuture<Status> authResultFuture = | ||
| register(checkServerAuthorizationAsync(remoteUid)); |
| if (isEnabledOrcaLrsPropagation) { | ||
| stats.recordTopLevelMetrics( | ||
| report.getCpuUtilization(), | ||
| report.getMemoryUtilization(), | ||
| report.getApplicationUtilization()); |
| boolean isHttp11ProxyAvailable, | ||
| BackendMetricPropagation backendMetricPropagation) { | ||
| return newBuilder(clusterName) | ||
| .clusterType(ClusterType.LOGICAL_DNS) | ||
| .dnsHostName(dnsHostName) |
| Status translatedStatus = Status.UNAVAILABLE.withDescription( | ||
| String.format("Error retrieving %s: %s. Details: %s%s", | ||
| toContextString(), | ||
| status.getCode(), | ||
| status.getDescription() != null ? status.getDescription() : "", | ||
| nodeInfo())); |
The new onResourceChanged method consolidates the logic for handling both resource updates and errors. The use of StatusOr is appropriate for this purpose. However, the translated status is always UNAVAILABLE. It might be useful to propagate the original status code to provide more context about the error.
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s%s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "",
nodeInfo()));
data = StatusOr.fromStatus(translatedStatus);
| if (!update.hasValue()) { | ||
| // This is a definitive resource error (e.g., NOT_FOUND). | ||
| // We must treat the resource as unavailable and tear down the server. | ||
| Status status = update.getStatus(); | ||
| StatusException statusException = Status.UNAVAILABLE.withDescription( | ||
| String.format("Listener %s unavailable: %s", resourceName, status.getDescription())) | ||
| .withCause(status.asException()) | ||
| .asException(); | ||
| handleConfigNotFoundOrMismatch(statusException); |
The handling of !update.hasValue() is correct for treating resource errors. However, the comment "This check is now covered by the '!update.hasValue()' block above" is misleading, as the original check if (update.listener() == null) is still necessary to handle cases where the update itself is valid but contains a null listener.
| // Notify the handler of the stream closure before cleaning up the stream state. | ||
| xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived); |
The original code uses newStatus for logging the error, but the stream closure is notified with the original status. This could lead to confusion when debugging. It's better to use the statusToPropagate for both logging and notification.
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
statusToPropagate.getCode(), statusToPropagate.getDescription(), statusToPropagate.getCause());
}
close(status.asException());
// Notify the handler of the stream closure before cleaning up the stream state.
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
| if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) { | ||
| ignoreResourceDeletion = true; | ||
| } |
The logic for setting ignoreResourceDeletion seems a bit convoluted. It's initialized to xdsDataErrorHandlingEnabled and then potentially overridden based on the serverFeatures. It might be clearer to directly set it based on the presence of the feature, rather than using a double negative.
boolean ignoreResourceDeletion = false;
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
ignoreResourceDeletion = true;
} else {
ignoreResourceDeletion = xdsDataErrorHandlingEnabled;
}
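The if/else in the suggestion above reduces to a single boolean expression. The sketch below shows that form as a standalone method. IgnoreResourceDeletionSketch is a hypothetical class, and the feature string value is an assumption based on the server feature name used in gRFC A53.

```java
import java.util.Set;

/** Hypothetical standalone version of the flag computation discussed above. */
final class IgnoreResourceDeletionSketch {
  // Assumed value of the constant referenced in the diff.
  static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";

  private IgnoreResourceDeletionSketch() {}

  /** True when the server feature is present or the data error handling flag is enabled. */
  static boolean ignoreResourceDeletion(
      Set<String> serverFeatures, boolean xdsDataErrorHandlingEnabled) {
    return xdsDataErrorHandlingEnabled
        || serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
  }
}
```

This yields the same result as both the original initialize-then-override logic described in the comment and the suggested if/else, without a mutable local.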
Implements the XdsClient watcher API changes of gRFC A88: https://github.com/grpc/proposal/blob/master/A88-xds-data-error-handling.md
Changes to XdsClient Watcher APIs
The current XdsClient API has three methods:
OnResourceChanged(ResourceType resource): Invoked whenever a new version of the resource is received from the xDS server.
OnError(Status status): Invoked for all transient errors and for some data errors (NACKs). The watcher is generally expected to ignore the error if it already has a valid cached resource.
OnResourceDoesNotExist(): Invoked specifically for the does-not-exist case. The watcher is generally expected to stop using any previously cached resource and put itself into a failing state.
Note: The mechanism described in gRFC A53 inhibits these notifications for only the case where we already had a cached version of the resource, which in the SotW protocol applies only to LDS and CDS. That mechanism does not affect the case where we do not already have a cached version of the resource, which is where the does-not-exist timer is used.
These methods do not map well to the desired error handling behavior defined above.
This proposal replaces those methods with the following two methods:
OnResourceChanged(StatusOr<ResourceType> resource): Will be invoked to notify the watcher of a new version of the resource received from the xDS server or an error indicating the reason why the resource cannot be obtained. Note that if an error is passed to this method after the watcher has previously seen a valid resource, the watcher is expected to stop using that previously delivered resource. In this case, the XdsClient will remove the resource from its cache, so that CSDS (see gRFC A40) and XdsClient metrics (see gRFC A78) will not reflect a resource that the client is not actually using.
OnAmbientError(Status status): Will be invoked to notify the watcher of an error that occurs after a resource has been received that should not modify the watcher's use of that resource but that may be useful information about the ambient state of the XdsClient. In particular, the watcher should not stop using the previously seen resource, and the XdsClient will not remove the resource from its cache. However, the error message may be useful as additional context to include in errors that are being generated for other reasons. For example, if failing a data plane RPC due to all endpoints being unreachable, it may be useful to also report that the client has lost contact with the xDS server. The error condition is considered to be cleared when either OnResourceChanged() is invoked or OnAmbientError() is invoked again with an OK status.
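The contract of the two new methods can be sketched from the watcher's side. This is a minimal illustration, not grpc-java's API: WatcherSketch, its nested StatusOr, ResourceWatcher and CachingWatcher types are hypothetical, and a plain String (with null standing in for an OK status) replaces the real Status class.

```java
import java.util.Optional;

/** Hypothetical, self-contained model of the two-method watcher contract. */
final class WatcherSketch {
  private WatcherSketch() {}

  /** Minimal stand-in: holds either a value or an error message. */
  static final class StatusOr<T> {
    private final T value;
    private final String error;

    private StatusOr(T value, String error) {
      this.value = value;
      this.error = error;
    }

    static <T> StatusOr<T> fromValue(T value) {
      return new StatusOr<>(value, null);
    }

    static <T> StatusOr<T> fromError(String error) {
      return new StatusOr<>(null, error);
    }

    boolean hasValue() {
      return error == null;
    }

    T getValue() {
      return value;
    }
  }

  interface ResourceWatcher<T> {
    void onResourceChanged(StatusOr<T> update);

    void onAmbientError(String errorOrNull); // null models an OK status
  }

  /** Keeps the resource on ambient errors and drops it on errors from onResourceChanged. */
  static final class CachingWatcher<T> implements ResourceWatcher<T> {
    private T current;
    private String ambientError;

    @Override
    public void onResourceChanged(StatusOr<T> update) {
      ambientError = null; // any onResourceChanged call clears the ambient error
      current = update.hasValue() ? update.getValue() : null;
    }

    @Override
    public void onAmbientError(String errorOrNull) {
      ambientError = errorOrNull; // the resource in use is left untouched
    }

    Optional<T> current() {
      return Optional.ofNullable(current);
    }

    Optional<String> ambientError() {
      return Optional.ofNullable(ambientError);
    }
  }
}
```

A watcher written this way can attach the ambient error as extra context when it fails a data plane RPC for other reasons, while still serving traffic from the last good resource.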