From e561d39e6fe8e8ed3361b7ac74a46348dbd4e1d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Wed, 10 Jan 2024 10:24:44 +0200 Subject: [PATCH 1/4] TTV model update dim_publication.publication_status_code --- aspnetcore/src/DatabaseContext/ApiDbContext.cs | 10 +++++++--- .../DatabaseContext/Entities/DimPublication.cs | 3 ++- .../DatabaseContext/Entities/DimReferencedatum.cs | 2 ++ .../src/Repositories/Maps/PublicationProfile.cs | 2 +- .../Indexer.Tests/Maps/PublicationProfileTest.cs | 15 ++++++++++++--- 5 files changed, 24 insertions(+), 8 deletions(-) diff --git a/aspnetcore/src/DatabaseContext/ApiDbContext.cs b/aspnetcore/src/DatabaseContext/ApiDbContext.cs index bb30d61..9121c43 100644 --- a/aspnetcore/src/DatabaseContext/ApiDbContext.cs +++ b/aspnetcore/src/DatabaseContext/ApiDbContext.cs @@ -2907,9 +2907,7 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) .HasMaxLength(255) .HasColumnName("publication_org_id"); - entity.Property(e => e.PublicationStatusCode) - .HasMaxLength(255) - .HasColumnName("publication_status_code"); + entity.Property(e => e.PublicationStatusCode).HasColumnName("publication_status_code"); entity.Property(e => e.PublicationTypeCode).HasColumnName("publication_type_code"); @@ -2997,6 +2995,12 @@ protected override void OnModelCreating(ModelBuilder modelBuilder) .OnDelete(DeleteBehavior.ClientSetNull) .HasConstraintName("publication_country_code"); + entity.HasOne(d => d.PublicationStatusCodeNavigation) + .WithMany(p => p.DimPublicationPublicationStatusCodeNavigations) + .HasForeignKey(d => d.PublicationStatusCode) + .OnDelete(DeleteBehavior.ClientSetNull) + .HasConstraintName("publication_status_code"); + entity.HasOne(d => d.PublicationTypeCodeNavigation) .WithMany(p => p.DimPublicationPublicationTypeCodeNavigations) .HasForeignKey(d => d.PublicationTypeCode) diff --git a/aspnetcore/src/DatabaseContext/Entities/DimPublication.cs b/aspnetcore/src/DatabaseContext/Entities/DimPublication.cs index 2c9a5e9..44f218e 100644 --- a/aspnetcore/src/DatabaseContext/Entities/DimPublication.cs +++ b/aspnetcore/src/DatabaseContext/Entities/DimPublication.cs @@ -20,7 +20,7 @@ public DimPublication() public int Id { get; set; } public int? ReportingYear { get; set; } public string PublicationId { get; set; } = null!; - public string? PublicationStatusCode { get; set; } + public int PublicationStatusCode { get; set; } public string PublicationOrgId { get; set; } = null!; public string PublicationName { get; set; } = null!; public string AuthorsText { get; set; } = null!; @@ -84,6 +84,7 @@ public DimPublication() public virtual DimReferencedatum LicenseCodeNavigation { get; set; } = null!; public virtual DimReferencedatum? ParentPublicationTypeCodeNavigation { get; set; } public virtual DimReferencedatum PublicationCountryCodeNavigation { get; set; } = null!; + public virtual DimReferencedatum PublicationStatusCodeNavigation { get; set; } = null!; public virtual DimReferencedatum? PublicationTypeCode2Navigation { get; set; } public virtual DimReferencedatum PublicationTypeCodeNavigation { get; set; } = null!; public virtual DimReferencedatum PublisherOpenAccessCodeNavigation { get; set; } = null!; diff --git a/aspnetcore/src/DatabaseContext/Entities/DimReferencedatum.cs b/aspnetcore/src/DatabaseContext/Entities/DimReferencedatum.cs index 72de7c2..6b1b90e 100644 --- a/aspnetcore/src/DatabaseContext/Entities/DimReferencedatum.cs +++ b/aspnetcore/src/DatabaseContext/Entities/DimReferencedatum.cs @@ -28,6 +28,7 @@ public DimReferencedatum() DimPublicationLicenseCodeNavigations = new HashSet(); DimPublicationParentPublicationTypeCodeNavigations = new HashSet(); DimPublicationPublicationCountryCodeNavigations = new HashSet(); + DimPublicationPublicationStatusCodeNavigations = new HashSet(); DimPublicationPublicationTypeCode2Navigations = new HashSet(); DimPublicationPublicationTypeCodeNavigations = new HashSet(); DimPublicationPublisherOpenAccessCodeNavigations = new HashSet(); @@ -83,6 +84,7 @@ public DimReferencedatum() public virtual ICollection DimPublicationLicenseCodeNavigations { get; set; } public virtual ICollection DimPublicationParentPublicationTypeCodeNavigations { get; set; } public virtual ICollection DimPublicationPublicationCountryCodeNavigations { get; set; } + public virtual ICollection DimPublicationPublicationStatusCodeNavigations { get; set; } public virtual ICollection DimPublicationPublicationTypeCode2Navigations { get; set; } public virtual ICollection DimPublicationPublicationTypeCodeNavigations { get; set; } public virtual ICollection DimPublicationPublisherOpenAccessCodeNavigations { get; set; } diff --git a/aspnetcore/src/Repositories/Maps/PublicationProfile.cs b/aspnetcore/src/Repositories/Maps/PublicationProfile.cs index 63117ed..f98c283 100644 --- a/aspnetcore/src/Repositories/Maps/PublicationProfile.cs +++ b/aspnetcore/src/Repositories/Maps/PublicationProfile.cs @@ -63,7 +63,7 @@ public PublicationProfile() .ForMember(dst => dst.BusinessCollaboration, opt => opt.MapFrom(src => src.BusinessCollaboration)) .ForMember(dst => dst.ApcFeeEur, opt => opt.MapFrom(src => src.ApcFeeEur)) .ForMember(dst => dst.ArticleType, opt => opt.MapFrom(src => src.ArticleTypeCodeNavigation)) - .ForMember(dst => dst.Status, opt => opt.MapFrom(src => src.PublicationStatusCode)) + .ForMember(dst => dst.Status, opt => opt.MapFrom(src => src.PublicationStatusCodeNavigation)) .ForMember(dst => dst.License, opt => opt.MapFrom(src => src.LicenseCodeNavigation.Id != -1 ? src.LicenseCodeNavigation : null)) .ForMember(dst => dst.Preprint, opt => opt.MapFrom(src => src.DimLocallyReportedPubInfos.Where(i => i.SelfArchivedType == PreprintType))) .ForMember(dst => dst.SelfArchived, opt => opt.MapFrom(src => src.DimLocallyReportedPubInfos.Where(i => i.SelfArchivedType == SelfArchivedType))) diff --git a/aspnetcore/test/Indexer.Tests/Maps/PublicationProfileTest.cs b/aspnetcore/test/Indexer.Tests/Maps/PublicationProfileTest.cs index aa931d3..12cc341 100644 --- a/aspnetcore/test/Indexer.Tests/Maps/PublicationProfileTest.cs +++ b/aspnetcore/test/Indexer.Tests/Maps/PublicationProfileTest.cs @@ -215,7 +215,13 @@ private static DimPublication GetEntity() NameFi = "articleTypeCodeNameFi", NameSv = "articleTypeCodeNameSv", }, - PublicationStatusCode = "publicationStatusCode", + PublicationStatusCodeNavigation = new DimReferencedatum + { + CodeValue = "publicationStatusCode", + NameEn = "publicationStatusCodeNameEn", + NameFi = "publicationStatusCodeNameFi", + NameSv = "publicationStatusCodeNameSv", + }, LicenseCodeNavigation = new DimReferencedatum { CodeValue = "1337", @@ -419,11 +425,14 @@ private Publication GetModel() Code = "articleTypeCode", NameEn = "articleTypeCodeNameEn", NameFi = "articleTypeCodeNameFi", - NameSv = "articleTypeCodeNameSv", + NameSv = "articleTypeCodeNameSv" }, Status = new ReferenceData { - Code = "publicationStatusCode" + Code = "publicationStatusCode", + NameEn = "publicationStatusCodeNameEn", + NameFi = "publicationStatusCodeNameFi", + NameSv = "publicationStatusCodeNameSv" }, License = new ReferenceData() { From 041fc8c2228e779d07f51ac6b396b808d122e54d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 12:38:25 +0200 Subject: [PATCH 2/4] Process publications in chunks including Elasticsearch indexing --- .../ElasticSearchIndexService.cs | 31 ++++++++++++-- .../IElasticSearchIndexService.cs | 38 ++++++++++++++++- aspnetcore/src/Indexer/Indexer.cs | 42 ++++++++++++------- 3 files changed, 91 insertions(+), 20 deletions(-) diff --git a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs index 0ad3f7c..38f68c5 100644 --- a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs @@ -49,10 +49,35 @@ await _elasticClient.Indices.BulkAliasAsync(r => r await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); _logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName); + } + + public async Task IndexChunkAsync(string indexToCreate, List entities, Type modelType) + { + // Add entities to the index. + await IndexEntities(indexToCreate, entities, modelType); + } + public async Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete) + { + // Wait for new index to be operational. + await _elasticClient.Cluster + .HealthAsync(selector: s => s + .WaitForStatus(Elasticsearch.Net.WaitForStatus.Yellow) + .WaitForActiveShards("1") + .Index(indexToCreate)); + + // Add new alias from index_new to index + await _elasticClient.Indices.BulkAliasAsync(r => r + // Remove alias "index_old => index" + .Remove(remove => remove.Alias(indexName).Index("*")) + // Add alias "index_new => index" + .Add(add => add.Alias(indexName).Index(indexToCreate))); + + // Delete the old index if it exists. + await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); } - private async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName) + public async Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName) { var indexNameV1 = $"{indexName}_v1"; var indexNameV2 = $"{indexName}_v2"; @@ -99,7 +124,7 @@ private async Task IndexEntities(string indexName, List entities, Type mod throw new InvalidOperationException($"Indexing documents to {indexName} failed.", indexBatchResponse.OriginalException); } indexedCount = indexedCount + batchToIndex.Count; - _logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}. Progress {IndexedCount}/{TotalCount}", modelType.Name, batchToIndex.Count, indexName, indexedCount, entities.Count); + _logger.LogInformation("{EntityType}: Indexed {BatchSize} documents to {IndexName}", modelType.Name, batchToIndex.Count, indexName); } } @@ -111,7 +136,7 @@ private async Task IndexEntities(string indexName, List entities, Type mod /// /// /// - private async Task CreateIndex(string indexName, Type type) + public async Task CreateIndex(string indexName, Type type) { await _elasticClient.Indices.DeleteAsync(indexName, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); diff --git a/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs index e2241fb..960b848 100644 --- a/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/IElasticSearchIndexService.cs @@ -6,7 +6,7 @@ public interface IElasticSearchIndexService { /// - /// Creates a new index with the given name and indexses the given entities. + /// Creates a new index with the given name and indexes the given entities. /// /// /// @@ -14,4 +14,40 @@ public interface IElasticSearchIndexService /// /// Task IndexAsync(string indexName, List entities, Type modelType); + + /// + /// Index given entities. + /// + /// + /// + /// + /// + /// + Task IndexChunkAsync(string indexName, List entities, Type modelType); + + /// + /// Get name of index to create and name of index to delete. + /// + /// + /// + Task<(string indexToCreate, string indexToDelete)> GetIndexNames(string indexName); + + /// + /// Creates index with the given name. + /// If the index exists already, it will be deleted first. + /// + /// + /// + /// + /// + Task CreateIndex(string indexName, Type type); + + /// + /// Switch index + /// + /// + /// + /// + /// + Task SwitchIndexes(string indexName, string indexToCreate, string indexToDelete); } \ No newline at end of file diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index d8bcecd..f82b7a1 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -135,6 +135,10 @@ private async Task IndexEntities(string indexName, List finalized = new(); if (indexName.Contains("publication")) { + + var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName); + await _indexService.CreateIndex(indexToCreate, type); + /* * Process database result in smaller chunks to keep memory requirement low. * Chunking is based on skip/take query. @@ -143,8 +147,9 @@ private async Task IndexEntities(string indexName, */ int skipAmount = 0; - int takeAmount = 200000; + int takeAmount = 2; int numOfResults = 0; + int processedCount = 0; do { @@ -155,14 +160,18 @@ private async Task IndexEntities(string indexName, if (numOfResults > 0) { - _logger.LogInformation("{EntityType}: In-memory operations start", type.Name); foreach (object entity in indexModels) { finalized.Add(repository.PerformInMemoryOperation(entity)); } - _logger.LogInformation("{EntityType}: In-memory operations complete", type.Name); + await _indexService.IndexChunkAsync(indexToCreate, finalized, type); } skipAmount = skipAmount + takeAmount; + finalized = new(); + processedCount = processedCount + numOfResults; + _logger.LogInformation("{EntityType}: Total indexed count = {processedCount}", type.Name, processedCount); } while(numOfResults >= takeAmount-1); + + await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete); } else { @@ -180,20 +189,21 @@ private async Task IndexEntities(string indexName, _logger.LogInformation("{EntityType}: Start in-memory operations", type.Name); finalized = repository.PerformInMemoryOperations(indexModels); } - } - var inMemoryElapsed = stopWatch.Elapsed; - if (finalized.Count > 0) - { - _logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed); - await _indexService.IndexAsync(indexName, finalized, type); - var indexingElapsed = stopWatch.Elapsed; - _logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed); - _logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed); - } - else - { - _logger.LogInformation("{EntityType}: Nothing to index", type.Name); + var inMemoryElapsed = stopWatch.Elapsed; + + if (finalized.Count > 0) + { + _logger.LogInformation("{EntityType}: Retrieved and performed in-memory operations to {FinalizedCount} entities in {Elapsed}. Start indexing.", type.Name, finalized.Count, inMemoryElapsed); + await _indexService.IndexAsync(indexName, finalized, type); + var indexingElapsed = stopWatch.Elapsed; + _logger.LogInformation("{EntityType}: Indexed total of {IndexCount} documents in {ElapsedIndexing}...", type.Name, finalized.Count, indexingElapsed - inMemoryElapsed); + _logger.LogInformation("{EntityType}: Index '{IndexName}' recreated successfully in {ElapsedTotal}", type.Name, indexName, stopWatch.Elapsed); + } + else + { + _logger.LogInformation("{EntityType}: Nothing to index", type.Name); + } } } catch (Exception ex) From b3f9b2ef0240023380a9818126c5b01d53a82e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 12:42:45 +0200 Subject: [PATCH 3/4] Set chunk size to 50000 --- aspnetcore/src/Indexer/Indexer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index f82b7a1..92f2d4d 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -147,7 +147,7 @@ private async Task IndexEntities(string indexName, */ int skipAmount = 0; - int takeAmount = 2; + int takeAmount = 50000; int numOfResults = 0; int processedCount = 0; From 6b5533dd28bbd2804ad1179d099f690f0e4ee9ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Sa=CC=88rkikoski?= Date: Fri, 12 Jan 2024 14:23:57 +0200 Subject: [PATCH 4/4] Reuse same function when activating new index --- .../ElasticSearchIndexService.cs | 18 ++---------------- aspnetcore/src/Indexer/Indexer.cs | 6 ++++-- 2 files changed, 6 insertions(+), 18 deletions(-) diff --git a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs index 38f68c5..7e4b48e 100644 --- a/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs +++ b/aspnetcore/src/ElasticService/ElasticSearchIndexService.cs @@ -31,22 +31,8 @@ public async Task IndexAsync(string indexName, List entities, Type model // Add entities to the new index. await IndexEntities(indexToCreate, entities, modelType); - // Wait for new index to be operational. - await _elasticClient.Cluster - .HealthAsync(selector: s => s - .WaitForStatus(Elasticsearch.Net.WaitForStatus.Yellow) - .WaitForActiveShards("1") - .Index(indexToCreate)); - - // Add new alias from index_new to index - await _elasticClient.Indices.BulkAliasAsync(r => r - // Remove alias "index_old => index" - .Remove(remove => remove.Alias(indexName).Index("*")) - // Add alias "index_new => index" - .Add(add => add.Alias(indexName).Index(indexToCreate))); - - // Delete the old index if it exists. - await _elasticClient.Indices.DeleteAsync(indexToDelete, d => d.RequestConfiguration(x => x.AllowedStatusCodes(404))); + // Switch indexes + await SwitchIndexes(indexName, indexToCreate, indexToDelete); _logger.LogDebug("{EntityType}: Indexing to {IndexName} complete", modelType.Name, indexName); } diff --git a/aspnetcore/src/Indexer/Indexer.cs b/aspnetcore/src/Indexer/Indexer.cs index 92f2d4d..bd7b21a 100644 --- a/aspnetcore/src/Indexer/Indexer.cs +++ b/aspnetcore/src/Indexer/Indexer.cs @@ -136,6 +136,7 @@ private async Task IndexEntities(string indexName, if (indexName.Contains("publication")) { + // Create new index var (indexToCreate, indexToDelete) = await _indexService.GetIndexNames(indexName); await _indexService.CreateIndex(indexToCreate, type); @@ -166,11 +167,12 @@ private async Task IndexEntities(string indexName, await _indexService.IndexChunkAsync(indexToCreate, finalized, type); } skipAmount = skipAmount + takeAmount; - finalized = new(); processedCount = processedCount + numOfResults; - _logger.LogInformation("{EntityType}: Total indexed count = {processedCount}", type.Name, processedCount); + finalized = new(); + _logger.LogInformation("{EntityType}: Total documents indexed = {processedCount}", type.Name, processedCount); } while(numOfResults >= takeAmount-1); + // Activate new index and delete old await _indexService.SwitchIndexes(indexName, indexToCreate, indexToDelete); } else