Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
082afb0
feat(csharp): add transformation helpers for object indexing with a t…
Fluf22 Oct 9, 2025
d3881d0
feat(csharp): add optional logger factory parameter to SetTransformat…
Fluf22 Oct 9, 2025
090e2e7
feat(csharp): add C# support to transformation YAML files
Fluf22 Oct 9, 2025
d5eae59
fix(tests): add transformation region support in client creation
Fluf22 Oct 9, 2025
c26a07d
fix: update error message to clarify method call requirements
Fluf22 Oct 13, 2025
50210df
feat(csharp): update response types in SaveObjectsWithTransformation …
Fluf22 Oct 14, 2025
8d68d40
feat(csharp): refactor IngestionClient initialization to use Ingestio…
Fluf22 Oct 15, 2025
b34c98a
feat(csharp): add C# support to validation checks in runCts.ts
Fluf22 Oct 15, 2025
c1993c5
feat(csharp): add C# support to various transformation operations in …
Fluf22 Oct 15, 2025
09c286f
feat(csharp): enhance temporary index name generation using RandomNum…
Fluf22 Oct 15, 2025
403dcce
feat(csharp): improve randomness in temporary index name generation u…
Fluf22 Oct 15, 2025
c1ebca6
feat(csharp): refactor retry logic into RetryHelper for improved code…
Fluf22 Oct 15, 2025
54be615
feat(csharp): update ChunkedPush and ReplaceAllObjectsWithTransformat…
Fluf22 Oct 15, 2025
97f28fd
feat(csharp): update SaveObjectsWithTransformation and PartialUpdateO…
Fluf22 Oct 15, 2025
43a6e6b
Merge branch 'main' into feat/csharp-push-helpers
Fluf22 Oct 15, 2025
628c49c
Merge branch 'main' into feat/csharp-push-helpers
Fluf22 Oct 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Algolia.Search.Exceptions;
using Algolia.Search.Http;
using Algolia.Search.Models.Ingestion;
using Algolia.Search.Serializer;
using Algolia.Search.Utils;

namespace Algolia.Search.Clients;

public partial interface IIngestionClient
{
/// <summary>
/// Helper method to call ChunkedPushAsync and convert the response types.
/// This simplifies SearchClient helpers that need to use IngestionClient.
/// </summary>
Task<List<WatchResponse>> ChunkedPushAsync(
string indexName,
IEnumerable<object> objects,
Models.Ingestion.Action action,
bool waitForTasks = false,
int batchSize = 1000,
string referenceIndexName = null,
RequestOptions options = null,
CancellationToken cancellationToken = default
);

/// <summary>
/// Synchronous version of ChunkedPushAsync
/// </summary>
List<WatchResponse> ChunkedPush(
string indexName,
IEnumerable<object> objects,
Models.Ingestion.Action action,
bool waitForTasks = false,
int batchSize = 1000,
string referenceIndexName = null,
RequestOptions options = null,
CancellationToken cancellationToken = default
);
}

public partial class IngestionClient : IIngestionClient
{
/// <summary>
/// Helper: Chunks the given `objects` list in subset of 1000 elements max in order to make it fit
/// in `push` requests by leveraging the Transformation pipeline setup in the Push connector
/// (https://www.algolia.com/doc/guides/sending-and-managing-data/send-and-update-your-data/connectors/push/).
/// </summary>
/// <param name="indexName">The `indexName` to push `objects` to.</param>
/// <param name="objects">The array of `objects` to store in the given Algolia `indexName`.</param>
/// <param name="action">The `action` to perform on the given array of `objects`.</param>
/// <param name="waitForTasks">Whether or not we should wait until every push task has been processed. This operation may slow the total execution time of this method but is more reliable.</param>
/// <param name="batchSize">The size of the chunk of `objects`. The number of push calls will be equal to `length(objects) / batchSize`. Defaults to 1000.</param>
/// <param name="referenceIndexName">This is required when targeting an index that does not have a push connector setup (e.g. a tmp index), but you wish to attach another index's transformation to it (e.g. the source index name).</param>
/// <param name="options">Add extra http header or query parameters to Algolia.</param>
/// <param name="cancellationToken">Cancellation token to cancel the request</param>
/// <returns>List of WatchResponse objects from the push operations</returns>
public async Task<List<WatchResponse>> ChunkedPushAsync(
string indexName,
IEnumerable<object> objects,
Algolia.Search.Models.Ingestion.Action action,
bool waitForTasks = false,
int batchSize = 1000,
string referenceIndexName = null,
RequestOptions options = null,
CancellationToken cancellationToken = default
)
{
var objectsList = objects.ToList();
var responses = new List<WatchResponse>();
var waitBatchSize = Math.Max(batchSize / 10, 1);
var offset = 0;

for (var i = 0; i < objectsList.Count; i += batchSize)
{
var chunk = objectsList.Skip(i).Take(batchSize);
var records = new List<PushTaskRecords>();

foreach (var obj in chunk)
{
var jsonString = JsonSerializer.Serialize(obj, JsonConfig.Options);
var record = JsonSerializer.Deserialize<PushTaskRecords>(jsonString, JsonConfig.Options);
records.Add(record);
}

var payload = new PushTaskPayload(action, records);

var response = await PushAsync(
indexName,
payload,
watch: null,
referenceIndexName: referenceIndexName,
options: options,
cancellationToken: cancellationToken
)
.ConfigureAwait(false);

responses.Add(response);

if (
waitForTasks
&& responses.Count > 0
&& (responses.Count % waitBatchSize == 0 || i + batchSize >= objectsList.Count)
)
{
for (var j = offset; j < responses.Count; j++)
{
var resp = responses[j];
if (string.IsNullOrEmpty(resp.EventID))
{
throw new AlgoliaException(
"Received unexpected response from the push endpoint, eventID must not be null or empty"
);
}

await RetryHelper.RetryUntil(
async () =>
{
try
{
return await GetEventAsync(
resp.RunID,
resp.EventID,
cancellationToken: cancellationToken
)
.ConfigureAwait(false);
}
catch (AlgoliaApiException ex) when (ex.HttpErrorCode == 404)
{
return await Task.FromResult<Algolia.Search.Models.Ingestion.Event>(null);
}
},
eventResponse => eventResponse != null,
maxRetries: 50,
ct: cancellationToken
)
.ConfigureAwait(false);
}
offset = responses.Count;
}
}

return responses;
}

/// <summary>
/// Synchronous version of ChunkedPushAsync
/// </summary>
public List<WatchResponse> ChunkedPush(
string indexName,
IEnumerable<object> objects,
Algolia.Search.Models.Ingestion.Action action,
bool waitForTasks = false,
int batchSize = 1000,
string referenceIndexName = null,
RequestOptions options = null,
CancellationToken cancellationToken = default
) =>
AsyncHelper.RunSync(() =>
ChunkedPushAsync(
indexName,
objects,
action,
waitForTasks,
batchSize,
referenceIndexName,
options,
cancellationToken
)
);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Algolia.Search.Exceptions;

namespace Algolia.Search.Utils;

/// <summary>
/// A helper class to retry operations
/// </summary>
public static class RetryHelper
{
/// <summary>
/// The default maximum number of retries
/// </summary>
public const int DefaultMaxRetries = 50;

/// <summary>
/// Retry the given function until the validation function returns true or the maximum number of retries is reached
/// </summary>
/// <typeparam name="T">The type of the function's return value</typeparam>
/// <param name="func">The function to retry</param>
/// <param name="validate">The validation function</param>
/// <param name="maxRetries">The maximum number of retries</param>
/// <param name="timeout">A function that takes the retry count and returns the timeout in milliseconds before the next retry</param>
/// <param name="ct">A cancellation token to cancel the operation</param>
/// <returns>The result of the function if the validation function returns true</returns>
/// <exception cref="AlgoliaException">Thrown if the maximum number of retries is reached</exception>
public static async Task<T> RetryUntil<T>(
Func<Task<T>> func,
Func<T, bool> validate,
int maxRetries = DefaultMaxRetries,
Func<int, int> timeout = null,
CancellationToken ct = default
)
{
timeout ??= NextDelay;

var retryCount = 0;
while (retryCount < maxRetries)
{
var resp = await func().ConfigureAwait(false);
if (validate(resp))
{
return resp;
}

await Task.Delay(timeout(retryCount), ct).ConfigureAwait(false);
retryCount++;
}

throw new AlgoliaException(
"The maximum number of retries exceeded. (" + (retryCount + 1) + "/" + maxRetries + ")"
);
}

private static int NextDelay(int retryCount)
{
return Math.Min(retryCount * 200, 5000);
}
}
Loading