Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 85 additions & 5 deletions src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB;
Expand Down Expand Up @@ -67,6 +69,10 @@ await database.CreateCollectionAsync(options.PublishedCollection, cancellationTo
await database.CreateCollectionAsync(options.LockCollection, cancellationToken: cancellationToken)
.ConfigureAwait(false);

await Task.WhenAll(
DropReceivedMessageDeprecatedIndexesAsync(),
DropPublishedMessageDeprecatedIndexesAsync()).ConfigureAwait(false);

await Task.WhenAll(
CreateReceivedMessageIndexesAsync(),
CreatePublishedMessageIndexesAsync()).ConfigureAwait(false);
Expand Down Expand Up @@ -98,13 +104,12 @@ async Task CreateReceivedMessageIndexesAsync()
new(builder.Ascending(x => x.Name)),
new(builder.Ascending(x => x.Added)),
new(builder.Ascending(x => x.ExpiresAt)),
new(builder.Ascending(x => x.StatusName)),
new(builder.Ascending(x => x.Retries)),
new(builder.Ascending(x => x.Version)),
new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt))
};

await col.Indexes.CreateManyAsync(indexes, cancellationToken);
await CreateIndexesIfKeyPatternMissingAsync(col, indexes);
}

async Task CreatePublishedMessageIndexesAsync()
Expand All @@ -117,13 +122,88 @@ async Task CreatePublishedMessageIndexesAsync()
new(builder.Ascending(x => x.Name)),
new(builder.Ascending(x => x.Added)),
new(builder.Ascending(x => x.ExpiresAt)),
new(builder.Ascending(x => x.StatusName)),
new(builder.Ascending(x => x.Retries)),
new(builder.Ascending(x => x.Version)),
new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt))
};

await CreateIndexesIfKeyPatternMissingAsync(col, indexes);
}

/// <summary>
/// It creates indexes if an index with the same key pattern does not already exist.
/// This ensures backward compatibility with previous versions that created indexes with different names but the same key pattern.
/// </summary>
async Task CreateIndexesIfKeyPatternMissingAsync<T>(
IMongoCollection<T> collection,
IEnumerable<CreateIndexModel<T>> desiredIndexes)
{
var indexList = await collection.Indexes.ListAsync(cancellationToken);
var indexes = await indexList.ToListAsync(cancellationToken);

var existingIndexes = new HashSet<string>(
indexes.Select(i => GetIndexKeyPattern(i["key"].AsBsonDocument))
Copy link
Preview

Copilot AI Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accessing i["key"] without checking if the key exists could throw a KeyNotFoundException if an index document doesn't have a 'key' field. Add a null check or use TryGetValue to handle malformed index documents safely.

Suggested change
indexes.Select(i => GetIndexKeyPattern(i["key"].AsBsonDocument))
indexes
.Select(i => i.TryGetValue("key", out var keyValue) && keyValue != null
? GetIndexKeyPattern(keyValue.AsBsonDocument)
: null)
.Where(pattern => pattern != null)

Copilot uses AI. Check for mistakes.

);

foreach (var desiredIndex in desiredIndexes)
{
var indexBsonDocument = desiredIndex.Keys.Render(new RenderArgs<T>(
collection.DocumentSerializer,
collection.Settings.SerializerRegistry));
var desiredPattern = GetIndexKeyPattern(indexBsonDocument);

if (!existingIndexes.Contains(desiredPattern))
{
await collection.Indexes.CreateOneAsync(desiredIndex, cancellationToken: cancellationToken);
}
}
}

static string GetIndexKeyPattern(BsonDocument keyDoc)
{
return string.Join(",", keyDoc.Elements.Select(e => $"{e.Name}:{e.Value}"));
}

async Task DropReceivedMessageDeprecatedIndexesAsync()
{
var obsoleteIndexes = new HashSet<string> { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" };

var col = database.GetCollection<ReceivedMessage>(options.ReceivedCollection);

await DropIndexesAsync(col, obsoleteIndexes);
}

async Task DropPublishedMessageDeprecatedIndexesAsync()
{
var obsoleteIndexes = new HashSet<string> { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" };

var col = database.GetCollection<PublishedMessage>(options.PublishedCollection);

await DropIndexesAsync(col, obsoleteIndexes);

}

async Task DropIndexesAsync<T>(IMongoCollection<T> col, ISet<string> obsoleteIndexes)
{
using var cursor = await col.Indexes.ListAsync(cancellationToken);
var indexList = await cursor.ToListAsync(cancellationToken);

await col.Indexes.CreateManyAsync(indexes, cancellationToken);
foreach (var index in indexList)
{
var indexName = index["name"].AsString;
if (!obsoleteIndexes.Contains(indexName)) continue;

try
{
await col.Indexes.DropOneAsync(indexName, cancellationToken);
}
catch (MongoCommandException ex) when (ex.CodeName == "IndexNotFound")
{
// Index already dropped or not found, ignore
}
}
}
}


}