From 16c5311ecfc99ff4f1fdbf9484c914a7134be59e Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Wed, 8 Jan 2025 10:59:21 -0800 Subject: [PATCH] [xDS] replace "ignore_resource_deletion" server feature with "fail_on_data_errors" (#38278) Another piece of gRFC A88 (https://github.com/grpc/proposal/pull/466). Note that even with the feature disabled, this changes the way that ignored resource deletions are reflected in CSDS and XdsClient metrics. As a side effect, also fixes one of the edge cases described in #38094. Closes #38278 COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/38278 from markdroth:xds_client_fail_on_data_errors 7d31238b27b2fb1c02c76799da814da2e73642b5 PiperOrigin-RevId: 713348335 --- build_autogenerated.yaml | 2 + src/core/xds/grpc/xds_server_grpc.cc | 9 + src/core/xds/grpc/xds_server_grpc.h | 1 + src/core/xds/xds_client/xds_bootstrap.cc | 9 + src/core/xds/xds_client/xds_bootstrap.h | 6 + src/core/xds/xds_client/xds_client.cc | 167 ++++---- src/core/xds/xds_client/xds_client.h | 27 +- test/core/xds/xds_bootstrap_test.cc | 52 ++- test/core/xds/xds_client_test.cc | 405 +++++++++++++++--- test/cpp/end2end/xds/BUILD | 2 + .../end2end/xds/xds_cluster_end2end_test.cc | 73 +++- .../xds/xds_enabled_server_end2end_test.cc | 122 +++++- .../end2end/xds/xds_routing_end2end_test.cc | 83 +++- test/cpp/end2end/xds/xds_utils.cc | 3 + test/cpp/end2end/xds/xds_utils.h | 5 + 15 files changed, 774 insertions(+), 192 deletions(-) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index fbe0551dba2d3..f379693297bf8 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -25148,6 +25148,7 @@ targets: run: false language: c++ headers: + - test/core/test_util/scoped_env_var.h - test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h @@ -27312,6 +27313,7 @@ targets: run: false language: c++ headers: + - test/core/test_util/scoped_env_var.h - 
test/cpp/end2end/counted_service.h - test/cpp/end2end/test_service_impl.h - test/cpp/end2end/xds/xds_end2end_test_lib.h diff --git a/src/core/xds/grpc/xds_server_grpc.cc b/src/core/xds/grpc/xds_server_grpc.cc index 00541208036b2..b6e41374befcc 100644 --- a/src/core/xds/grpc/xds_server_grpc.cc +++ b/src/core/xds/grpc/xds_server_grpc.cc @@ -36,6 +36,9 @@ namespace { constexpr absl::string_view kServerFeatureIgnoreResourceDeletion = "ignore_resource_deletion"; +constexpr absl::string_view kServerFeatureFailOnDataErrors = + "fail_on_data_errors"; + constexpr absl::string_view kServerFeatureTrustedXdsServer = "trusted_xds_server"; @@ -46,6 +49,11 @@ bool GrpcXdsServer::IgnoreResourceDeletion() const { kServerFeatureIgnoreResourceDeletion)) != server_features_.end(); } +bool GrpcXdsServer::FailOnDataErrors() const { + return server_features_.find(std::string(kServerFeatureFailOnDataErrors)) != + server_features_.end(); +} + bool GrpcXdsServer::TrustedXdsServer() const { return server_features_.find(std::string(kServerFeatureTrustedXdsServer)) != server_features_.end(); @@ -126,6 +134,7 @@ void GrpcXdsServer::JsonPostLoad(const Json& json, const JsonArgs& args, for (const Json& feature_json : array) { if (feature_json.type() == Json::Type::kString && (feature_json.string() == kServerFeatureIgnoreResourceDeletion || + feature_json.string() == kServerFeatureFailOnDataErrors || feature_json.string() == kServerFeatureTrustedXdsServer)) { server_features_.insert(feature_json.string()); } diff --git a/src/core/xds/grpc/xds_server_grpc.h b/src/core/xds/grpc/xds_server_grpc.h index 9efc7fe8bd7a8..7b58e2a75b11b 100644 --- a/src/core/xds/grpc/xds_server_grpc.h +++ b/src/core/xds/grpc/xds_server_grpc.h @@ -35,6 +35,7 @@ class GrpcXdsServer final : public XdsBootstrap::XdsServer { const std::string& server_uri() const override { return server_uri_; } bool IgnoreResourceDeletion() const override; + bool FailOnDataErrors() const override; bool TrustedXdsServer() const; diff --git 
a/src/core/xds/xds_client/xds_bootstrap.cc b/src/core/xds/xds_client/xds_bootstrap.cc index 718ac64bab326..46ddcaef19402 100644 --- a/src/core/xds/xds_client/xds_bootstrap.cc +++ b/src/core/xds/xds_client/xds_bootstrap.cc @@ -34,4 +34,13 @@ bool XdsFederationEnabled() { return parse_succeeded && parsed_value; } +// TODO(roth): Remove this once the feature passes interop tests. +bool XdsDataErrorHandlingEnabled() { + auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + if (!value.has_value()) return false; + bool parsed_value; + bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value); + return parse_succeeded && parsed_value; +} + } // namespace grpc_core diff --git a/src/core/xds/xds_client/xds_bootstrap.h b/src/core/xds/xds_client/xds_bootstrap.h index 62f929b4c9d14..fedc10bc92fe6 100644 --- a/src/core/xds/xds_client/xds_bootstrap.h +++ b/src/core/xds/xds_client/xds_bootstrap.h @@ -26,6 +26,7 @@ namespace grpc_core { bool XdsFederationEnabled(); +bool XdsDataErrorHandlingEnabled(); class XdsBootstrap { public: @@ -46,8 +47,13 @@ class XdsBootstrap { virtual ~XdsServer() = default; virtual const std::string& server_uri() const = 0; + + // TODO(roth): Remove this method once the data error handling + // feature passes interop tests. virtual bool IgnoreResourceDeletion() const = 0; + virtual bool FailOnDataErrors() const = 0; + virtual bool Equals(const XdsServer& other) const = 0; // Returns a key to be used for uniquely identifying this XdsServer. 
diff --git a/src/core/xds/xds_client/xds_client.cc b/src/core/xds/xds_client/xds_client.cc index 4261f6519c4ed..51ba55c1ca1f4 100644 --- a/src/core/xds/xds_client/xds_client.cc +++ b/src/core/xds/xds_client/xds_client.cc @@ -236,9 +236,9 @@ class XdsClient::XdsChannel::AdsCall final name_.authority, type_->type_url(), name_.key) << "} from xds server"; resource_seen_ = true; - state.SetDoesNotExist(); + state.SetDoesNotExistOnTimeout(); ads_call_->xds_client()->NotifyWatchersOnResourceChanged( - absl::NotFoundError("does not exist"), state.watchers(), + state.failed_status(), state.watchers(), ReadDelayHandle::NoWait()); } } @@ -978,8 +978,8 @@ void XdsClient::XdsChannel::AdsCall::ParseResource( if (authority_it == xds_client()->authority_state_map_.end()) { return; // Skip resource -- we don't have a subscription for it. } - // Found authority, so look up type. AuthorityState& authority_state = authority_it->second; + // Found authority, so look up type. auto type_it = authority_state.resource_map.find(context->type); if (type_it == authority_state.resource_map.end()) { return; // Skip resource -- we don't have a subscription for it. @@ -996,32 +996,17 @@ void XdsClient::XdsChannel::AdsCall::ParseResource( context->resources_seen[parsed_resource_name->authority].insert( parsed_resource_name->key); } - // If we previously ignored the resource's deletion, log that we're - // now re-adding it. - if (resource_state.ignored_deletion()) { - LOG(INFO) << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": server returned new version of resource for which we " - "previously ignored a deletion: type " - << type_url << " name " << resource_name; - resource_state.set_ignored_deletion(false); - } // Update resource state based on whether the resource is valid. 
- absl::Status status = absl::InvalidArgumentError( - absl::StrCat("invalid resource: ", decode_status.ToString())); if (!decode_status.ok()) { - if (!resource_state.HasResource()) { - xds_client()->NotifyWatchersOnResourceChanged(std::move(status), - resource_state.watchers(), - context->read_delay_handle); - } else { - xds_client()->NotifyWatchersOnAmbientError(std::move(status), - resource_state.watchers(), - context->read_delay_handle); - } - resource_state.SetNacked(context->version, decode_status.ToString(), - context->update_time); ++context->num_invalid_resources; + // If the fail_on_data_errors server feature is present, drop the + // existing cached resource, if any. + const bool drop_cached_resource = XdsDataErrorHandlingEnabled() && + xds_channel()->server_.FailOnDataErrors(); + resource_state.SetNacked(context->version, decode_status.message(), + context->update_time, drop_cached_resource); + xds_client()->NotifyWatchersOnError(resource_state, + context->read_delay_handle); return; } // Resource is valid. @@ -1216,23 +1201,14 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) { // that the resource does not exist. For that case, we rely on // the request timeout instead. if (!resource_state.HasResource()) continue; - if (xds_channel()->server_.IgnoreResourceDeletion()) { - if (!resource_state.ignored_deletion()) { - LOG(ERROR) - << "[xds_client " << xds_client() << "] xds server " - << xds_channel()->server_.server_uri() - << ": ignoring deletion for resource type " - << context.type_url << " name " - << XdsClient::ConstructFullXdsResourceName( - authority, context.type_url.c_str(), resource_key); - resource_state.set_ignored_deletion(true); - } - } else { - resource_state.SetDoesNotExist(); - xds_client()->NotifyWatchersOnResourceChanged( - absl::NotFoundError("does not exist"), - resource_state.watchers(), context.read_delay_handle); - } + const bool drop_cached_resource = + XdsDataErrorHandlingEnabled() + ? 
xds_channel()->server_.FailOnDataErrors() + : !xds_channel()->server_.IgnoreResourceDeletion(); + resource_state.SetDoesNotExistOnLdsOrCdsDeletion( + context.version, context.update_time, drop_cached_resource); + xds_client()->NotifyWatchersOnError(resource_state, + context.read_delay_handle); } } } @@ -1329,31 +1305,50 @@ void XdsClient::ResourceState::SetAcked( update_time_ = update_time; version_ = std::move(version); failed_version_.clear(); - failed_details_.clear(); + failed_status_ = absl::OkStatus(); } void XdsClient::ResourceState::SetNacked(const std::string& version, - const std::string& details, - Timestamp update_time) { + absl::string_view details, + Timestamp update_time, + bool drop_cached_resource) { + if (drop_cached_resource) { + resource_.reset(); + serialized_proto_.clear(); + } client_status_ = ClientResourceStatus::NACKED; failed_version_ = version; - failed_details_ = details; + failed_status_ = + absl::InvalidArgumentError(absl::StrCat("invalid resource: ", details)); failed_update_time_ = update_time; } -void XdsClient::ResourceState::SetDoesNotExist() { - resource_.reset(); - serialized_proto_.clear(); +void XdsClient::ResourceState::SetDoesNotExistOnTimeout() { client_status_ = ClientResourceStatus::DOES_NOT_EXIST; + failed_status_ = absl::NotFoundError("does not exist"); failed_version_.clear(); } +void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion( + const std::string& version, Timestamp update_time, + bool drop_cached_resource) { + if (drop_cached_resource) { + resource_.reset(); + serialized_proto_.clear(); + } + client_status_ = ClientResourceStatus::DOES_NOT_EXIST; + failed_status_ = absl::NotFoundError("does not exist"); + failed_version_ = version; + failed_update_time_ = update_time; +} + absl::string_view XdsClient::ResourceState::CacheStateString() const { switch (client_status_) { case ClientResourceStatus::REQUESTED: return "requested"; case ClientResourceStatus::DOES_NOT_EXIST: - return "does_not_exist"; + 
return resource_ != nullptr ? "does_not_exist_but_cached" + : "does_not_exist"; case ClientResourceStatus::ACKED: return "acked"; case ClientResourceStatus::NACKED: @@ -1395,14 +1390,16 @@ void XdsClient::ResourceState::FillGenericXdsConfig( google_protobuf_Any_set_value(any_field, StdStringToUpbString(serialized_proto_)); } - if (client_status_ == ClientResourceStatus::NACKED) { + if (!failed_status_.ok()) { auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena); envoy_admin_v3_UpdateFailureState_set_details( - update_failure_state, StdStringToUpbString(failed_details_)); - envoy_admin_v3_UpdateFailureState_set_version_info( - update_failure_state, StdStringToUpbString(failed_version_)); - envoy_admin_v3_UpdateFailureState_set_last_update_attempt( - update_failure_state, EncodeTimestamp(failed_update_time_, arena)); + update_failure_state, StdStringToUpbString(failed_status_.message())); + if (!failed_version_.empty()) { + envoy_admin_v3_UpdateFailureState_set_version_info( + update_failure_state, StdStringToUpbString(failed_version_)); + envoy_admin_v3_UpdateFailureState_set_last_update_attempt( + update_failure_state, EncodeTimestamp(failed_update_time_, arena)); + } envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state( entry, update_failure_state); } @@ -1527,7 +1524,6 @@ void XdsClient::WatchResource(const XdsResourceType* type, bool first_watcher_for_resource = it_is_new.second; ResourceState& resource_state = it_is_new.first->second; resource_state.AddWatcher(watcher); - bool notified_watcher = false; if (first_watcher_for_resource) { // We try to add new channels in 2 cases: // - This is the first resource for this authority (i.e., the list @@ -1563,26 +1559,13 @@ void XdsClient::WatchResource(const XdsResourceType* type, << name; NotifyWatchersOnResourceChanged(resource_state.resource(), {watcher}, ReadDelayHandle::NoWait()); - notified_watcher = true; - } else if (resource_state.client_status() == - 
ResourceState::ClientResourceStatus::DOES_NOT_EXIST) { - GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << this - << "] reporting cached does-not-exist for " << name; - NotifyWatchersOnResourceChanged(absl::NotFoundError("does not exist"), - {watcher}, ReadDelayHandle::NoWait()); - notified_watcher = true; - } else if (resource_state.client_status() == - ResourceState::ClientResourceStatus::NACKED) { + } + if (!resource_state.failed_status().ok()) { GRPC_TRACE_LOG(xds_client, INFO) - << "[xds_client " << this - << "] reporting cached validation failure for " << name << ": " - << resource_state.failed_details(); - NotifyWatchersOnResourceChanged( - absl::InvalidArgumentError(absl::StrCat( - "invalid resource: ", resource_state.failed_details())), - {watcher}, ReadDelayHandle::NoWait()); - notified_watcher = true; + << "[xds_client " << this << "] returning cached error for " << name + << ": " << resource_state.failed_status(); + NotifyWatchersOnError(resource_state, ReadDelayHandle::NoWait(), + {watcher}); } } // If the channel is not connected, report an error to the watcher. @@ -1591,13 +1574,8 @@ void XdsClient::WatchResource(const XdsResourceType* type, GRPC_TRACE_LOG(xds_client, INFO) << "[xds_client " << this << "] returning cached channel error for " << name << ": " << channel_status; - if (notified_watcher) { - NotifyWatchersOnAmbientError(std::move(channel_status), {watcher}, - ReadDelayHandle::NoWait()); - } else { - NotifyWatchersOnResourceChanged(std::move(channel_status), {watcher}, - ReadDelayHandle::NoWait()); - } + NotifyWatchersOnError(resource_state, ReadDelayHandle::NoWait(), + {watcher}, std::move(channel_status)); } } work_serializer_.DrainQueue(); @@ -1629,12 +1607,6 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type, resource_state.RemoveWatcher(watcher); // Clean up empty map entries, if any. 
if (!resource_state.HasWatchers()) { - if (resource_state.ignored_deletion()) { - LOG(INFO) << "[xds_client " << this - << "] unsubscribing from a resource for which we " - << "previously ignored a deletion: type " << type->type_url() - << " name " << name; - } for (const auto& xds_channel : authority_state.xds_channels) { xds_channel->UnsubscribeLocked(type, *resource_name, delay_unsubscription); @@ -1757,6 +1729,21 @@ void XdsClient::NotifyWatchersOnAmbientError( DEBUG_LOCATION); } +void XdsClient::NotifyWatchersOnError( + const ResourceState& resource_state, + RefCountedPtr read_delay_handle, WatcherSet watchers, + absl::Status status) { + if (watchers.empty()) watchers = resource_state.watchers(); + if (status.ok()) status = resource_state.failed_status(); + if (!resource_state.HasResource()) { + NotifyWatchersOnResourceChanged(std::move(status), std::move(watchers), + std::move(read_delay_handle)); + } else { + NotifyWatchersOnAmbientError(std::move(status), std::move(watchers), + std::move(read_delay_handle)); + } +} + void XdsClient::DumpClientConfig( std::set* string_pool, upb_Arena* arena, envoy_service_status_v3_ClientConfig* client_config) { diff --git a/src/core/xds/xds_client/xds_client.h b/src/core/xds/xds_client/xds_client.h index 4243685e05fa5..0f0e52a9daec1 100644 --- a/src/core/xds/xds_client/xds_client.h +++ b/src/core/xds/xds_client/xds_client.h @@ -299,12 +299,12 @@ class XdsClient : public DualRefCounted { void SetAcked(std::shared_ptr resource, std::string serialized_proto, std::string version, Timestamp update_time); - void SetNacked(const std::string& version, const std::string& details, - Timestamp update_time); - void SetDoesNotExist(); - - void set_ignored_deletion(bool value) { ignored_deletion_ = value; } - bool ignored_deletion() const { return ignored_deletion_; } + void SetNacked(const std::string& version, absl::string_view details, + Timestamp update_time, bool drop_cached_resource); + void SetDoesNotExistOnTimeout(); + void 
SetDoesNotExistOnLdsOrCdsDeletion(const std::string& version, + Timestamp update_time, + bool drop_cached_resource); ClientResourceStatus client_status() const { return client_status_; } absl::string_view CacheStateString() const; @@ -314,7 +314,7 @@ class XdsClient : public DualRefCounted { return resource_; } - absl::string_view failed_details() const { return failed_details_; } + const absl::Status& failed_status() const { return failed_status_; } void FillGenericXdsConfig( upb_StringView type_url, upb_StringView resource_name, upb_Arena* arena, @@ -335,11 +335,9 @@ class XdsClient : public DualRefCounted { // The rejected version string of the last failed update attempt. std::string failed_version_; // Details about the last failed update attempt. - std::string failed_details_; + absl::Status failed_status_; // Timestamp of the last failed update attempt. Timestamp failed_update_time_; - // If we've ignored deletion. - bool ignored_deletion_ = false; }; struct AuthorityState { @@ -359,6 +357,15 @@ class XdsClient : public DualRefCounted { void NotifyWatchersOnAmbientError( absl::Status status, WatcherSet watchers, RefCountedPtr read_delay_handle); + // Notifies watchers for resource_state of an error, using + // OnResourceChanged() if there is no cached resource or + // OnAmbientError() if there is a cached resource. + void NotifyWatchersOnError(const ResourceState& resource_state, + RefCountedPtr read_delay_handle, + // If empty, will use resource_state.watchers(). + WatcherSet watchers = {}, + // If OK, will use resource_state.failed_status(). 
+ absl::Status status = absl::OkStatus()); void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); diff --git a/test/core/xds/xds_bootstrap_test.cc b/test/core/xds/xds_bootstrap_test.cc index 2eaa7bd5be855..191ce92de9ddc 100644 --- a/test/core/xds/xds_bootstrap_test.cc +++ b/test/core/xds/xds_bootstrap_test.cc @@ -53,8 +53,8 @@ namespace grpc_core { namespace testing { namespace { -MATCHER_P4(EqXdsServer, name, creds_config_type, ignore_resource_deletion, - trusted_xds_server, "equals XdsServer") { +MATCHER_P5(EqXdsServer, name, creds_config_type, ignore_resource_deletion, + fail_on_data_errors, trusted_xds_server, "equals XdsServer") { auto* server = static_cast(arg); if (!::testing::ExplainMatchResult(::testing::Ne(nullptr), server, result_listener)) { @@ -65,6 +65,8 @@ MATCHER_P4(EqXdsServer, name, creds_config_type, ignore_resource_deletion, ok |= ::testing::ExplainMatchResult(server->IgnoreResourceDeletion(), ignore_resource_deletion, result_listener); + ok |= ::testing::ExplainMatchResult(server->FailOnDataErrors(), + fail_on_data_errors, result_listener); ok |= ::testing::ExplainMatchResult(server->TrustedXdsServer(), trusted_xds_server, result_listener); auto creds_config = server->channel_creds_config(); @@ -125,8 +127,8 @@ TEST(XdsBootstrapTest, Basic) { " }" " ]," " \"server_features\": [" - " \"xds_v3\"," - " \"trusted_xds_server\"" + " \"trusted_xds_server\"," + " \"fail_on_data_errors\"" " ]" " }" " ]" @@ -153,8 +155,9 @@ TEST(XdsBootstrapTest, Basic) { auto bootstrap_or = GrpcXdsBootstrap::Create(json_str); ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status(); auto bootstrap = std::move(*bootstrap_or); - EXPECT_THAT(bootstrap->servers(), ::testing::ElementsAre(EqXdsServer( - "fake:///lb1", "fake", false, false))); + EXPECT_THAT(bootstrap->servers(), + ::testing::ElementsAre( + EqXdsServer("fake:///lb1", "fake", false, false, false))); EXPECT_EQ(bootstrap->authorities().size(), 2); auto* 
authority = static_cast( bootstrap->LookupAuthority("xds.example.com")); @@ -163,8 +166,8 @@ TEST(XdsBootstrapTest, Basic) { "xdstp://xds.example.com/envoy.config.listener.v3.Listener/grpc/" "server/%s"); EXPECT_THAT(authority->servers(), - ::testing::ElementsAre( - EqXdsServer("fake:///xds_server", "fake", true, false))); + ::testing::ElementsAre(EqXdsServer("fake:///xds_server", "fake", + true, false, false))); authority = static_cast( bootstrap->LookupAuthority("xds.example2.com")); ASSERT_NE(authority, nullptr); @@ -172,8 +175,8 @@ TEST(XdsBootstrapTest, Basic) { "xdstp://xds.example2.com/envoy.config.listener.v3.Listener/grpc/" "server/%s"); EXPECT_THAT(authority->servers(), - ::testing::ElementsAre( - EqXdsServer("fake:///xds_server3", "fake", false, true))); + ::testing::ElementsAre(EqXdsServer("fake:///xds_server3", "fake", + false, true, true))); ASSERT_NE(bootstrap->node(), nullptr); EXPECT_EQ(bootstrap->node()->id(), "foo"); EXPECT_EQ(bootstrap->node()->cluster(), "bar"); @@ -209,8 +212,9 @@ TEST(XdsBootstrapTest, ValidWithoutNode) { auto bootstrap_or = GrpcXdsBootstrap::Create(json_str); ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status(); auto bootstrap = std::move(*bootstrap_or); - EXPECT_THAT(bootstrap->servers(), ::testing::ElementsAre(EqXdsServer( - "fake:///lb", "fake", false, false))); + EXPECT_THAT(bootstrap->servers(), + ::testing::ElementsAre( + EqXdsServer("fake:///lb", "fake", false, false, false))); EXPECT_EQ(bootstrap->node(), nullptr); } @@ -229,7 +233,7 @@ TEST(XdsBootstrapTest, InsecureCreds) { auto bootstrap = std::move(*bootstrap_or); EXPECT_THAT(bootstrap->servers(), ::testing::ElementsAre( - EqXdsServer("fake:///lb", "insecure", false, false))); + EqXdsServer("fake:///lb", "insecure", false, false, false))); EXPECT_EQ(bootstrap->node(), nullptr); } @@ -263,8 +267,8 @@ TEST(XdsBootstrapTest, GoogleDefaultCreds) { ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status(); auto bootstrap = std::move(*bootstrap_or); 
EXPECT_THAT(bootstrap->servers(), - ::testing::ElementsAre( - EqXdsServer("fake:///lb", "google_default", false, false))); + ::testing::ElementsAre(EqXdsServer("fake:///lb", "google_default", + false, false, false))); EXPECT_EQ(bootstrap->node(), nullptr); } @@ -707,6 +711,7 @@ TEST(XdsBootstrapTest, XdsServerToJsonAndParse) { " ]," " \"ignore\": 0," " \"server_features\": [" + " \"fail_on_data_errors\"," " \"ignore_resource_deletion\"," " \"trusted_xds_server\"" " ]" @@ -794,17 +799,18 @@ TEST(XdsBootstrapTest, MultipleXdsServers) { auto bootstrap_or = GrpcXdsBootstrap::Create(json_str); ASSERT_TRUE(bootstrap_or.ok()) << bootstrap_or.status(); auto bootstrap = std::move(*bootstrap_or); - EXPECT_THAT( - bootstrap->servers(), - ::testing::ElementsAre(EqXdsServer("fake:///lb1", "fake", false, false), - EqXdsServer("fake:///lb2", "fake", false, false))); + EXPECT_THAT(bootstrap->servers(), + ::testing::ElementsAre( + EqXdsServer("fake:///lb1", "fake", false, false, false), + EqXdsServer("fake:///lb2", "fake", false, false, false))); auto* authority = static_cast( bootstrap->LookupAuthority("xds.example.com")); ASSERT_NE(authority, nullptr); - EXPECT_THAT(authority->servers(), - ::testing::ElementsAre( - EqXdsServer("fake:///xds_server", "fake", false, false), - EqXdsServer("fake:///xds_server2", "fake", false, false))); + EXPECT_THAT( + authority->servers(), + ::testing::ElementsAre( + EqXdsServer("fake:///xds_server", "fake", false, false, false), + EqXdsServer("fake:///xds_server2", "fake", false, false, false))); } } // namespace diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index c15d3215d7048..28b4a98f27ed5 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -139,25 +139,26 @@ class XdsClientTest : public ::testing::Test { public: explicit FakeXdsServer( absl::string_view server_uri = kDefaultXdsServerUrl, - bool ignore_resource_deletion = false) + bool fail_on_data_errors = false) : 
server_uri_(server_uri), - ignore_resource_deletion_(ignore_resource_deletion) {} + fail_on_data_errors_(fail_on_data_errors) {} const std::string& server_uri() const override { return server_uri_; } bool IgnoreResourceDeletion() const override { - return ignore_resource_deletion_; + return !fail_on_data_errors_; } + bool FailOnDataErrors() const override { return fail_on_data_errors_; } bool Equals(const XdsServer& other) const override { const auto& o = static_cast(other); return server_uri_ == o.server_uri_ && - ignore_resource_deletion_ == o.ignore_resource_deletion_; + fail_on_data_errors_ == o.fail_on_data_errors_; } std::string Key() const override { - return absl::StrCat(server_uri_, "#", ignore_resource_deletion_); + return absl::StrCat(server_uri_, "#", fail_on_data_errors_); } private: std::string server_uri_; - bool ignore_resource_deletion_ = false; + bool fail_on_data_errors_ = false; }; class FakeAuthority : public Authority { @@ -1087,12 +1088,14 @@ MATCHER_P2(CsdsResourceRequested, type_url, name, arg, result_listener); } -// Convenient wrapper for DOES_NOT_EXIST resources in CSDS. -MATCHER_P2(CsdsResourceDoesNotExist, type_url, name, - "equals CSDS does-not-exist resource") { +// Convenient wrapper for DOES_NOT_EXIST resources in CSDS caused by +// the resource timer. 
+MATCHER_P2(CsdsResourceDoesNotExistOnTimeout, type_url, name, + "equals CSDS does-not-exist-on-timeout resource") { return ::testing::ExplainMatchResult( CsdsResourceEq(ClientResourceStatus::DOES_NOT_EXIST, type_url, name, - CsdsNoResourceFields(), CsdsNoErrorFields()), + CsdsNoResourceFields(), + CsdsErrorDetailsOnly("does not exist")), arg, result_listener); } @@ -1779,7 +1782,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) { ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "invalid resource: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Check metric data. @@ -1803,7 +1806,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) { ::testing::ElementsAre(CsdsResourceEq( ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(), "foo1", CsdsNoResourceFields(), - CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: " + CsdsErrorFields("invalid resource: errors validating JSON: " "[field:value error:is not a number]", "1", TimestampProtoEq(kTime0))))); // XdsClient should NACK the update. @@ -1826,7 +1829,7 @@ TEST_F(XdsClientTest, ResourceValidationFailure) { ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "invalid resource: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Now server sends an updated version of the resource. 
@@ -2030,14 +2033,14 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + "invalid resource: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; error = watcher3->WaitForNextError(); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: JSON parsing failed: " + "invalid resource: JSON parsing failed: " "[JSON parse error at index 15] (node ID:xds_client_test)") << *error; // It cannot delivery an error for foo2, because the client doesn't know @@ -2087,7 +2090,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(), "foo1", CsdsNoResourceFields(), - CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: " + CsdsErrorFields("invalid resource: errors validating JSON: " "[field:value error:is not a number]", "1", TimestampProtoEq(kTime0))), CsdsResourceRequested(XdsFooResourceType::Get()->type_url(), "foo2"), @@ -2095,7 +2098,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureMultipleResources) { ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(), "foo3", CsdsNoResourceFields(), - CsdsErrorFields("INVALID_ARGUMENT: JSON parsing failed: " + CsdsErrorFields("invalid resource: JSON parsing failed: " "[JSON parse error at index 15]", "1", TimestampProtoEq(kTime0))), CsdsResourceAcked(XdsFooResourceType::Get()->type_url(), "foo4", @@ -2207,7 +2210,278 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); EXPECT_EQ(error->message(), - "invalid resource: INVALID_ARGUMENT: errors validating JSON: " + 
"invalid resource: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // Check metric data. + EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::_)); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "nacked_but_cached"), + 1))); + // Check CSDS data. + csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceEq( + ClientResourceStatus::NACKED, + XdsFooResourceType::Get()->type_url(), "foo1", + CsdsResourceFields(resource->AsJsonString(), "1", + TimestampProtoEq(kTime0)), + CsdsErrorFields("invalid resource: errors validating JSON: " + "[field:value error:is not a number]", + "2", TimestampProtoEq(kTime1))))); + // XdsClient should NACK the update. + // Note that version_info is set to the previous version in this request, + // because there were no valid resources in it. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest( + *request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + // error_detail= + absl::InvalidArgumentError( + "xDS response validation errors: [" + "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number]]"), + /*resource_names=*/{"foo1"}); + // Start a second watcher for the same resource. The watcher should + // first get the cached resource and then the ambient error. 
+ auto watcher2 = StartFooWatch("foo1"); + resource = watcher2->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + error = watcher2->WaitForNextAmbientError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ( + *error, + absl::InvalidArgumentError( + "invalid resource: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)")); + // Cancel watches. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + EXPECT_TRUE(stream->IsOrphaned()); +} + +TEST_F(XdsClientTest, + ResourceValidationFailureForCachedResourceWithFailOnDataErrors) { + testing::ScopedEnvVar env_var("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING", + "true"); + InitXdsClient(FakeXdsBootstrap::Builder().SetServers( + {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)})); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watcher. 
+ auto resource = watcher->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Check metric data. + EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT( + GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), "acked"), + 1))); + // Check CSDS data. + ClientConfig csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceAcked( + XdsFooResourceType::Get()->type_url(), "foo1", + resource->AsJsonString(), "1", TimestampProtoEq(kTime0)))); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Send an update containing an invalid resource. + // We increment time to make sure that the CSDS data gets a new timestamp. + time_cache_.TestOnlySetNow(kTime1); + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo1\",\"value\":[]}") + .Serialize()); + // XdsClient should deliver an error to the watcher. + auto error = watcher->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); + EXPECT_EQ(error->message(), + "invalid resource: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // Check metric data. 
+ EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::_)); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), + "nacked"), + 1))); + // CSDS should show the resource as NACKED with no cached resource. + csds = DumpCsds(); + EXPECT_THAT( + csds.generic_xds_configs(), + ::testing::ElementsAre(CsdsResourceEq( + ClientResourceStatus::NACKED, XdsFooResourceType::Get()->type_url(), + "foo1", CsdsNoResourceFields(), + CsdsErrorFields("invalid resource: errors validating JSON: " + "[field:value error:is not a number]", + "2", TimestampProtoEq(kTime1))))); + // XdsClient should NACK the update. + // Note that version_info is set to the previous version in this request, + // because there were no valid resources in it. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest( + *request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"B", + // error_detail= + absl::InvalidArgumentError( + "xDS response validation errors: [" + "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " + "[field:value error:is not a number]]"), + /*resource_names=*/{"foo1"}); + // Start a second watcher for the same resource. This should deliver + // the error to the watcher immediately. 
+ auto watcher2 = StartFooWatch("foo1"); + error = watcher2->WaitForNextError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); + EXPECT_EQ(error->message(), + "invalid resource: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)") + << *error; + // Cancel watches. + CancelFooWatch(watcher.get(), "foo1"); + CancelFooWatch(watcher2.get(), "foo1"); + EXPECT_TRUE(stream->IsOrphaned()); +} + +TEST_F(XdsClientTest, + ResourceValidationFailureForCachedResourceWithFailOnDataErrorsDisabled) { + InitXdsClient(FakeXdsBootstrap::Builder().SetServers( + {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)})); + // Start a watch for "foo1". + auto watcher = StartFooWatch("foo1"); + // Watcher should initially not see any resource reported. + EXPECT_FALSE(watcher->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"", /*response_nonce=*/"", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + CheckRequestNode(*request); // Should be present on the first request. + // Send a response. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource = watcher->WaitForNextResource(); + ASSERT_NE(resource, nullptr); + EXPECT_EQ(resource->name, "foo1"); + EXPECT_EQ(resource->value, 6); + // Check metric data. 
+ EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( + ::testing::ElementsAre(::testing::Pair( + ::testing::Pair(kDefaultXdsServerUrl, + XdsFooResourceType::Get()->type_url()), + 1)), + ::testing::ElementsAre(), ::testing::_)); + EXPECT_THAT( + GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq(XdsClient::kOldStyleAuthority, + XdsFooResourceType::Get()->type_url(), "acked"), + 1))); + // Check CSDS data. + ClientConfig csds = DumpCsds(); + EXPECT_THAT(csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceAcked( + XdsFooResourceType::Get()->type_url(), "foo1", + resource->AsJsonString(), "1", TimestampProtoEq(kTime0)))); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1"}); + // Send an update containing an invalid resource. + // We increment time to make sure that the CSDS data gets a new timestamp. + time_cache_.TestOnlySetNow(kTime1); + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddInvalidResource(XdsFooResourceType::Get()->type_url(), + "{\"name\":\"foo1\",\"value\":[]}") + .Serialize()); + // XdsClient should deliver an ambient error to the watcher. + auto error = watcher->WaitForNextAmbientError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code(), absl::StatusCode::kInvalidArgument); + EXPECT_EQ(error->message(), + "invalid resource: errors validating JSON: " "[field:value error:is not a number] (node ID:xds_client_test)") << *error; // Check metric data. 
@@ -2235,7 +2509,7 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { XdsFooResourceType::Get()->type_url(), "foo1", CsdsResourceFields(resource->AsJsonString(), "1", TimestampProtoEq(kTime0)), - CsdsErrorFields("INVALID_ARGUMENT: errors validating JSON: " + CsdsErrorFields("invalid resource: errors validating JSON: " "[field:value error:is not a number]", "2", TimestampProtoEq(kTime1))))); // XdsClient should NACK the update. @@ -2252,19 +2526,20 @@ TEST_F(XdsClientTest, ResourceValidationFailureForCachedResource) { "resource index 0: foo1: INVALID_ARGUMENT: errors validating JSON: " "[field:value error:is not a number]]"), /*resource_names=*/{"foo1"}); - // Start a second watcher for the same resource. Even though the last - // update was a NACK, we should still deliver the cached resource to - // the watcher. - // TODO(roth): Consider what the right behavior is here. It seems - // inconsistent that the watcher sees the error if it had started - // before the error was seen but does not if it was started afterwards. - // One option is to not send errors at all for already-cached resources; - // another option is to send the errors even for newly started watchers. + // Start a second watcher for the same resource. The watcher should + // first get the cached resource and then the ambient error. auto watcher2 = StartFooWatch("foo1"); resource = watcher2->WaitForNextResource(); ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "foo1"); EXPECT_EQ(resource->value, 6); + error = watcher2->WaitForNextAmbientError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ( + *error, + absl::InvalidArgumentError( + "invalid resource: errors validating JSON: " + "[field:value error:is not a number] (node ID:xds_client_test)")); // Cancel watches. 
CancelFooWatch(watcher.get(), "foo1"); CancelFooWatch(watcher2.get(), "foo1"); @@ -2346,9 +2621,11 @@ TEST_F(XdsClientTest, WildcardCapableResponseWithEmptyResource) { // This tests resource removal triggered by the server when using a // resource type that requires all resources to be present in every -// response, similar to LDS and CDS. -TEST_F(XdsClientTest, ResourceDeletion) { - InitXdsClient(); +// response, similar to LDS and CDS. It configures the +// fail_on_data_errors server feature. +TEST_F(XdsClientTest, ResourceDeletionWithFailOnDataErrors) { + InitXdsClient(FakeXdsBootstrap::Builder().SetServers( + {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)})); // Start a watch for "wc1". auto watcher = StartWildcardCapableWatch("wc1"); // Watcher should initially not see any resource reported. @@ -2405,6 +2682,8 @@ TEST_F(XdsClientTest, ResourceDeletion) { /*resource_names=*/{"wc1"}); // Server now sends a response without the resource, thus indicating // it's been deleted. + // We increment time to make sure that the CSDS data gets a new timestamp. + time_cache_.TestOnlySetNow(kTime1); stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("2") @@ -2428,9 +2707,13 @@ TEST_F(XdsClientTest, ResourceDeletion) { 1))); // Check CSDS data. csds = DumpCsds(); - EXPECT_THAT(csds.generic_xds_configs(), - ::testing::UnorderedElementsAre(CsdsResourceDoesNotExist( - XdsWildcardCapableResourceType::Get()->type_url(), "wc1"))); + EXPECT_THAT( + csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceEq( + ClientResourceStatus::DOES_NOT_EXIST, + XdsWildcardCapableResourceType::Get()->type_url(), "wc1", + CsdsNoResourceFields(), + CsdsErrorFields("does not exist", "2", TimestampProtoEq(kTime1))))); // Start a new watcher for the same resource. It should immediately // receive the same does-not-exist notification. 
auto watcher2 = StartWildcardCapableWatch("wc1"); @@ -2444,7 +2727,7 @@ TEST_F(XdsClientTest, ResourceDeletion) { /*resource_names=*/{"wc1"}); // Server sends the resource again. // We increment time to make sure that the CSDS data gets a new timestamp. - time_cache_.TestOnlySetNow(kTime1); + time_cache_.TestOnlySetNow(kTime2); stream->SendMessageToClient( ResponseBuilder(XdsWildcardCapableResourceType::Get()->type_url()) .set_version_info("3") @@ -2479,7 +2762,7 @@ TEST_F(XdsClientTest, ResourceDeletion) { EXPECT_THAT(csds.generic_xds_configs(), ::testing::UnorderedElementsAre(CsdsResourceAcked( XdsWildcardCapableResourceType::Get()->type_url(), "wc1", - resource->AsJsonString(), "3", TimestampProtoEq(kTime1)))); + resource->AsJsonString(), "3", TimestampProtoEq(kTime2)))); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); @@ -2493,11 +2776,10 @@ TEST_F(XdsClientTest, ResourceDeletion) { EXPECT_TRUE(stream->IsOrphaned()); } -// This tests that when we ignore resource deletions from the server -// when configured to do so. -TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) { - InitXdsClient(FakeXdsBootstrap::Builder().SetServers( - {FakeXdsBootstrap::FakeXdsServer(kDefaultXdsServerUrl, true)})); +// This tests that we ignore resource deletions from the server by +// default. +TEST_F(XdsClientTest, ResourceDeletionIgnoredByDefault) { + InitXdsClient(); // Start a watch for "wc1". auto watcher = StartWildcardCapableWatch("wc1"); // Watcher should initially not see any resource reported. @@ -2562,9 +2844,12 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) { .set_version_info("2") .set_nonce("B") .Serialize()); - // Watcher should not see any update, since we should have ignored the + // Watcher should see an ambient error, since we should have ignored the // deletion. 
- EXPECT_TRUE(watcher->ExpectNoEvent()); + auto error = watcher->WaitForNextAmbientError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(*error, + absl::NotFoundError("does not exist (node ID:xds_client_test)")); // Check metric data. EXPECT_TRUE(metrics_reporter_->WaitForMetricsReporterData( ::testing::ElementsAre(::testing::Pair( @@ -2572,19 +2857,23 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) { XdsWildcardCapableResourceType::Get()->type_url()), 1)), ::testing::ElementsAre(), ::testing::_)); - EXPECT_THAT( - GetResourceCounts(), - ::testing::ElementsAre(::testing::Pair( - ResourceCountLabelsEq( - XdsClient::kOldStyleAuthority, - XdsWildcardCapableResourceType::Get()->type_url(), "acked"), - 1))); + EXPECT_THAT(GetResourceCounts(), + ::testing::ElementsAre(::testing::Pair( + ResourceCountLabelsEq( + XdsClient::kOldStyleAuthority, + XdsWildcardCapableResourceType::Get()->type_url(), + "does_not_exist_but_cached"), + 1))); // Check CSDS data. csds = DumpCsds(); - EXPECT_THAT(csds.generic_xds_configs(), - ::testing::UnorderedElementsAre(CsdsResourceAcked( - XdsWildcardCapableResourceType::Get()->type_url(), "wc1", - resource->AsJsonString(), "1", TimestampProtoEq(kTime0)))); + EXPECT_THAT( + csds.generic_xds_configs(), + ::testing::UnorderedElementsAre(CsdsResourceEq( + ClientResourceStatus::DOES_NOT_EXIST, + XdsWildcardCapableResourceType::Get()->type_url(), "wc1", + CsdsResourceFields(resource->AsJsonString(), "1", + TimestampProtoEq(kTime0)), + CsdsErrorFields("does not exist", "2", TimestampProtoEq(kTime1))))); // Start a new watcher for the same resource. It should immediately // receive the cached resource. 
auto watcher2 = StartWildcardCapableWatch("wc1"); @@ -2592,6 +2881,10 @@ TEST_F(XdsClientTest, ResourceDeletionIgnoredWhenConfigured) { ASSERT_NE(resource, nullptr); EXPECT_EQ(resource->name, "wc1"); EXPECT_EQ(resource->value, 6); + error = watcher2->WaitForNextAmbientError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(*error, + absl::NotFoundError("does not exist (node ID:xds_client_test)")); // XdsClient should have sent an ACK message to the xDS server. request = WaitForRequest(stream.get()); ASSERT_TRUE(request.has_value()); @@ -3074,7 +3367,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistUponTimeout) { // CSDS should show that the resource has been requested. csds = DumpCsds(); EXPECT_THAT(csds.generic_xds_configs(), - ::testing::ElementsAre(CsdsResourceDoesNotExist( + ::testing::ElementsAre(CsdsResourceDoesNotExistOnTimeout( XdsFooResourceType::Get()->type_url(), "foo1"))); // Start a new watcher for the same resource. It should immediately // receive the same does-not-exist notification. @@ -3218,7 +3511,7 @@ TEST_F(XdsClientTest, ResourceDoesNotExistAfterStreamRestart) { // CSDS should show that the resource has been requested. csds = DumpCsds(); EXPECT_THAT(csds.generic_xds_configs(), - ::testing::ElementsAre(CsdsResourceDoesNotExist( + ::testing::ElementsAre(CsdsResourceDoesNotExistOnTimeout( XdsFooResourceType::Get()->type_url(), "foo1"))); // Server now sends the requested resource. stream->SendMessageToClient( @@ -3315,7 +3608,7 @@ TEST_F(XdsClientTest, DoesNotExistTimerNotStartedUntilSendCompletes) { // CSDS should show that the resource has been requested. csds = DumpCsds(); EXPECT_THAT(csds.generic_xds_configs(), - ::testing::ElementsAre(CsdsResourceDoesNotExist( + ::testing::ElementsAre(CsdsResourceDoesNotExistOnTimeout( XdsFooResourceType::Get()->type_url(), "foo1"))); // Now server sends a response. 
stream->SendMessageToClient( diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index b356a02946949..3c3201c06e9bf 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -466,6 +466,7 @@ grpc_cc_test( "//:grpc", "//:grpc++", "//test/core/test_util:grpc_test_util", + "//test/core/test_util:scoped_env_var", "@envoy_api//envoy/extensions/filters/http/fault/v3:pkg_cc_proto", "@envoy_api//envoy/extensions/filters/http/router/v3:pkg_cc_proto", ], @@ -610,5 +611,6 @@ grpc_cc_test( "//:grpc", "//:grpc++", "//test/core/test_util:grpc_test_util", + "//test/core/test_util:scoped_env_var", ], ) diff --git a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc index 7758e03787bc3..f0ad5e9f0da72 100644 --- a/test/cpp/end2end/xds/xds_cluster_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_cluster_end2end_test.cc @@ -369,8 +369,9 @@ class CdsDeletionTest : public XdsEnd2endTest { INSTANTIATE_TEST_SUITE_P(XdsTest, CdsDeletionTest, ::testing::Values(XdsTestType()), &XdsTestType::Name); -// Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted. -TEST_P(CdsDeletionTest, ClusterDeleted) { +// Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted +// by default. +TEST_P(CdsDeletionTest, ClusterDeletedFailsByDefault) { InitClient(); CreateAndStartBackends(1); EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); @@ -390,7 +391,8 @@ TEST_P(CdsDeletionTest, ClusterDeleted) { EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); } -// Tests that we ignore Cluster deletions if configured to do so. +// Tests that we ignore Cluster deletions if ignore_resource_deletion +// is set. 
TEST_P(CdsDeletionTest, ClusterDeletionIgnored) { InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion()); CreateAndStartBackends(2); @@ -426,6 +428,71 @@ TEST_P(CdsDeletionTest, ClusterDeletionIgnored) { WaitForAllBackends(DEBUG_LOCATION, 1, 2); } +// Tests that we ignore Cluster deletions by default if data error +// handling is enabled. +TEST_P(CdsDeletionTest, + ClusterDeletionIgnoredByDefaultWithDataErrorHandlingEnabled) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + InitClient(); + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION, 0, 1); + // Make sure we ACKed the CDS update. + auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Unset CDS resource and wait for client to ACK the update. + balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); + const auto deadline = absl::Now() + absl::Seconds(30); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for CDS ACK"; + response_state = balancer_->ads_service()->cds_response_state(); + if (response_state.has_value()) break; + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Make sure we can still send RPCs. + CheckRpcSendOk(DEBUG_LOCATION); + // Now recreate the CDS resource pointing to a new EDS resource that + // specified backend 1, and make sure the client uses it. 
+ const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + // Wait for client to start using backend 1. + WaitForAllBackends(DEBUG_LOCATION, 1, 2); +} + +// Tests that we go into TRANSIENT_FAILURE if the Cluster is deleted +// if data error handling is enabled and fail_on_data_errors is set. +TEST_P(CdsDeletionTest, + ClusterDeletedFailsWithDataErrorHandlingEnabledWithFailOnDataErrors) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + InitClient(MakeBootstrapBuilder().SetFailOnDataErrors()); + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(DEBUG_LOCATION); + // Unset CDS resource. + balancer_->ads_service()->UnsetResource(kCdsTypeUrl, kDefaultClusterName); + // Wait for RPCs to start failing. + SendRpcsUntilFailure( + DEBUG_LOCATION, StatusCode::UNAVAILABLE, + absl::StrCat("CDS resource ", kDefaultClusterName, + ": does not exist \\(node ID:xds_end2end_test\\)")); + // Make sure we ACK'ed the update. 
+ auto response_state = balancer_->ads_service()->cds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); +} + // // EDS tests // diff --git a/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc b/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc index 0f50dd22bdb11..3ebaa8231b89a 100644 --- a/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_enabled_server_end2end_test.cc @@ -39,6 +39,7 @@ #include "src/proto/grpc/testing/echo.pb.h" #include "test/core/test_util/port.h" #include "test/core/test_util/resolve_localhost_ip46.h" +#include "test/core/test_util/scoped_env_var.h" #include "test/core/test_util/test_config.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" @@ -91,7 +92,35 @@ TEST_P(XdsEnabledServerTest, Basic) { WaitForBackend(DEBUG_LOCATION, 0); } -TEST_P(XdsEnabledServerTest, ListenerDeletionIgnored) { +TEST_P(XdsEnabledServerTest, ListenerDeletionFailsByDefault) { + DoSetUp(); + StartBackend(0); + ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); + WaitForBackend(DEBUG_LOCATION, 0); + // Check that we ACKed. + // TODO(roth): There may be multiple entries in the resource state response + // queue, because the client doesn't necessarily subscribe to all resources + // in a single message, and the server currently (I suspect incorrectly?) + // thinks that each subscription message is an ACK. So for now, we + // drain the entire LDS resource state response queue, ensuring that + // all responses are ACKs. Need to look more closely at the protocol + // semantics here and make sure the server is doing the right thing, + // in which case we may be able to avoid this. 
+ while (true) { + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) break; + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + } + // Now unset the resource. + balancer_->ads_service()->UnsetResource( + kLdsTypeUrl, GetServerListenerName(backends_[0]->port())); + // Server should stop serving. + ASSERT_TRUE( + backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::NOT_FOUND)); +} + +TEST_P(XdsEnabledServerTest, ListenerDeletionIgnoredIfConfigured) { DoSetUp(MakeBootstrapBuilder().SetIgnoreResourceDeletion()); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); @@ -131,6 +160,80 @@ TEST_P(XdsEnabledServerTest, ListenerDeletionIgnored) { CheckRpcSendOk(DEBUG_LOCATION); } +TEST_P(XdsEnabledServerTest, + ListenerDeletionFailsWithFailOnDataErrorsIfEnabled) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + DoSetUp(MakeBootstrapBuilder().SetFailOnDataErrors()); + StartBackend(0); + ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); + WaitForBackend(DEBUG_LOCATION, 0); + // Check that we ACKed. + // TODO(roth): There may be multiple entries in the resource state response + // queue, because the client doesn't necessarily subscribe to all resources + // in a single message, and the server currently (I suspect incorrectly?) + // thinks that each subscription message is an ACK. So for now, we + // drain the entire LDS resource state response queue, ensuring that + // all responses are ACKs. Need to look more closely at the protocol + // semantics here and make sure the server is doing the right thing, + // in which case we may be able to avoid this. 
+ while (true) { + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) break; + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + } + // Now unset the resource. + balancer_->ads_service()->UnsetResource( + kLdsTypeUrl, GetServerListenerName(backends_[0]->port())); + // Server should stop serving. + ASSERT_TRUE( + backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::NOT_FOUND)); +} + +TEST_P(XdsEnabledServerTest, + ListenerDeletionIgnoredByDefaultIfDataErrorHandlingEnabled) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + DoSetUp(); + StartBackend(0); + ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); + WaitForBackend(DEBUG_LOCATION, 0); + // Check that we ACKed. + // TODO(roth): There may be multiple entries in the resource state response + // queue, because the client doesn't necessarily subscribe to all resources + // in a single message, and the server currently (I suspect incorrectly?) + // thinks that each subscription message is an ACK. So for now, we + // drain the entire LDS resource state response queue, ensuring that + // all responses are ACKs. Need to look more closely at the protocol + // semantics here and make sure the server is doing the right thing, + // in which case we may be able to avoid this. + while (true) { + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) break; + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + } + // Now unset the resource. + balancer_->ads_service()->UnsetResource( + kLdsTypeUrl, GetServerListenerName(backends_[0]->port())); + // Wait for update to be ACKed, failing if the deadline passes first.
+ absl::Time deadline = + absl::Now() + (absl::Seconds(10) * grpc_test_slowdown_factor()); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK"; + auto response_state = balancer_->ads_service()->lds_response_state(); + if (!response_state.has_value()) { + gpr_sleep_until(grpc_timeout_seconds_to_deadline(1)); + continue; + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + break; + } + // Make sure server is still serving. + CheckRpcSendOk(DEBUG_LOCATION); +} + // Testing just one example of an invalid resource here. // Unit tests for XdsListenerResourceType have exhaustive tests for all // of the invalid cases. @@ -188,8 +291,6 @@ TEST_P(XdsEnabledServerTest, ListenerAddressMismatch) { class XdsEnabledServerStatusNotificationTest : public XdsEnabledServerTest { protected: - void SetUp() override { DoSetUp(); } - void SetValidLdsUpdate() { SetServerListenerNameAndRouteConfiguration( balancer_.get(), default_server_listener_, backends_[0]->port(), @@ -213,12 +314,14 @@ INSTANTIATE_TEST_SUITE_P(XdsTest, XdsEnabledServerStatusNotificationTest, ::testing::Values(XdsTestType()), &XdsTestType::Name); TEST_P(XdsEnabledServerStatusNotificationTest, ServingStatus) { + DoSetUp(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_wait_for_ready(true)); } TEST_P(XdsEnabledServerStatusNotificationTest, NotServingStatus) { + DoSetUp(); SetInvalidLdsUpdate(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange( @@ -229,6 +332,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest, NotServingStatus) { } TEST_P(XdsEnabledServerStatusNotificationTest, ErrorUpdateWhenAlreadyServing) { + DoSetUp(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); CheckRpcSendOk(DEBUG_LOCATION, 1, RpcOptions().set_wait_for_ready(true)); @@ -248,6 +352,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest,
ErrorUpdateWhenAlreadyServing) { TEST_P(XdsEnabledServerStatusNotificationTest, NotServingStatusToServingStatusTransition) { + DoSetUp(); SetInvalidLdsUpdate(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange( @@ -265,6 +370,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest, // results in future connections being dropped. TEST_P(XdsEnabledServerStatusNotificationTest, ServingStatusToNonServingStatusTransition) { + DoSetUp(MakeBootstrapBuilder().SetFailOnDataErrors()); SetValidLdsUpdate(); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); @@ -280,6 +386,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest, } TEST_P(XdsEnabledServerStatusNotificationTest, RepeatedServingStatusChanges) { + DoSetUp(MakeBootstrapBuilder().SetFailOnDataErrors()); StartBackend(0); for (int i = 0; i < 5; ++i) { // Send a valid LDS update to get the server to start listening @@ -298,6 +405,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest, RepeatedServingStatusChanges) { } TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) { + DoSetUp(MakeBootstrapBuilder().SetFailOnDataErrors()); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); constexpr int kNumChannels = 10; @@ -352,6 +460,7 @@ TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsOnResourceDeletion) { TEST_P(XdsEnabledServerStatusNotificationTest, ExistingRpcsFailOnResourceUpdateAfterDrainGraceTimeExpires) { + DoSetUp(); constexpr int kDrainGraceTimeMs = 100; xds_drain_grace_time_ms_ = kDrainGraceTimeMs; StartBackend(0); @@ -817,10 +926,9 @@ TEST_P(XdsServerRdsTest, NonInlineRouteConfigurationNotAvailable) { default_server_route_config_); StartBackend(0); ASSERT_TRUE(backends_[0]->WaitOnServingStatusChange(grpc::StatusCode::OK)); - CheckRpcSendFailure( - DEBUG_LOCATION, StatusCode::UNAVAILABLE, - "RDS resource unknown_server_route_config: does not exist " - "\\(node ID:xds_end2end_test\\)"); + 
CheckRpcSendFailure(DEBUG_LOCATION, StatusCode::UNAVAILABLE, + "RDS resource unknown_server_route_config: " + "does not exist \\(node ID:xds_end2end_test\\)"); } // TODO(yashykt): Once https://github.com/grpc/grpc/issues/24035 is fixed, we diff --git a/test/cpp/end2end/xds/xds_routing_end2end_test.cc b/test/cpp/end2end/xds/xds_routing_end2end_test.cc index 4cd34b9bd2026..4d5650327b9bc 100644 --- a/test/cpp/end2end/xds/xds_routing_end2end_test.cc +++ b/test/cpp/end2end/xds/xds_routing_end2end_test.cc @@ -25,6 +25,7 @@ #include "envoy/extensions/filters/http/router/v3/router.pb.h" #include "src/core/client_channel/backup_poller.h" #include "src/core/config/config_vars.h" +#include "test/core/test_util/scoped_env_var.h" #include "test/cpp/end2end/xds/xds_end2end_test_lib.h" namespace grpc { @@ -89,8 +90,9 @@ class LdsDeletionTest : public XdsEnd2endTest { INSTANTIATE_TEST_SUITE_P(XdsTest, LdsDeletionTest, ::testing::Values(XdsTestType()), &XdsTestType::Name); -// Tests that we go into TRANSIENT_FAILURE if the Listener is deleted. -TEST_P(LdsDeletionTest, ListenerDeleted) { +// Tests that we go into TRANSIENT_FAILURE if the Listener is deleted +// by default. +TEST_P(LdsDeletionTest, ListenerDeletedFailsByDefault) { InitClient(); CreateAndStartBackends(1); EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); @@ -110,7 +112,7 @@ TEST_P(LdsDeletionTest, ListenerDeleted) { EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); } -// Tests that we ignore Listener deletions if configured to do so. +// Tests that we ignore Listener deletions with ignore_resource_deletion. TEST_P(LdsDeletionTest, ListenerDeletionIgnored) { InitClient(MakeBootstrapBuilder().SetIgnoreResourceDeletion()); CreateAndStartBackends(2); @@ -157,6 +159,81 @@ TEST_P(LdsDeletionTest, ListenerDeletionIgnored) { WaitForAllBackends(DEBUG_LOCATION, 1, 2); } +// Tests that we ignore Listener deletions by default when data error +// handling is enabled. 
+TEST_P(LdsDeletionTest, + ListenerDeletionIgnoredByDefaultWhenDataErrorHandlingEnabled) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + InitClient(); + CreateAndStartBackends(2); + // Bring up client pointing to backend 0 and wait for it to connect. + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends(0, 1)}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + WaitForAllBackends(DEBUG_LOCATION, 0, 1); + // Make sure we ACKed the LDS update. + auto response_state = balancer_->ads_service()->lds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Unset LDS resource and wait for client to ACK the update. + balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); + const auto deadline = + absl::Now() + (absl::Seconds(30) * grpc_test_slowdown_factor()); + while (true) { + ASSERT_LT(absl::Now(), deadline) << "timed out waiting for LDS ACK"; + response_state = balancer_->ads_service()->lds_response_state(); + if (response_state.has_value()) break; + absl::SleepFor(absl::Seconds(1) * grpc_test_slowdown_factor()); + } + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); + // Make sure we can still send RPCs. + CheckRpcSendOk(DEBUG_LOCATION); + // Now recreate the LDS resource pointing to a different CDS and EDS + // resource, pointing to backend 1, and make sure the client uses it. 
+ const char* kNewClusterName = "new_cluster_name"; + const char* kNewEdsResourceName = "new_eds_resource_name"; + auto cluster = default_cluster_; + cluster.set_name(kNewClusterName); + cluster.mutable_eds_cluster_config()->set_service_name(kNewEdsResourceName); + balancer_->ads_service()->SetCdsResource(cluster); + args = EdsResourceArgs({{"locality0", CreateEndpointsForBackends(1, 2)}}); + balancer_->ads_service()->SetEdsResource( + BuildEdsResource(args, kNewEdsResourceName)); + RouteConfiguration new_route_config = default_route_config_; + new_route_config.mutable_virtual_hosts(0) + ->mutable_routes(0) + ->mutable_route() + ->set_cluster(kNewClusterName); + SetListenerAndRouteConfiguration(balancer_.get(), default_listener_, + new_route_config); + // Wait for client to start using backend 1. + WaitForAllBackends(DEBUG_LOCATION, 1, 2); +} + +// Tests that we go into TRANSIENT_FAILURE if the Listener is deleted +// when data error handling is enabled and fail_on_data_errors is set. +TEST_P(LdsDeletionTest, ListenerDeletedFailsWithFailOnDataErrors) { + grpc_core::testing::ScopedExperimentalEnvVar env( + "GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING"); + InitClient(MakeBootstrapBuilder().SetFailOnDataErrors()); + CreateAndStartBackends(1); + EdsResourceArgs args({{"locality0", CreateEndpointsForBackends()}}); + balancer_->ads_service()->SetEdsResource(BuildEdsResource(args)); + // We need to wait for all backends to come online. + WaitForAllBackends(DEBUG_LOCATION); + // Unset LDS resource. + balancer_->ads_service()->UnsetResource(kLdsTypeUrl, kServerName); + // Wait for RPCs to start failing. + SendRpcsUntilFailure( + DEBUG_LOCATION, StatusCode::UNAVAILABLE, + absl::StrCat("empty address list \\(LDS resource ", kServerName, + ": does not exist \\(node ID:xds_end2end_test\\)\\)")); + // Make sure we ACK'ed the update. 
+ auto response_state = balancer_->ads_service()->lds_response_state(); + ASSERT_TRUE(response_state.has_value()); + EXPECT_EQ(response_state->state, AdsServiceImpl::ResponseState::ACKED); +} + using LdsRdsInteractionTest = XdsEnd2endTest; INSTANTIATE_TEST_SUITE_P( diff --git a/test/cpp/end2end/xds/xds_utils.cc b/test/cpp/end2end/xds/xds_utils.cc index 1608099a9dd76..c4315d7483474 100644 --- a/test/cpp/end2end/xds/xds_utils.cc +++ b/test/cpp/end2end/xds/xds_utils.cc @@ -90,6 +90,9 @@ std::string XdsBootstrapBuilder::MakeXdsServersText( " \"server_features\": [<SERVER_FEATURES>]\n" " }"; std::vector<std::string> server_features; + if (fail_on_data_errors_) { + server_features.push_back("\"fail_on_data_errors\""); + } if (ignore_resource_deletion_) { server_features.push_back("\"ignore_resource_deletion\""); } diff --git a/test/cpp/end2end/xds/xds_utils.h b/test/cpp/end2end/xds/xds_utils.h index b8d13ee436cce..114edc0e3d1c6 100644 --- a/test/cpp/end2end/xds/xds_utils.h +++ b/test/cpp/end2end/xds/xds_utils.h @@ -38,6 +38,10 @@ class XdsBootstrapBuilder { ignore_resource_deletion_ = true; return *this; } + XdsBootstrapBuilder& SetFailOnDataErrors() { + fail_on_data_errors_ = true; + return *this; + } XdsBootstrapBuilder& SetTrustedXdsServer() { trusted_xds_server_ = true; return *this; @@ -103,6 +107,7 @@ class XdsBootstrapBuilder { std::string MakeAuthorityText(); bool ignore_resource_deletion_ = false; + bool fail_on_data_errors_ = false; bool trusted_xds_server_ = false; std::vector<std::string> servers_; std::string xds_channel_creds_type_ = "fake";