diff --git a/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs index c6c92276..8784270c 100644 --- a/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs @@ -2,12 +2,14 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Persistence; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using MongoDB.Bson; using MongoDB.Driver; namespace DotNetCore.CAP.MongoDB; @@ -67,6 +69,10 @@ await database.CreateCollectionAsync(options.PublishedCollection, cancellationTo await database.CreateCollectionAsync(options.LockCollection, cancellationToken: cancellationToken) .ConfigureAwait(false); + await Task.WhenAll( + DropReceivedMessageDeprecatedIndexesAsync(), + DropPublishedMessageDeprecatedIndexesAsync()).ConfigureAwait(false); + await Task.WhenAll( CreateReceivedMessageIndexesAsync(), CreatePublishedMessageIndexesAsync()).ConfigureAwait(false); @@ -98,13 +104,12 @@ async Task CreateReceivedMessageIndexesAsync() new(builder.Ascending(x => x.Name)), new(builder.Ascending(x => x.Added)), new(builder.Ascending(x => x.ExpiresAt)), - new(builder.Ascending(x => x.StatusName)), new(builder.Ascending(x => x.Retries)), new(builder.Ascending(x => x.Version)), new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt)) }; - - await col.Indexes.CreateManyAsync(indexes, cancellationToken); + + await CreateIndexesIfKeyPatternMissingAsync(col, indexes); } async Task CreatePublishedMessageIndexesAsync() @@ -117,13 +122,88 @@ async Task CreatePublishedMessageIndexesAsync() new(builder.Ascending(x => x.Name)), new(builder.Ascending(x => x.Added)), new(builder.Ascending(x => x.ExpiresAt)), - new(builder.Ascending(x => x.StatusName)), new(builder.Ascending(x => x.Retries)), new(builder.Ascending(x => x.Version)), new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt)) }; + + await CreateIndexesIfKeyPatternMissingAsync(col, indexes); + } + + /// + /// It creates indexes if an index with the same key pattern does not already exist. + /// This ensures backward compatibility with previous versions that created indexes with different names but the same key pattern. + /// + async Task CreateIndexesIfKeyPatternMissingAsync( + IMongoCollection collection, + IEnumerable> desiredIndexes) + { + var indexList = await collection.Indexes.ListAsync(cancellationToken); + var indexes = await indexList.ToListAsync(cancellationToken); + + var existingIndexes = new HashSet( + indexes.Select(i => GetIndexKeyPattern(i["key"].AsBsonDocument)) + ); + + foreach (var desiredIndex in desiredIndexes) + { + var indexBsonDocument = desiredIndex.Keys.Render(new RenderArgs( + collection.DocumentSerializer, + collection.Settings.SerializerRegistry)); + var desiredPattern = GetIndexKeyPattern(indexBsonDocument); + + if (!existingIndexes.Contains(desiredPattern)) + { + await collection.Indexes.CreateOneAsync(desiredIndex, cancellationToken: cancellationToken); + } + } + } + + static string GetIndexKeyPattern(BsonDocument keyDoc) + { + return string.Join(",", keyDoc.Elements.Select(e => $"{e.Name}:{e.Value}")); + } + + async Task DropReceivedMessageDeprecatedIndexesAsync() + { + var obsoleteIndexes = new HashSet { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" }; + + var col = database.GetCollection(options.ReceivedCollection); + + await DropIndexesAsync(col, obsoleteIndexes); + } + + async Task DropPublishedMessageDeprecatedIndexesAsync() + { + var obsoleteIndexes = new HashSet { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" }; + + var col = database.GetCollection(options.PublishedCollection); + + await DropIndexesAsync(col, obsoleteIndexes); + + } + + async Task DropIndexesAsync(IMongoCollection col, ISet obsoleteIndexes) + { + using var cursor = await col.Indexes.ListAsync(cancellationToken); + var indexList = await cursor.ToListAsync(cancellationToken); - await col.Indexes.CreateManyAsync(indexes, cancellationToken); + foreach (var index in indexList) + { + var indexName = index["name"].AsString; + if (!obsoleteIndexes.Contains(indexName)) continue; + + try + { + await col.Indexes.DropOneAsync(indexName, cancellationToken); + } + catch (MongoCommandException ex) when (ex.CodeName == "IndexNotFound") + { + // Index already dropped or not found, ignore + } + } } } + + }