Skip to content

Commit 2597c22

Browse files
authored
[Event Hubs] Checkpoint Migration Sample Tweak (Azure#29129)
The focus of these changes is to tweak the checkpoint migration sample to account for the potential that a blob exists for ownership purposes but does not have valid checkpoint data. Previously, the sample would emit a T2 checkpoint for these will potentially invalid data.
1 parent 6f154d9 commit 2597c22

File tree

2 files changed

+32
-8
lines changed

2 files changed

+32
-8
lines changed

sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Snippets/MigrationGuideSnippetsLiveTests.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,14 @@ public async Task MigrateCheckpoints()
100100
{
101101
var metadata = new Dictionary<string, string>()
102102
{
103-
{ offsetKey, checkpoint.Offset.ToString(CultureInfo.InvariantCulture) },
104-
{ sequenceKey, checkpoint.SequenceNumber.ToString(CultureInfo.InvariantCulture) }
103+
{ offsetKey, checkpoint.Offset.ToString(CultureInfo.InvariantCulture) }
105104
};
106105

106+
if (checkpoint.SequenceNumber.HasValue)
107+
{
108+
metadata[sequenceKey] = checkpoint.SequenceNumber.Value.ToString(CultureInfo.InvariantCulture);
109+
}
110+
107111
BlobClient blobClient = storageClient.GetBlobClient($"{ prefix }{ checkpoint.PartitionId }");
108112

109113
using var content = new MemoryStream(Array.Empty<byte>());
@@ -140,6 +144,14 @@ private IEnumerable<MigrationCheckpoint> ReadFakeLegacyCheckpoints(string fakeCo
140144
""Epoch"":1,
141145
""Offset"":""78"",
142146
""SequenceNumber"":39
147+
},
148+
{
149+
""PartitionId"":""3"",
150+
""Owner"":""eecd42df-a253-49d1-bb04-e5f00c106cfc"",
151+
""Token"":""6271aadb-801f-4ec7-a011-a008808a656c"",
152+
""Epoch"":1,
153+
""Offset"":""123"",
154+
""SequenceNumber"":null
143155
}]";
144156

145157
return JsonSerializer.Deserialize<MigrationCheckpoint[]>(checkpointJson);
@@ -172,7 +184,11 @@ private async Task<List<MigrationCheckpoint>> ReadLegacyCheckpoints(
172184
await (storageClient.GetBlobClient(blobItem.Name)).DownloadToAsync(blobContentStream);
173185

174186
var checkpoint = JsonSerializer.Deserialize<MigrationCheckpoint>(Encoding.UTF8.GetString(blobContentStream.ToArray()));
175-
checkpoints.Add(checkpoint);
187+
188+
if (!string.IsNullOrEmpty(checkpoint.Offset))
189+
{
190+
checkpoints.Add(checkpoint);
191+
}
176192
}
177193

178194
return checkpoints;
@@ -186,7 +202,7 @@ private class MigrationCheckpoint
186202
{
187203
public string PartitionId { get; set; }
188204
public string Offset { get; set; }
189-
public long SequenceNumber { get; set; }
205+
public long? SequenceNumber { get; set; }
190206
}
191207

192208
#endregion

sdk/eventhub/Azure.Messaging.EventHubs/MigrationGuide.md

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ private class MigrationCheckpoint
761761
{
762762
public string PartitionId { get; set; }
763763
public string Offset { get; set; }
764-
public long SequenceNumber { get; set; }
764+
public long? SequenceNumber { get; set; }
765765
}
766766
```
767767

@@ -818,10 +818,14 @@ foreach (var checkpoint in legacyCheckpoints)
818818
{
819819
var metadata = new Dictionary<string, string>()
820820
{
821-
{ offsetKey, checkpoint.Offset.ToString(CultureInfo.InvariantCulture) },
822-
{ sequenceKey, checkpoint.SequenceNumber.ToString(CultureInfo.InvariantCulture) }
821+
{ offsetKey, checkpoint.Offset.ToString(CultureInfo.InvariantCulture) }
823822
};
824823

824+
if (checkpoint.SequenceNumber.HasValue)
825+
{
826+
metadata[sequenceKey] = checkpoint.SequenceNumber.Value.ToString(CultureInfo.InvariantCulture);
827+
}
828+
825829
BlobClient blobClient = storageClient.GetBlobClient($"{ prefix }{ checkpoint.PartitionId }");
826830

827831
using var content = new MemoryStream(Array.Empty<byte>());
@@ -857,7 +861,11 @@ private async Task<List<MigrationCheckpoint>> ReadLegacyCheckpoints(
857861
await (storageClient.GetBlobClient(blobItem.Name)).DownloadToAsync(blobContentStream);
858862

859863
var checkpoint = JsonSerializer.Deserialize<MigrationCheckpoint>(Encoding.UTF8.GetString(blobContentStream.ToArray()));
860-
checkpoints.Add(checkpoint);
864+
865+
if (!string.IsNullOrEmpty(checkpoint.Offset))
866+
{
867+
checkpoints.Add(checkpoint);
868+
}
861869
}
862870

863871
return checkpoints;

0 commit comments

Comments
 (0)