Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for response builders on IRequestConfiguration #134

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"sdk": {
"version": "8.0.100",
"rollForward": "latestMajor",
"rollForward": "latestMinor",
"allowPrerelease": false
}
}
41 changes: 32 additions & 9 deletions src/Elastic.Transport/Components/Pipeline/RequestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Collections.Specialized;
using System.Security.Cryptography.X509Certificates;
using Elastic.Transport.Extensions;
using Elastic.Transport.Products;

namespace Elastic.Transport;

Expand Down Expand Up @@ -41,9 +40,6 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
ProxyPassword = global.ProxyPassword;
DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection;
UserAgent = global.UserAgent;
ResponseBuilders = global.ResponseBuilders;
ProductResponseBuilders = global.ProductRegistration.ResponseBuilders;

KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000);
KeepAliveTime = (int)(global.KeepAliveTime?.TotalMilliseconds ?? 2000);
RunAs = local?.RunAs ?? global.RunAs;
Expand Down Expand Up @@ -90,13 +86,36 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
Headers ??= [];
Headers.Add(OpaqueIdHeader, local.OpaqueId);
}
}

/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
// If there are builders set at the transport level and on the request config, we combine them,
// prioritising the request config response builders as most specific.
if (local is not null && local.ResponseBuilders.Count > 0 && global.ResponseBuilders.Count > 0)
{
var builders = new IResponseBuilder[local.ResponseBuilders.Count + global.ResponseBuilders.Count];

/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
var counter = 0;
foreach (var builder in local.ResponseBuilders)
{
builders[counter++] = builder;
}
foreach (var builder in global.ResponseBuilders)
{
builders[counter++] = builder;
}

ResponseBuilders = builders;
}
else if (local is not null && local.ResponseBuilders.Count > 0)
{
ResponseBuilders = local.ResponseBuilders;
}
else
{
ResponseBuilders = global.ResponseBuilders;
}

ProductResponseBuilders = global.ProductRegistration.ResponseBuilders;
}

/// <inheritdoc cref="ITransportConfiguration.MemoryStreamFactory"/>
public MemoryStreamFactory MemoryStreamFactory { get; }
Expand Down Expand Up @@ -168,4 +187,8 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
public bool DisableSniff { get; }
/// <inheritdoc cref="IRequestConfiguration.DisablePings"/>
public bool DisablePings { get; }
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
Expand All @@ -218,8 +218,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
}

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
Expand All @@ -208,8 +208,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
throw;
}
}
Expand Down
155 changes: 155 additions & 0 deletions src/Elastic.Transport/Configuration/IRequestConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Security.Cryptography.X509Certificates;

namespace Elastic.Transport;

/// <summary>
/// Allows you to inject per request overrides to the current <see cref="ITransportConfiguration"/>.
/// </summary>
public interface IRequestConfiguration
{
/// <summary>
/// Force a different Accept header on the request
/// </summary>
string? Accept { get; }

/// <summary>
/// Treat the following statuses (on top of the 200 range) NOT as error.
/// </summary>
IReadOnlyCollection<int>? AllowedStatusCodes { get; }

/// <summary> Provide an authentication header override for this request </summary>
AuthorizationHeader? Authentication { get; }

/// <summary>
/// Use the following client certificates to authenticate this single request
/// </summary>
X509CertificateCollection? ClientCertificates { get; }

/// <summary>
/// Force a different Content-Type header on the request
/// </summary>
string? ContentType { get; }

/// <summary>
/// Whether to buffer the request and response bytes for the call
/// </summary>
bool? DisableDirectStreaming { get; }

/// <summary>
/// Whether to disable the audit trail for the request.
/// </summary>
bool? DisableAuditTrail { get; }

/// <summary>
/// Under no circumstance do a ping before the actual call. If a node was previously dead a small ping with
/// low connect timeout will be tried first in normal circumstances
/// </summary>
bool? DisablePings { get; }

/// <summary>
/// Forces no sniffing to occur on the request no matter what configuration is in place
/// globally
/// </summary>
bool? DisableSniff { get; }

/// <summary>
/// Whether or not this request should be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining defaults to true
/// </summary>
bool? HttpPipeliningEnabled { get; }

/// <summary>
/// Enable gzip compressed requests and responses
/// </summary>
bool? EnableHttpCompression { get; }

/// <summary>
/// This will force the operation on the specified node, this will bypass any configured connection pool and will no retry.
/// </summary>
Uri? ForceNode { get; }

/// <summary>
/// When a retryable exception occurs or status code is returned this controls the maximum
/// amount of times we should retry the call to Elasticsearch
/// </summary>
int? MaxRetries { get; }

/// <summary>
/// Limits the total runtime including retries separately from <see cref="IRequestConfiguration.RequestTimeout" />
/// <pre>
/// When not specified defaults to <see cref="IRequestConfiguration.RequestTimeout" /> which itself defaults to 60 seconds
/// </pre>
/// </summary>
TimeSpan? MaxRetryTimeout { get; }

/// <summary>
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
/// Valid only for Elasticsearch 6.2.0+
/// </summary>
string? OpaqueId { get; }

/// <summary> Determines whether to parse all HTTP headers in the request. </summary>
bool? ParseAllHeaders { get; }

/// <summary>
/// The ping timeout for this specific request
/// </summary>
TimeSpan? PingTimeout { get; }

/// <summary>
/// The timeout for this specific request, takes precedence over the global timeout init
/// </summary>
TimeSpan? RequestTimeout { get; }

/// <summary>
/// Additional response builders to apply.
/// </summary>
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }

/// <summary> Specifies the headers from the response that should be parsed. </summary>
HeadersList? ResponseHeadersToParse { get; }

/// <summary>
/// Submit the request on behalf in the context of a different shield user
/// <pre />https://www.elastic.co/guide/en/shield/current/submitting-requests-for-other-users.html
/// </summary>
string? RunAs { get; }

/// <summary>
/// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when <see cref="ApiCallDetails.SuccessOrKnownError"/> is false)
/// on the client when a call resulted in an exception on either the client or the Elasticsearch server.
/// <para>Reasons for such exceptions could be search parser errors, index missing exceptions, etc...</para>
/// </summary>
bool? ThrowExceptions { get; }

/// <summary>
/// Whether the request should be sent with chunked Transfer-Encoding.
/// </summary>
bool? TransferEncodingChunked { get; }

/// <summary>
/// Try to send these headers for this single request
/// </summary>
NameValueCollection? Headers { get; }

/// <summary>
/// Enable statistics about TCP connections to be collected when making a request
/// </summary>
bool? EnableTcpStats { get; }

/// <summary>
/// Enable statistics about thread pools to be collected when making a request
/// </summary>
bool? EnableThreadPoolStats { get; }

/// <summary>
/// Holds additional meta data about the request.
/// </summary>
RequestMetaData? RequestMetaData { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,4 @@ public interface ITransportConfiguration : IRequestConfiguration, IDisposable
/// about the client and runtime.
/// </summary>
bool DisableMetaHeader { get; }

/// <summary>
/// Additional response builders to apply.
/// </summary>
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
}
Loading
Loading