Skip to content

Commit

Permalink
Merge pull request #113 from CSCfi/devel
Browse files: browse the repository at this point in the history
Process publications in chunks
  • Loading branch information
sarkikos authored Jan 12, 2024
2 parents eee97c4 + c9f5786 commit 356fea1
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 23 deletions.
23 changes: 17 additions & 6 deletions aspnetcore/src/ElasticService/ElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ public async Task IndexAsync(string indexName, List<object> entities, Type model
// Add entities to the new index.
await IndexEntities(indexToCreate, entities, modelType);

// Switch indexes
await SwitchIndexes(indexName, indexToCreate, indexToDelete);

_logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName);
}

/// <summary>
/// Indexes one chunk of entities into the given index.
/// This method only delegates to <c>IndexEntities</c>; it does not itself
/// create the index or switch indexes.
/// </summary>
/// <param name="indexToCreate">Name of the index the entities are written to.</param>
/// <param name="entities">Entities belonging to this chunk.</param>
/// <param name="modelType">Model type of the entities; passed through to <c>IndexEntities</c>.</param>
/// <returns>A task that completes when the chunk has been handed through the indexing routine.</returns>
public async Task IndexChunkAsync(string indexToCreate, List<object> entities, Type modelType)
    => await IndexEntities(indexToCreate, entities, modelType);

public async Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete)
{
// Wait for new index to be operational.
await _elasticClient.Cluster
.HealthAsync(selector: s => s
Expand All @@ -47,12 +61,9 @@ await _elasticClient.Indices.BulkAliasAsync(r => r

// Delete the old index if it exists.
await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));

_logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName);

}

private async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
public async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName)
{
var indexNameV1 = $"{indexName}_v1";
var indexNameV2 = $"{indexName}_v2";
Expand Down Expand Up @@ -99,7 +110,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException);
}
indexedCount = indexedCount + batchToIndex.Count;
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count);
_logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName);
}
}

Expand All @@ -111,7 +122,7 @@ private async Task IndexEntities<T>(string indexName, List<T> entities, Type mod
/// <param name="type"></param>
/// <returns></returns>
/// <exception cref="InvalidOperationException"></exception>
private async Task CreateIndex(string indexName, Type type)
public async Task CreateIndex(string indexName, Type type)
{
await _elasticClient.Indices.DeleteAsync(indexName,
d => d.RequestConfiguration(x => x.AllowedStatusCodes(404)));
Expand Down
38 changes: 37 additions & 1 deletion aspnetcore/src/ElasticService/IElasticSearchIndexService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,48 @@
public interface IElasticSearchIndexService
{
/// <summary>
/// Creates a new index with the given name and indexes the given entities.
/// The visible implementation then switches over to the new index.
/// </summary>
/// <param name="indexName">Name of the index to (re)build.</param>
/// <param name="entities">Entities to index.</param>
/// <param name="modelType">Model type of the entities; its name is used in log messages.</param>
/// <returns>A task that completes when indexing and the index switch are done.</returns>
Task IndexAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Index given entities into the named index.
/// Unlike <see cref="IndexAsync"/>, the visible implementation only adds the
/// entities; it does not switch indexes afterwards.
/// </summary>
/// <param name="indexName">
/// Name of the index the entities are written to. The caller in this change
/// passes the "index to create" name returned by <see cref="GetIndexNames"/>.
/// </param>
/// <param name="entities">Entities belonging to this chunk.</param>
/// <param name="modelType">Model type of the entities.</param>
/// <returns>A task that completes when the chunk has been indexed.</returns>
Task IndexChunkAsync(string indexName, List<object> entities, Type modelType);

/// <summary>
/// Get name of index to create and name of index to delete.
/// </summary>
/// <param name="indexName">
/// Base index name. The visible implementation derives <c>{indexName}_v1</c> and
/// <c>{indexName}_v2</c> from it; how it chooses between them is not shown here.
/// </param>
/// <returns>A tuple with the name of the index to create and the name of the index to delete.</returns>
Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName);

/// <summary>
/// Creates index with the given name.
/// If the index exists already, it will be deleted first.
/// </summary>
/// <param name="indexName">Name of the index to create.</param>
/// <param name="type">Model type for the index. NOTE(review): presumably used to build the index mapping; confirm against the implementation.</param>
/// <returns>A task that completes when the index has been created.</returns>
/// <exception cref="InvalidOperationException"></exception>
Task CreateIndex(string indexName, Type type);

/// <summary>
/// Switch over to the newly built index.
/// The visible implementation waits for the new index to be operational,
/// updates aliases in bulk, and deletes the old index if it exists.
/// </summary>
/// <param name="indexName">Base index name. NOTE(review): presumably the alias that is repointed; confirm against the implementation.</param>
/// <param name="indexToCreate">Name of the new index that was just populated.</param>
/// <param name="indexToDelete">Name of the old index to delete; a missing index (404) is tolerated.</param>
/// <returns>A task that completes when the switch is done.</returns>
Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete);
}
44 changes: 28 additions & 16 deletions aspnetcore/src/Indexer/Indexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ private async Task IndexEntities(string indexName,
List<object> finalized = new();

if (indexName.Contains("publication")) {

// Create new index
var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName);
await _indexService.CreateIndex(indexToCreate, type);

/*
* Process database result in smaller chunks to keep memory requirement low.
* Chunking is based on skip/take query.
Expand All @@ -143,8 +148,9 @@ private async Task IndexEntities(string indexName,
*/

int skipAmount = 0;
int takeAmount = 200000;
int takeAmount = 50000;
int numOfResults = 0;
int processedCount = 0;

do
{
Expand All @@ -155,14 +161,19 @@ private async Task IndexEntities(string indexName,

if (numOfResults > 0)
{
_logger.LogInformation("{EntityType}: In-memory operations start", type.Name);
foreach (object entity in indexModels) {
finalized.Add(repository.PerformInMemoryOperation(entity));
}
_logger.LogInformation("{EntityType}: In-memory operations complete", type.Name);
await _indexService.IndexChunkAsync(indexToCreate, finalized, type);
}
skipAmount = skipAmount + takeAmount;
processedCount = processedCount + numOfResults;
finalized = new();
_logger.LogInformation("{EntityType}: Total documents indexed = {processedCount}", type.Name, processedCount);
} while(numOfResults >= takeAmount-1);

// Activate new index and delete old
await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete);
}
else
{
Expand All @@ -180,20 +191,21 @@ private async Task IndexEntities(string indexName,
_logger.LogInformation("{EntityType}: Start in-memory operations", type.Name);
finalized = repository.PerformInMemoryOperations(indexModels);
}
}
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
var inMemoryElapsed = stopWatch.Elapsed;

if (finalized.Count > 0)
{
_logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed);
await _indexService.IndexAsync(indexName, finalized, type);
var indexingElapsed = stopWatch.Elapsed;
_logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed);
_logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed);
}
else
{
_logger.LogInformation("{EntityType}: Nothing to index", type.Name);
}
}
}
catch (Exception ex)
Expand Down

0 comments on commit 356fea1

Please sign in to comment.