Skip to content

Commit 042eef1

Browse files
Add caching to SchemaRegistryClient (Azure#23274)
* Add caching to SchemaRegistryClient * Add comments for warnings being disabled
1 parent 276b6f0 commit 042eef1

File tree

14 files changed

+452
-261
lines changed

14 files changed

+452
-261
lines changed

sdk/schemaregistry/Azure.Data.SchemaRegistry/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Release History
22

3-
## 1.0.0-beta.3 (Unreleased)
4-
3+
## 1.0.0-beta.3 (2021-08-16)
4+
- Added caching to `SchemaRegistryClient`
55

66
## 1.0.0-beta.2 (2020-09-22)
77
- Fixed schema encoding issue

sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Azure Schema Registry is a schema repository service hosted by Azure Event Hubs,
99
Install the Azure Schema Registry client library for .NET with [NuGet][nuget]:
1010

1111
```bash
12-
dotnet add package Azure.Data.SchemaRegistry --version 1.0.0-beta.1
12+
dotnet add package --prerelease Azure.Data.SchemaRegistry
1313
```
1414

1515
### Prerequisites
@@ -125,17 +125,17 @@ string schemaContent = @"
125125
]
126126
}";
127127

128-
Response<SchemaProperties> schemaProperties = client.GetSchemaId(groupName, schemaName, schemaType, schemaContent);
129-
string schemaId = schemaProperties.Value.Id;
128+
SchemaProperties schemaProperties = client.GetSchemaId(groupName, schemaName, schemaType, schemaContent);
129+
string schemaId = schemaProperties.Id;
130130
```
131131

132132
### Retrieve a schema
133133

134134
Retrieve a previously registered schema's content from the Azure Schema Registry.
135135

136136
```C# Snippet:SchemaRegistryRetrieveSchema
137-
Response<SchemaProperties> schemaProperties = client.GetSchema(schemaId);
138-
string schemaContent = schemaProperties.Value.Content;
137+
SchemaProperties schemaProperties = client.GetSchema(schemaId);
138+
string schemaContent = schemaProperties.Content;
139139
```
140140

141141
## Troubleshooting

sdk/schemaregistry/Azure.Data.SchemaRegistry/api/Azure.Data.SchemaRegistry.netstandard2.0.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ public partial class SchemaRegistryClient
1111
protected SchemaRegistryClient() { }
1212
public SchemaRegistryClient(string endpoint, Azure.Core.TokenCredential credential) { }
1313
public SchemaRegistryClient(string endpoint, Azure.Core.TokenCredential credential, Azure.Data.SchemaRegistry.SchemaRegistryClientOptions options) { }
14-
public virtual Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties> GetSchema(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
15-
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties>> GetSchemaAsync(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
16-
public virtual Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties> GetSchemaId(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
17-
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties>> GetSchemaIdAsync(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
14+
public virtual Azure.Data.SchemaRegistry.SchemaProperties GetSchema(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
15+
public virtual System.Threading.Tasks.ValueTask<Azure.Data.SchemaRegistry.SchemaProperties> GetSchemaAsync(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
16+
public virtual Azure.Data.SchemaRegistry.SchemaProperties GetSchemaId(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
17+
public virtual System.Threading.Tasks.ValueTask<Azure.Data.SchemaRegistry.SchemaProperties> GetSchemaIdAsync(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1818
public virtual Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties> RegisterSchema(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1919
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Data.SchemaRegistry.SchemaProperties>> RegisterSchemaAsync(string groupName, string schemaName, Azure.Data.SchemaRegistry.SerializationType serializationType, string schemaContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
2020
}

sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaRegistryClient.cs

Lines changed: 118 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
// Licensed under the MIT License.
33

44
using System;
5-
using System.Globalization;
6-
using System.Text.Encodings.Web;
7-
using System.Text.Json;
5+
using System.Collections.Generic;
86
using System.Threading;
97
using System.Threading.Tasks;
108
using Azure.Core;
119
using Azure.Core.Pipeline;
10+
using Azure.Data.SchemaRegistry.Models;
1211

1312
namespace Azure.Data.SchemaRegistry
1413
{
@@ -21,6 +20,9 @@ public class SchemaRegistryClient
2120
internal SchemaRestClient RestClient { get; }
2221
private const string CredentialScope = "https://eventhubs.azure.net/.default";
2322

23+
private readonly Dictionary<string, SchemaProperties> _schemaIdToPropertiesMap = new();
24+
private readonly Dictionary<(string, string, string, SerializationType), SchemaProperties> _contentToPropertiesMap = new();
25+
2426
/// <summary>
2527
/// Initializes a new instance of the <see cref="SchemaRegistryClient"/>.
2628
/// </summary>
@@ -68,22 +70,14 @@ internal SchemaRegistryClient(ClientDiagnostics clientDiagnostics, HttpPipeline
6870
/// <param name="schemaContent">The string representation of the schema's content.</param>
6971
/// <param name="cancellationToken">The cancellation token for the operation.</param>
7072
/// <returns>The properties of the schema.</returns>
71-
public virtual async Task<Response<SchemaProperties>> RegisterSchemaAsync(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default)
72-
{
73-
using DiagnosticScope scope = _clientDiagnostics.CreateScope(RegisterSchemaScopeName);
74-
scope.Start();
75-
try
76-
{
77-
var response = await RestClient.RegisterAsync(groupName, schemaName, serializationType, schemaContent, cancellationToken).ConfigureAwait(false);
78-
var properties = new SchemaProperties(schemaContent, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
79-
return Response.FromValue(properties, response);
80-
}
81-
catch (Exception e)
82-
{
83-
scope.Failed(e);
84-
throw;
85-
}
86-
}
73+
public virtual async Task<Response<SchemaProperties>> RegisterSchemaAsync(
74+
string groupName,
75+
string schemaName,
76+
SerializationType serializationType,
77+
string schemaContent,
78+
CancellationToken cancellationToken = default) =>
79+
await RegisterSchemaInternalAsync(groupName, schemaName, serializationType, schemaContent, true, cancellationToken)
80+
.ConfigureAwait(false);
8781

8882
/// <summary>
8983
/// Registers a schema with the SchemaRegistry service.
@@ -96,14 +90,41 @@ public virtual async Task<Response<SchemaProperties>> RegisterSchemaAsync(string
9690
/// <param name="schemaContent">The string representation of the schema's content.</param>
9791
/// <param name="cancellationToken">The cancellation token for the operation.</param>
9892
/// <returns>The properties of the schema.</returns>
99-
public virtual Response<SchemaProperties> RegisterSchema(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default)
93+
public virtual Response<SchemaProperties> RegisterSchema(
94+
string groupName,
95+
string schemaName,
96+
SerializationType serializationType,
97+
string schemaContent,
98+
CancellationToken cancellationToken = default) =>
99+
RegisterSchemaInternalAsync(groupName, schemaName, serializationType, schemaContent, false, cancellationToken)
100+
.EnsureCompleted();
101+
102+
private async Task<Response<SchemaProperties>> RegisterSchemaInternalAsync(
103+
string groupName,
104+
string schemaName,
105+
SerializationType serializationType,
106+
string schemaContent,
107+
bool async,
108+
CancellationToken cancellationToken = default)
100109
{
101110
using DiagnosticScope scope = _clientDiagnostics.CreateScope(RegisterSchemaScopeName);
102111
scope.Start();
103112
try
104113
{
105-
var response = RestClient.Register(groupName, schemaName, serializationType, schemaContent, cancellationToken);
114+
ResponseWithHeaders<SchemaId, SchemaRegisterHeaders> response;
115+
if (async)
116+
{
117+
response = await RestClient.RegisterAsync(groupName, schemaName, serializationType, schemaContent, cancellationToken).ConfigureAwait(false);
118+
}
119+
else
120+
{
121+
response = RestClient.Register(groupName, schemaName, serializationType, schemaContent, cancellationToken);
122+
}
123+
106124
var properties = new SchemaProperties(schemaContent, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
125+
_schemaIdToPropertiesMap[properties.Id] = properties;
126+
_contentToPropertiesMap[(groupName, schemaName, schemaContent, serializationType)] = properties;
127+
107128
return Response.FromValue(properties, response);
108129
}
109130
catch (Exception e)
@@ -122,22 +143,16 @@ public virtual Response<SchemaProperties> RegisterSchema(string groupName, strin
122143
/// <param name="schemaContent">The string representation of the schema's content.</param>
123144
/// <param name="cancellationToken">The cancellation token for the operation.</param>
124145
/// <returns>The properties of the schema, including the schema ID provided by the service.</returns>
125-
public virtual async Task<Response<SchemaProperties>> GetSchemaIdAsync(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default)
126-
{
127-
using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaIdScopeName);
128-
scope.Start();
129-
try
130-
{
131-
var response = await RestClient.QueryIdByContentAsync(groupName, schemaName, serializationType, schemaContent, cancellationToken).ConfigureAwait(false);
132-
var properties = new SchemaProperties(schemaContent, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
133-
return Response.FromValue(properties, response);
134-
}
135-
catch (Exception e)
136-
{
137-
scope.Failed(e);
138-
throw;
139-
}
140-
}
146+
#pragma warning disable AZC0015 // Unexpected client method return type.
147+
public virtual async ValueTask<SchemaProperties> GetSchemaIdAsync(
148+
string groupName,
149+
string schemaName,
150+
SerializationType serializationType,
151+
string schemaContent,
152+
CancellationToken cancellationToken = default) =>
153+
#pragma warning restore AZC0015 // Unexpected client method return type.
154+
await GetSchemaIdInternalAsync(groupName, schemaName, serializationType, schemaContent, true, cancellationToken)
155+
.ConfigureAwait(false);
141156

142157
/// <summary>
143158
/// Gets the schema ID associated with the schema from the SchemaRegistry service.
@@ -148,15 +163,50 @@ public virtual async Task<Response<SchemaProperties>> GetSchemaIdAsync(string gr
148163
/// <param name="schemaContent">The string representation of the schema's content.</param>
149164
/// <param name="cancellationToken">The cancellation token for the operation.</param>
150165
/// <returns>The properties of the schema, including the schema ID provided by the service.</returns>
151-
public virtual Response<SchemaProperties> GetSchemaId(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default)
166+
#pragma warning disable AZC0015 // Unexpected client method return type.
167+
public virtual SchemaProperties GetSchemaId(
168+
string groupName,
169+
string schemaName,
170+
SerializationType serializationType,
171+
string schemaContent,
172+
CancellationToken cancellationToken = default) =>
173+
#pragma warning restore AZC0015 // Unexpected client method return type.
174+
GetSchemaIdInternalAsync(groupName, schemaName, serializationType, schemaContent, false, cancellationToken).EnsureCompleted();
175+
176+
private async ValueTask<SchemaProperties> GetSchemaIdInternalAsync(
177+
string groupName,
178+
string schemaName,
179+
SerializationType serializationType,
180+
string schemaContent,
181+
bool async,
182+
CancellationToken cancellationToken)
152183
{
184+
if (_contentToPropertiesMap.TryGetValue(
185+
(groupName, schemaName, schemaContent, serializationType),
186+
out SchemaProperties schema))
187+
{
188+
return schema;
189+
}
190+
153191
using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaIdScopeName);
154192
scope.Start();
155193
try
156194
{
157-
var response = RestClient.QueryIdByContent(groupName, schemaName, serializationType, schemaContent, cancellationToken);
195+
ResponseWithHeaders<SchemaId, SchemaQueryIdByContentHeaders> response;
196+
if (async)
197+
{
198+
response = await RestClient.QueryIdByContentAsync(groupName, schemaName, serializationType, schemaContent, cancellationToken).ConfigureAwait(false);
199+
}
200+
else
201+
{
202+
response = RestClient.QueryIdByContent(groupName, schemaName, serializationType, schemaContent, cancellationToken);
203+
}
204+
158205
var properties = new SchemaProperties(schemaContent, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
159-
return Response.FromValue(properties, response);
206+
_schemaIdToPropertiesMap[properties.Id] = properties;
207+
_contentToPropertiesMap[(groupName, schemaName, schemaContent, serializationType)] = properties;
208+
209+
return properties;
160210
}
161211
catch (Exception e)
162212
{
@@ -171,38 +221,47 @@ public virtual Response<SchemaProperties> GetSchemaId(string groupName, string s
171221
/// <param name="schemaId">The schema ID of the the schema from the SchemaRegistry.</param>
172222
/// <param name="cancellationToken">The cancellation token for the operation.</param>
173223
/// <returns>The properties of the schema, including the schema content provided by the service.</returns>
174-
public virtual async Task<Response<SchemaProperties>> GetSchemaAsync(string schemaId, CancellationToken cancellationToken = default)
175-
{
176-
using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaScopeName);
177-
scope.Start();
178-
try
179-
{
180-
var response = await RestClient.GetByIdAsync(schemaId, cancellationToken).ConfigureAwait(false);
181-
var properties = new SchemaProperties(response.Value, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
182-
return Response.FromValue(properties, response);
183-
}
184-
catch (Exception e)
185-
{
186-
scope.Failed(e);
187-
throw;
188-
}
189-
}
224+
#pragma warning disable AZC0015 // Unexpected client method return type.
225+
public virtual async ValueTask<SchemaProperties> GetSchemaAsync(string schemaId, CancellationToken cancellationToken = default) =>
226+
#pragma warning restore AZC0015 // Unexpected client method return type.
227+
await GetSchemaInternalAsync(schemaId, true, cancellationToken).ConfigureAwait(false);
190228

191229
/// <summary>
192230
/// Gets the schema content associated with the schema ID from the SchemaRegistry service.
193231
/// </summary>
194232
/// <param name="schemaId">The schema ID of the the schema from the SchemaRegistry.</param>
195233
/// <param name="cancellationToken">The cancellation token for the operation.</param>
196234
/// <returns>The properties of the schema, including the schema content provided by the service.</returns>
197-
public virtual Response<SchemaProperties> GetSchema(string schemaId, CancellationToken cancellationToken = default)
235+
#pragma warning disable AZC0015 // Unexpected client method return type.
236+
public virtual SchemaProperties GetSchema(string schemaId, CancellationToken cancellationToken = default) =>
237+
#pragma warning restore AZC0015 // Unexpected client method return type.
238+
GetSchemaInternalAsync(schemaId, false, cancellationToken).EnsureCompleted();
239+
240+
private async ValueTask<SchemaProperties> GetSchemaInternalAsync(string schemaId, bool async, CancellationToken cancellationToken)
198241
{
242+
if (_schemaIdToPropertiesMap.TryGetValue(schemaId, out SchemaProperties schema))
243+
{
244+
return schema;
245+
}
246+
199247
using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaScopeName);
200248
scope.Start();
201249
try
202250
{
203-
var response = RestClient.GetById(schemaId, cancellationToken);
251+
ResponseWithHeaders<string, SchemaGetByIdHeaders> response;
252+
if (async)
253+
{
254+
response = await RestClient.GetByIdAsync(schemaId, cancellationToken).ConfigureAwait(false);
255+
}
256+
else
257+
{
258+
response = RestClient.GetById(schemaId, cancellationToken);
259+
}
260+
204261
var properties = new SchemaProperties(response.Value, response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion);
205-
return Response.FromValue(properties, response);
262+
_schemaIdToPropertiesMap[schemaId] = properties;
263+
264+
return properties;
206265
}
207266
catch (Exception e)
208267
{

sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ public void RetrieveSchemaId()
7979
]
8080
}";
8181

82-
Response<SchemaProperties> schemaProperties = client.GetSchemaId(groupName, schemaName, schemaType, schemaContent);
83-
string schemaId = schemaProperties.Value.Id;
82+
SchemaProperties schemaProperties = client.GetSchemaId(groupName, schemaName, schemaType, schemaContent);
83+
string schemaId = schemaProperties.Id;
8484
#endregion
8585

8686
Assert.AreEqual(_schemaProperties.Id, schemaId);
@@ -93,8 +93,8 @@ public void RetrieveSchema()
9393
var schemaId = _schemaProperties.Id;
9494

9595
#region Snippet:SchemaRegistryRetrieveSchema
96-
Response<SchemaProperties> schemaProperties = client.GetSchema(schemaId);
97-
string schemaContent = schemaProperties.Value.Content;
96+
SchemaProperties schemaProperties = client.GetSchema(schemaId);
97+
string schemaContent = schemaProperties.Content;
9898
#endregion
9999

100100
Assert.AreEqual(Regex.Replace(_schemaProperties.Content, @"\s+", string.Empty), schemaContent);

0 commit comments

Comments
 (0)