diff --git a/PipeFlow.Tests/ApiReaderGenericTests.cs b/PipeFlow.Tests/ApiReaderGenericTests.cs new file mode 100644 index 0000000..df14e4c --- /dev/null +++ b/PipeFlow.Tests/ApiReaderGenericTests.cs @@ -0,0 +1,288 @@ +using System.Net; +using System.Net.Http.Json; +using System.Reflection; +using PipeFlow.Core.Api; + +namespace PipeFlow.Tests +{ + public class ApiReaderGenericTests + { + private sealed class SampleDto + { + public int Id { get; set; } + public string? Name { get; set; } + } + + private sealed class TestableApiReader : ApiReader + { + public TestableApiReader(string baseUrl) : base(baseUrl) + { + } + + public string? AuthTokenValue => AuthToken; + public IReadOnlyDictionary HeaderValues => Headers; + public int MaxRetriesValue => MaxRetries; + public TimeSpan RetryDelayValue => RetryDelay; + + public Func>? CustomFetch { get; set; } + + public void UseHttpClient(HttpClient client) + { + var field = typeof(ApiReader).GetField("HttpClient", BindingFlags.Instance | BindingFlags.NonPublic); + field?.SetValue(this, client); + } + + protected override Task FetchDataWithRetry(string url) + { + if (CustomFetch != null) + { + return CustomFetch(url); + } + + return base.FetchDataWithRetry(url); + } + } + + private sealed class TestHttpMessageHandler : HttpMessageHandler + { + private readonly Queue> _responses; + + public TestHttpMessageHandler(IEnumerable> responses) + { + _responses = new Queue>(responses); + } + + public List Requests { get; } = new(); + + public void EnqueueResponse(Func response) + { + _responses.Enqueue(response); + } + + public void EnqueueResponses(params Func[] responses) + { + foreach (var response in responses) + { + _responses.Enqueue(response); + } + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + Requests.Add(CloneRequest(request)); + + var responseFactory = _responses.Count > 0 + ? _responses.Dequeue() + : (_ => new HttpResponseMessage(HttpStatusCode.NotFound)); + + var response = responseFactory(request); + return Task.FromResult(response); + } + + private static HttpRequestMessage CloneRequest(HttpRequestMessage request) + { + var clone = new HttpRequestMessage(request.Method, request.RequestUri); + + foreach (var header in request.Headers) + { + clone.Headers.TryAddWithoutValidation(header.Key, header.Value); + } + + return clone; + } + } + + [Fact] + public void Constructor_NullUrl_Throws() + { + Assert.Throws(() => new ApiReader(null!)); + } + + [Fact] + public async Task ReadAsync_ReturnsDeserializedResult() + { + var reader = CreateReader(out var handler); + + handler.EnqueueResponses(_ => new HttpResponseMessage(HttpStatusCode.OK) + { + Content = JsonContent.Create(new SampleDto { Id = 42, Name = "Test" }) + }); + + var result = await reader.ReadAsync(); + + Assert.NotNull(result); + Assert.Equal(42, result!.Id); + Assert.Equal("Test", result.Name); + } + + [Fact] + public void Read_UsesSynchronousWrapper() + { + var reader = new TestableApiReader("https://api.example.com/data") + { + CustomFetch = _ => Task.FromResult(new SampleDto { Id = 7, Name = "Sync" }) + }; + + var result = reader.Read(); + + Assert.Equal(7, result.Id); + Assert.Equal("Sync", result.Name); + } + + [Fact] + public async Task WithAuth_AddsAuthorizationHeader() + { + var reader = CreateReader(out var handler); + + handler.EnqueueResponses(_ => new HttpResponseMessage(HttpStatusCode.OK) + { + Content = JsonContent.Create(new SampleDto()) + }); + + reader.WithAuth("token-value", "Bearer"); + + await reader.ReadAsync(); + + var authorization = handler.Requests.Single().Headers.GetValues("Authorization").Single(); + Assert.Equal("Bearer token-value", authorization); + Assert.Equal("Bearer token-value", reader.AuthTokenValue); + } + + [Fact] + public async Task WithHeader_AddsCustomHeader() + { + var reader = CreateReader(out var handler); + + handler.EnqueueResponses(_ => new HttpResponseMessage(HttpStatusCode.OK) + { + Content = JsonContent.Create(new SampleDto()) + }); + + reader.WithHeader("X-Custom", "Value123"); + + await reader.ReadAsync(); + + var headerValues = handler.Requests.Single().Headers.GetValues("X-Custom").ToArray(); + Assert.Single(headerValues); + Assert.Equal("Value123", headerValues[0]); + Assert.Equal("Value123", reader.HeaderValues["X-Custom"]); + } + + [Fact] + public void WithRetry_OverridesRetryConfiguration() + { + var reader = new TestableApiReader("https://api.example.com/data"); + + reader.WithRetry(5, TimeSpan.FromMilliseconds(250)); + + Assert.Equal(5, reader.MaxRetriesValue); + Assert.Equal(TimeSpan.FromMilliseconds(250), reader.RetryDelayValue); + } + + [Fact] + public async Task FetchDataWithRetry_RetriesUntilSuccess() + { + var reader = CreateReader(out var handler); + + reader.WithRetry(3, TimeSpan.Zero); + + handler.EnqueueResponses( + _ => new HttpResponseMessage(HttpStatusCode.InternalServerError), + _ => new HttpResponseMessage(HttpStatusCode.BadGateway), + _ => new HttpResponseMessage(HttpStatusCode.OK) + { + Content = JsonContent.Create(new SampleDto { Id = 1 }) + }); + + var result = await reader.ReadAsync(); + + Assert.Equal(3, handler.Requests.Count); + Assert.NotNull(result); + Assert.Equal(1, result!.Id); + } + + [Fact] + public async Task FetchDataWithRetry_ReturnsDefaultAfterUnsuccessfulResponses() + { + var reader = CreateReader(out var handler); + + reader.WithRetry(2, TimeSpan.Zero); + + handler.EnqueueResponses( + _ => new HttpResponseMessage(HttpStatusCode.ServiceUnavailable), + _ => new HttpResponseMessage(HttpStatusCode.InternalServerError)); + + var result = await reader.ReadAsync(); + + Assert.Equal(2, handler.Requests.Count); + Assert.Null(result); + } + + [Fact] + public async Task FetchDataWithRetry_ThrowsAfterMaxExceptions() + { + var reader = CreateReader(out var handler); + + reader.WithRetry(2, TimeSpan.Zero); + + handler.EnqueueResponses( + _ => throw new HttpRequestException("boom"), + _ => throw new HttpRequestException("boom")); + + var exception = await Assert.ThrowsAsync(() => reader.ReadAsync()); + + Assert.Contains("Failed to fetch data", exception.Message); + Assert.Equal(2, handler.Requests.Count); + } + + [Fact] + public async Task FetchDataWithRetry_NoConfiguredResponses_ReturnsDefault() + { + var reader = CreateReader(out var handler); + + var result = await reader.ReadAsync(); + + Assert.Null(result); + Assert.Equal(reader.MaxRetriesValue, handler.Requests.Count); + } + + [Fact] + public void WithHeader_RewritesExistingValue() + { + var reader = CreateReader(out _); + + reader.WithHeader("X-Duplicate", "First") + .WithHeader("X-Duplicate", "Second"); + + Assert.Equal("Second", reader.HeaderValues["X-Duplicate"]); + } + + [Fact] + public async Task Dispose_PreventsFurtherRequests() + { + var reader = CreateReader(out var handler); + + handler.EnqueueResponses(_ => new HttpResponseMessage(HttpStatusCode.OK) + { + Content = JsonContent.Create(new SampleDto()) + }); + + await reader.ReadAsync(); + + reader.Dispose(); + + var exception = await Assert.ThrowsAsync(() => reader.ReadAsync()); + + Assert.Contains("Failed to fetch data", exception.Message); + Assert.All(handler.Requests, request => Assert.Equal(HttpMethod.Get, request.Method)); + } + + private static TestableApiReader CreateReader(out TestHttpMessageHandler handler) + { + handler = new TestHttpMessageHandler(Array.Empty>()); + var reader = new TestableApiReader("https://api.example.com/data"); + reader.UseHttpClient(new HttpClient(handler)); + return reader; + } + } +} diff --git a/PipeFlow/Api/ApiReader.cs b/PipeFlow/Api/ApiReader.cs index 86c6d72..b55045b 100644 --- a/PipeFlow/Api/ApiReader.cs +++ b/PipeFlow/Api/ApiReader.cs @@ -1,52 +1,34 @@ -using System; -using System.Collections.Generic; -using System.Net.Http; using System.Text.Json; -using System.Threading.Tasks; -using System.Linq; -using PipeFlow.Core; namespace PipeFlow.Core.Api; -public class ApiReader +public sealed class ApiReader : ApiReader> { - private readonly string _baseUrl; - private readonly HttpClient _httpClient; - private string _authToken; - private Dictionary _headers; - private int _maxRetries = 3; - private TimeSpan _retryDelay = TimeSpan.FromSeconds(1); private int? _pageSize; private string _pageParameter = "page"; private string _pageSizeParameter = "pageSize"; public ApiReader(string baseUrl) + : base(baseUrl) { - if (baseUrl == null) - throw new ArgumentNullException("baseUrl"); - - _baseUrl = baseUrl; - _httpClient = new HttpClient(); - _headers = new Dictionary(); + } - public ApiReader WithAuth(string token, string scheme = "Bearer") + public override ApiReader WithAuth(string token, string scheme = "Bearer") { - _authToken = $"{scheme} {token}"; + base.WithAuth(token, scheme); return this; } - public ApiReader WithHeader(string name, string value) + public override ApiReader WithHeader(string name, string value) { - _headers[name] = value; + base.WithHeader(name, value); return this; } - public ApiReader WithRetry(int maxRetries, TimeSpan? delay = null) + public override ApiReader WithRetry(int maxRetries, TimeSpan? delay = null) { - _maxRetries = maxRetries; - if (delay != null) - _retryDelay = delay.Value; + base.WithRetry(maxRetries, delay); return this; } @@ -58,7 +40,7 @@ public ApiReader WithPagination(int pageSize, string pageParam = "page", string return this; } - public IEnumerable Read() + public override IEnumerable Read() { var task = Task.Run(async () => await ReadAsync()); task.Wait(); @@ -69,14 +51,14 @@ public IEnumerable Read() } } - private async Task> ReadAsync() + public override async Task> ReadAsync() { var results = new List(); if (_pageSize != null) { - int page = 1; - bool hasMoreData = true; + var page = 1; + var hasMoreData = true; while (hasMoreData) { @@ -96,7 +78,7 @@ private async Task> ReadAsync() } else { - var data = await FetchDataWithRetry(_baseUrl); + var data = await FetchDataWithRetry(BaseUrl); if (data != null) results.AddRange(data); } @@ -106,35 +88,31 @@ private async Task> ReadAsync() private string BuildPaginatedUrl(int page) { - string separator; - if (_baseUrl.Contains("?")) - separator = "&"; - else - separator = "?"; - return $"{_baseUrl}{separator}{_pageParameter}={page}&{_pageSizeParameter}={_pageSize}"; + var separator = BaseUrl.Contains('?') ? "&" : "?"; + return $"{BaseUrl}{separator}{_pageParameter}={page}&{_pageSizeParameter}={_pageSize}"; } - private async Task> FetchDataWithRetry(string url) + protected override async Task> FetchDataWithRetry(string url) { - int attempt = 0; + var attempt = 0; - while (attempt < _maxRetries) + while (attempt < MaxRetries) { try { var request = new HttpRequestMessage(HttpMethod.Get, url); - if (_authToken != null) + if (AuthToken != null) { - request.Headers.Add("Authorization", _authToken); + request.Headers.Add("Authorization", AuthToken); } - foreach (var header in _headers) + foreach (var header in Headers) { request.Headers.Add(header.Key, header.Value); } - var response = await _httpClient.SendAsync(request); + var response = await HttpClient.SendAsync(request); if (response.IsSuccessStatusCode) { @@ -143,19 +121,19 @@ private async Task> FetchDataWithRetry(string url) } attempt++; - if (attempt < _maxRetries) + if (attempt < MaxRetries) { - await Task.Delay(_retryDelay * attempt); + await Task.Delay(RetryDelay * attempt); } } catch (Exception ex) { attempt++; - if (attempt >= _maxRetries) + if (attempt >= MaxRetries) { - throw new Exception($"Failed to fetch data from {url} after {_maxRetries} attempts", ex); + throw new Exception($"Failed to fetch data from {url} after {MaxRetries} attempts", ex); } - await Task.Delay(_retryDelay * attempt); + await Task.Delay(RetryDelay * attempt); } } @@ -254,9 +232,4 @@ private object GetJsonValue(JsonElement element) return element.ToString(); } } - - public void Dispose() - { - _httpClient?.Dispose(); - } } \ No newline at end of file diff --git a/PipeFlow/Api/ApiReaderGeneric.cs b/PipeFlow/Api/ApiReaderGeneric.cs new file mode 100644 index 0000000..9908923 --- /dev/null +++ b/PipeFlow/Api/ApiReaderGeneric.cs @@ -0,0 +1,133 @@ +using System.Net.Http.Json; + +namespace PipeFlow.Core.Api; + +public class ApiReader : IDisposable +{ + protected readonly string BaseUrl; + protected readonly HttpClient HttpClient; + protected string AuthToken; + protected readonly Dictionary Headers; + protected int MaxRetries = 3; + protected TimeSpan RetryDelay = TimeSpan.FromSeconds(1); + + public ApiReader(string baseUrl) + { + BaseUrl = baseUrl ?? throw new ArgumentNullException(nameof(baseUrl)); + HttpClient = new HttpClient(); + Headers = new Dictionary(StringComparer.OrdinalIgnoreCase); + } + + public virtual ApiReader WithAuth(string token, string scheme = "Bearer") + { + if (string.IsNullOrWhiteSpace(token)) + throw new ArgumentException("Authentication token cannot be null or whitespace.", nameof(token)); + + if (string.IsNullOrWhiteSpace(scheme)) + throw new ArgumentException("Authentication scheme cannot be null or whitespace.", nameof(scheme)); + + AuthToken = $"{scheme} {token}"; + return this; + } + + public virtual ApiReader WithHeader(string name, string value) + { + if (string.IsNullOrWhiteSpace(name)) + throw new ArgumentException("Header name cannot be null or whitespace.", nameof(name)); + + if (value is null) + throw new ArgumentNullException(nameof(value)); + + Headers[name] = value; + return this; + } + + public virtual ApiReader WithRetry(int maxRetries, TimeSpan? delay = null) + { + if (maxRetries <= 0) + throw new ArgumentOutOfRangeException(nameof(maxRetries), maxRetries, "Retry count must be greater than zero."); + + if (delay is { } retryDelay && retryDelay < TimeSpan.Zero) + throw new ArgumentOutOfRangeException(nameof(delay), delay, "Retry delay must be non-negative."); + + MaxRetries = maxRetries; + if (delay != null) + RetryDelay = delay.Value; + return this; + } + + public virtual TResult Read() + { + var result = ReadAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + + return result; + } + + public virtual async Task ReadAsync() + { + return await FetchDataWithRetry(BaseUrl); + } + + protected virtual async Task FetchDataWithRetry(string url) + { + var attempt = 0; + + while (attempt < MaxRetries) + { + try + { + using var request = new HttpRequestMessage(HttpMethod.Get, url); + ApplyRequestHeaders(request); + + using var response = await HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead); + + if (response.IsSuccessStatusCode) + { + var result = await response.Content.ReadFromJsonAsync(); + return result; + } + } + catch (Exception ex) + { + attempt++; + if (attempt >= MaxRetries) + { + throw new Exception($"Failed to fetch data from {url} after {MaxRetries} attempts", ex); + } + + await Task.Delay(RetryDelay * attempt); + continue; + } + + attempt++; + if (attempt < MaxRetries) + { + await Task.Delay(RetryDelay * attempt); + } + } + + return default; + } + + public void Dispose() + { + HttpClient?.Dispose(); + GC.SuppressFinalize(this); + } + + private void ApplyRequestHeaders(HttpRequestMessage request) + { + if (!string.IsNullOrWhiteSpace(AuthToken)) + { + request.Headers.Add("Authorization", AuthToken); + } + + foreach (var header in Headers) + { + if (!request.Headers.TryAddWithoutValidation(header.Key, header.Value)) + { + throw new InvalidOperationException($"Failed to add header '{header.Key}' to the request."); + } + } + } +} diff --git a/PipeFlow/PipeFlow.cs b/PipeFlow/PipeFlow.cs index 96c36b4..3733a0d 100644 --- a/PipeFlow/PipeFlow.cs +++ b/PipeFlow/PipeFlow.cs @@ -115,6 +115,20 @@ public IPipeline Api(string url, Action configure) configure?.Invoke(reader); return new Pipeline(reader.Read()); } + + public IPipeline Api(string url, Action>? configure = null) + { + var reader = new ApiReader(url); + configure?.Invoke(reader); + return new Pipeline([reader.Read()]); + } + + public async Task> ApiAsync(string url, Action>? configure = null) + { + var reader = new ApiReader(url); + configure?.Invoke(reader); + return new Pipeline([await reader.ReadAsync()]); + } public IPipeline MongoDB(string connectionString, string database, string collection) {