Skip to content

Commit

Permalink
[xDS] replace "ignore_resource_deletion" server feature with "fail_on…
Browse files Browse the repository at this point in the history
…_data_errors" (grpc#38278)

Another piece of gRFC A88 (grpc/proposal#466).

Note that even with the feature disabled, this changes the way that ignored resource deletions are reflected in CSDS and XdsClient metrics.

As a side effect, also fixes one of the edge cases described in grpc#38094.

Closes grpc#38278

COPYBARA_INTEGRATE_REVIEW=grpc#38278 from markdroth:xds_client_fail_on_data_errors 7d31238
PiperOrigin-RevId: 713348335
  • Loading branch information
markdroth authored and pawbhard committed Jan 9, 2025
1 parent f2f5c07 commit 16c5311
Show file tree
Hide file tree
Showing 15 changed files with 774 additions and 192 deletions.
2 changes: 2 additions & 0 deletions build_autogenerated.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions src/core/xds/grpc/xds_server_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ namespace {
constexpr absl::string_view kServerFeatureIgnoreResourceDeletion =
"ignore_resource_deletion";

constexpr absl::string_view kServerFeatureFailOnDataErrors =
"fail_on_data_errors";

constexpr absl::string_view kServerFeatureTrustedXdsServer =
"trusted_xds_server";

Expand All @@ -46,6 +49,11 @@ bool GrpcXdsServer::IgnoreResourceDeletion() const {
kServerFeatureIgnoreResourceDeletion)) != server_features_.end();
}

bool GrpcXdsServer::FailOnDataErrors() const {
return server_features_.find(std::string(kServerFeatureFailOnDataErrors)) !=
server_features_.end();
}

bool GrpcXdsServer::TrustedXdsServer() const {
return server_features_.find(std::string(kServerFeatureTrustedXdsServer)) !=
server_features_.end();
Expand Down Expand Up @@ -126,6 +134,7 @@ void GrpcXdsServer::JsonPostLoad(const Json& json, const JsonArgs& args,
for (const Json& feature_json : array) {
if (feature_json.type() == Json::Type::kString &&
(feature_json.string() == kServerFeatureIgnoreResourceDeletion ||
feature_json.string() == kServerFeatureFailOnDataErrors ||
feature_json.string() == kServerFeatureTrustedXdsServer)) {
server_features_.insert(feature_json.string());
}
Expand Down
1 change: 1 addition & 0 deletions src/core/xds/grpc/xds_server_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class GrpcXdsServer final : public XdsBootstrap::XdsServer {
const std::string& server_uri() const override { return server_uri_; }

bool IgnoreResourceDeletion() const override;
bool FailOnDataErrors() const override;

bool TrustedXdsServer() const;

Expand Down
9 changes: 9 additions & 0 deletions src/core/xds/xds_client/xds_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,13 @@ bool XdsFederationEnabled() {
return parse_succeeded && parsed_value;
}

// TODO(roth): Remove this once the feature passes interop tests.
bool XdsDataErrorHandlingEnabled() {
auto value = GetEnv("GRPC_EXPERIMENTAL_XDS_DATA_ERROR_HANDLING");
if (!value.has_value()) return false;
bool parsed_value;
bool parse_succeeded = gpr_parse_bool_value(value->c_str(), &parsed_value);
return parse_succeeded && parsed_value;
}

} // namespace grpc_core
6 changes: 6 additions & 0 deletions src/core/xds/xds_client/xds_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
namespace grpc_core {

bool XdsFederationEnabled();
bool XdsDataErrorHandlingEnabled();

class XdsBootstrap {
public:
Expand All @@ -46,8 +47,13 @@ class XdsBootstrap {
virtual ~XdsServer() = default;

virtual const std::string& server_uri() const = 0;

// TODO(roth): Remove this method once the data error handling
// feature passes interop tests.
virtual bool IgnoreResourceDeletion() const = 0;

virtual bool FailOnDataErrors() const = 0;

virtual bool Equals(const XdsServer& other) const = 0;

// Returns a key to be used for uniquely identifying this XdsServer.
Expand Down
167 changes: 77 additions & 90 deletions src/core/xds/xds_client/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ class XdsClient::XdsChannel::AdsCall final
name_.authority, type_->type_url(), name_.key)
<< "} from xds server";
resource_seen_ = true;
state.SetDoesNotExist();
state.SetDoesNotExistOnTimeout();
ads_call_->xds_client()->NotifyWatchersOnResourceChanged(
absl::NotFoundError("does not exist"), state.watchers(),
state.failed_status(), state.watchers(),
ReadDelayHandle::NoWait());
}
}
Expand Down Expand Up @@ -978,8 +978,8 @@ void XdsClient::XdsChannel::AdsCall::ParseResource(
if (authority_it == xds_client()->authority_state_map_.end()) {
return; // Skip resource -- we don't have a subscription for it.
}
// Found authority, so look up type.
AuthorityState& authority_state = authority_it->second;
// Found authority, so look up type.
auto type_it = authority_state.resource_map.find(context->type);
if (type_it == authority_state.resource_map.end()) {
return; // Skip resource -- we don't have a subscription for it.
Expand All @@ -996,32 +996,17 @@ void XdsClient::XdsChannel::AdsCall::ParseResource(
context->resources_seen[parsed_resource_name->authority].insert(
parsed_resource_name->key);
}
// If we previously ignored the resource's deletion, log that we're
// now re-adding it.
if (resource_state.ignored_deletion()) {
LOG(INFO) << "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": server returned new version of resource for which we "
"previously ignored a deletion: type "
<< type_url << " name " << resource_name;
resource_state.set_ignored_deletion(false);
}
// Update resource state based on whether the resource is valid.
absl::Status status = absl::InvalidArgumentError(
absl::StrCat("invalid resource: ", decode_status.ToString()));
if (!decode_status.ok()) {
if (!resource_state.HasResource()) {
xds_client()->NotifyWatchersOnResourceChanged(std::move(status),
resource_state.watchers(),
context->read_delay_handle);
} else {
xds_client()->NotifyWatchersOnAmbientError(std::move(status),
resource_state.watchers(),
context->read_delay_handle);
}
resource_state.SetNacked(context->version, decode_status.ToString(),
context->update_time);
++context->num_invalid_resources;
// If the fail_on_data_errors server feature is present, drop the
// existing cached resource, if any.
const bool drop_cached_resource = XdsDataErrorHandlingEnabled() &&
xds_channel()->server_.FailOnDataErrors();
resource_state.SetNacked(context->version, decode_status.message(),
context->update_time, drop_cached_resource);
xds_client()->NotifyWatchersOnError(resource_state,
context->read_delay_handle);
return;
}
// Resource is valid.
Expand Down Expand Up @@ -1216,23 +1201,14 @@ void XdsClient::XdsChannel::AdsCall::OnRecvMessage(absl::string_view payload) {
// that the resource does not exist. For that case, we rely on
// the request timeout instead.
if (!resource_state.HasResource()) continue;
if (xds_channel()->server_.IgnoreResourceDeletion()) {
if (!resource_state.ignored_deletion()) {
LOG(ERROR)
<< "[xds_client " << xds_client() << "] xds server "
<< xds_channel()->server_.server_uri()
<< ": ignoring deletion for resource type "
<< context.type_url << " name "
<< XdsClient::ConstructFullXdsResourceName(
authority, context.type_url.c_str(), resource_key);
resource_state.set_ignored_deletion(true);
}
} else {
resource_state.SetDoesNotExist();
xds_client()->NotifyWatchersOnResourceChanged(
absl::NotFoundError("does not exist"),
resource_state.watchers(), context.read_delay_handle);
}
const bool drop_cached_resource =
XdsDataErrorHandlingEnabled()
? xds_channel()->server_.FailOnDataErrors()
: !xds_channel()->server_.IgnoreResourceDeletion();
resource_state.SetDoesNotExistOnLdsOrCdsDeletion(
context.version, context.update_time, drop_cached_resource);
xds_client()->NotifyWatchersOnError(resource_state,
context.read_delay_handle);
}
}
}
Expand Down Expand Up @@ -1329,31 +1305,50 @@ void XdsClient::ResourceState::SetAcked(
update_time_ = update_time;
version_ = std::move(version);
failed_version_.clear();
failed_details_.clear();
failed_status_ = absl::OkStatus();
}

void XdsClient::ResourceState::SetNacked(const std::string& version,
const std::string& details,
Timestamp update_time) {
absl::string_view details,
Timestamp update_time,
bool drop_cached_resource) {
if (drop_cached_resource) {
resource_.reset();
serialized_proto_.clear();
}
client_status_ = ClientResourceStatus::NACKED;
failed_version_ = version;
failed_details_ = details;
failed_status_ =
absl::InvalidArgumentError(absl::StrCat("invalid resource: ", details));
failed_update_time_ = update_time;
}

void XdsClient::ResourceState::SetDoesNotExist() {
resource_.reset();
serialized_proto_.clear();
void XdsClient::ResourceState::SetDoesNotExistOnTimeout() {
client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
failed_status_ = absl::NotFoundError("does not exist");
failed_version_.clear();
}

void XdsClient::ResourceState::SetDoesNotExistOnLdsOrCdsDeletion(
const std::string& version, Timestamp update_time,
bool drop_cached_resource) {
if (drop_cached_resource) {
resource_.reset();
serialized_proto_.clear();
}
client_status_ = ClientResourceStatus::DOES_NOT_EXIST;
failed_status_ = absl::NotFoundError("does not exist");
failed_version_ = version;
failed_update_time_ = update_time;
}

absl::string_view XdsClient::ResourceState::CacheStateString() const {
switch (client_status_) {
case ClientResourceStatus::REQUESTED:
return "requested";
case ClientResourceStatus::DOES_NOT_EXIST:
return "does_not_exist";
return resource_ != nullptr ? "does_not_exist_but_cached"
: "does_not_exist";
case ClientResourceStatus::ACKED:
return "acked";
case ClientResourceStatus::NACKED:
Expand Down Expand Up @@ -1395,14 +1390,16 @@ void XdsClient::ResourceState::FillGenericXdsConfig(
google_protobuf_Any_set_value(any_field,
StdStringToUpbString(serialized_proto_));
}
if (client_status_ == ClientResourceStatus::NACKED) {
if (!failed_status_.ok()) {
auto* update_failure_state = envoy_admin_v3_UpdateFailureState_new(arena);
envoy_admin_v3_UpdateFailureState_set_details(
update_failure_state, StdStringToUpbString(failed_details_));
envoy_admin_v3_UpdateFailureState_set_version_info(
update_failure_state, StdStringToUpbString(failed_version_));
envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
update_failure_state, EncodeTimestamp(failed_update_time_, arena));
update_failure_state, StdStringToUpbString(failed_status_.message()));
if (!failed_version_.empty()) {
envoy_admin_v3_UpdateFailureState_set_version_info(
update_failure_state, StdStringToUpbString(failed_version_));
envoy_admin_v3_UpdateFailureState_set_last_update_attempt(
update_failure_state, EncodeTimestamp(failed_update_time_, arena));
}
envoy_service_status_v3_ClientConfig_GenericXdsConfig_set_error_state(
entry, update_failure_state);
}
Expand Down Expand Up @@ -1527,7 +1524,6 @@ void XdsClient::WatchResource(const XdsResourceType* type,
bool first_watcher_for_resource = it_is_new.second;
ResourceState& resource_state = it_is_new.first->second;
resource_state.AddWatcher(watcher);
bool notified_watcher = false;
if (first_watcher_for_resource) {
// We try to add new channels in 2 cases:
// - This is the first resource for this authority (i.e., the list
Expand Down Expand Up @@ -1563,26 +1559,13 @@ void XdsClient::WatchResource(const XdsResourceType* type,
<< name;
NotifyWatchersOnResourceChanged(resource_state.resource(), {watcher},
ReadDelayHandle::NoWait());
notified_watcher = true;
} else if (resource_state.client_status() ==
ResourceState::ClientResourceStatus::DOES_NOT_EXIST) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this
<< "] reporting cached does-not-exist for " << name;
NotifyWatchersOnResourceChanged(absl::NotFoundError("does not exist"),
{watcher}, ReadDelayHandle::NoWait());
notified_watcher = true;
} else if (resource_state.client_status() ==
ResourceState::ClientResourceStatus::NACKED) {
}
if (!resource_state.failed_status().ok()) {
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this
<< "] reporting cached validation failure for " << name << ": "
<< resource_state.failed_details();
NotifyWatchersOnResourceChanged(
absl::InvalidArgumentError(absl::StrCat(
"invalid resource: ", resource_state.failed_details())),
{watcher}, ReadDelayHandle::NoWait());
notified_watcher = true;
<< "[xds_client " << this << "] returning cached error for " << name
<< ": " << resource_state.failed_status();
NotifyWatchersOnError(resource_state, ReadDelayHandle::NoWait(),
{watcher});
}
}
// If the channel is not connected, report an error to the watcher.
Expand All @@ -1591,13 +1574,8 @@ void XdsClient::WatchResource(const XdsResourceType* type,
GRPC_TRACE_LOG(xds_client, INFO)
<< "[xds_client " << this << "] returning cached channel error for "
<< name << ": " << channel_status;
if (notified_watcher) {
NotifyWatchersOnAmbientError(std::move(channel_status), {watcher},
ReadDelayHandle::NoWait());
} else {
NotifyWatchersOnResourceChanged(std::move(channel_status), {watcher},
ReadDelayHandle::NoWait());
}
NotifyWatchersOnError(resource_state, ReadDelayHandle::NoWait(),
{watcher}, std::move(channel_status));
}
}
work_serializer_.DrainQueue();
Expand Down Expand Up @@ -1629,12 +1607,6 @@ void XdsClient::CancelResourceWatch(const XdsResourceType* type,
resource_state.RemoveWatcher(watcher);
// Clean up empty map entries, if any.
if (!resource_state.HasWatchers()) {
if (resource_state.ignored_deletion()) {
LOG(INFO) << "[xds_client " << this
<< "] unsubscribing from a resource for which we "
<< "previously ignored a deletion: type " << type->type_url()
<< " name " << name;
}
for (const auto& xds_channel : authority_state.xds_channels) {
xds_channel->UnsubscribeLocked(type, *resource_name,
delay_unsubscription);
Expand Down Expand Up @@ -1757,6 +1729,21 @@ void XdsClient::NotifyWatchersOnAmbientError(
DEBUG_LOCATION);
}

void XdsClient::NotifyWatchersOnError(
const ResourceState& resource_state,
RefCountedPtr<ReadDelayHandle> read_delay_handle, WatcherSet watchers,
absl::Status status) {
if (watchers.empty()) watchers = resource_state.watchers();
if (status.ok()) status = resource_state.failed_status();
if (!resource_state.HasResource()) {
NotifyWatchersOnResourceChanged(std::move(status), std::move(watchers),
std::move(read_delay_handle));
} else {
NotifyWatchersOnAmbientError(std::move(status), std::move(watchers),
std::move(read_delay_handle));
}
}

void XdsClient::DumpClientConfig(
std::set<std::string>* string_pool, upb_Arena* arena,
envoy_service_status_v3_ClientConfig* client_config) {
Expand Down
Loading

0 comments on commit 16c5311

Please sign in to comment.