Skip to content

Commit ffd48d4

Browse files
authored
Remove SchemaRegistryClient caching (Azure#24380)
* Remove builder caching references. * Remove caching from SchemaRegistryAsyncClient. Make methods public for Response. * Remove cached tests. * Adding service annotation.
1 parent 1d78acd commit ffd48d4

File tree

6 files changed

+26
-204
lines changed

6 files changed

+26
-204
lines changed

sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClient.java

Lines changed: 24 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
import java.net.URI;
2121
import java.nio.charset.Charset;
2222
import java.nio.charset.StandardCharsets;
23-
import java.util.Map;
2423
import java.util.Objects;
25-
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.ConcurrentSkipListMap;
27-
import java.util.function.Function;
2824
import java.util.regex.Matcher;
2925
import java.util.regex.Pattern;
3026

@@ -53,20 +49,9 @@ public final class SchemaRegistryAsyncClient {
5349
private static final Pattern SCHEMA_PATTERN = Pattern.compile("/\\$schemagroups/(?<schemaGroup>.+)/schemas/(?<schemaName>.+?)/");
5450
private final ClientLogger logger = new ClientLogger(SchemaRegistryAsyncClient.class);
5551
private final AzureSchemaRegistry restService;
56-
private final Integer maxSchemaMapSize;
57-
private final ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap;
58-
private final Map<String, SchemaProperties> idCache;
59-
private final Map<String, SchemaProperties> schemaStringCache;
6052

61-
SchemaRegistryAsyncClient(
62-
AzureSchemaRegistry restService,
63-
int maxSchemaMapSize,
64-
ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap) {
53+
SchemaRegistryAsyncClient(AzureSchemaRegistry restService) {
6554
this.restService = restService;
66-
this.maxSchemaMapSize = maxSchemaMapSize;
67-
this.typeParserMap = typeParserMap;
68-
this.idCache = new ConcurrentHashMap<>();
69-
this.schemaStringCache = new ConcurrentHashMap<>();
7055
}
7156

7257
/**
@@ -119,11 +104,6 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
119104
name,
120105
content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
121106

122-
schemaStringCache.putIfAbsent(getSchemaStringCacheKey(groupName, name, content),
123-
registered);
124-
idCache.putIfAbsent(schemaId.getId(), registered);
125-
126-
logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name);
127107
SimpleResponse<SchemaProperties> schemaRegistryObjectSimpleResponse = new SimpleResponse<>(
128108
response.getRequest(), response.getStatusCode(),
129109
response.getHeaders(), registered);
@@ -140,10 +120,6 @@ Mono<Response<SchemaProperties>> registerSchemaWithResponse(String groupName, St
140120
*/
141121
@ServiceMethod(returns = ReturnType.SINGLE)
142122
public Mono<SchemaProperties> getSchema(String id) {
143-
if (idCache.containsKey(id)) {
144-
logger.verbose("Cache hit for schema id '{}'", id);
145-
return Mono.fromCallable(() -> idCache.get(id));
146-
}
147123
return getSchemaWithResponse(id).map(Response::getValue);
148124
}
149125

@@ -154,7 +130,8 @@ public Mono<SchemaProperties> getSchema(String id) {
154130
*
155131
* @return The {@link SchemaProperties} associated with the given {@code id} along with the HTTP response.
156132
*/
157-
Mono<Response<SchemaProperties>> getSchemaWithResponse(String id) {
133+
@ServiceMethod(returns = ReturnType.SINGLE)
134+
public Mono<Response<SchemaProperties>> getSchemaWithResponse(String id) {
158135
return FluxUtil.withContext(context -> getSchemaWithResponse(id, context));
159136
}
160137

@@ -174,21 +151,12 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String id, Context contex
174151
return;
175152
}
176153

177-
final String schemaGroup = matcher.group("schemaGroup");
178154
final String schemaName = matcher.group("schemaName");
179155
final SchemaProperties schemaObject = new SchemaProperties(id,
180156
serializationType,
181157
schemaName,
182158
response.getValue());
183-
final String schemaCacheKey = getSchemaStringCacheKey(schemaGroup, schemaName,
184-
new String(response.getValue(), SCHEMA_REGISTRY_SERVICE_ENCODING));
185-
186-
schemaStringCache.putIfAbsent(schemaCacheKey, schemaObject);
187-
idCache.putIfAbsent(id, schemaObject);
188-
189-
logger.verbose("Cached schema object. Path: '{}'", id);
190-
191-
SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(
159+
final SimpleResponse<SchemaProperties> schemaResponse = new SimpleResponse<>(
192160
response.getRequest(), response.getStatusCode(),
193161
response.getHeaders(), schemaObject);
194162

@@ -210,16 +178,6 @@ Mono<Response<SchemaProperties>> getSchemaWithResponse(String id, Context contex
210178
@ServiceMethod(returns = ReturnType.SINGLE)
211179
public Mono<String> getSchemaId(String groupName, String name, String content,
212180
SerializationType serializationType) {
213-
214-
String schemaStringCacheKey = getSchemaStringCacheKey(groupName, name, content);
215-
216-
if (schemaStringCache.containsKey(schemaStringCacheKey)) {
217-
return Mono.fromCallable(() -> {
218-
logger.verbose("Cache hit schema string. Group: '{}', name: '{}'", groupName, name);
219-
return schemaStringCache.get(schemaStringCacheKey).getSchemaId();
220-
});
221-
}
222-
223181
return getSchemaIdWithResponse(groupName, name, content, serializationType)
224182
.map(response -> response.getValue());
225183
}
@@ -234,7 +192,8 @@ public Mono<String> getSchemaId(String groupName, String name, String content,
234192
*
235193
* @return The unique identifier for this schema.
236194
*/
237-
Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
195+
@ServiceMethod(returns = ReturnType.SINGLE)
196+
public Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, String content,
238197
SerializationType serializationType) {
239198

240199
return FluxUtil.withContext(context ->
@@ -256,27 +215,32 @@ Mono<Response<String>> getSchemaIdWithResponse(String groupName, String name, St
256215
SerializationType serializationType, Context context) {
257216

258217
return this.restService.getSchemas()
259-
.queryIdByContentWithResponseAsync(groupName, name,
260-
com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO, content)
218+
.queryIdByContentWithResponseAsync(groupName, name, getSerialization(serializationType), content)
261219
.handle((response, sink) -> {
262220
SchemaId schemaId = response.getValue();
263-
SchemaProperties properties = new SchemaProperties(schemaId.getId(), serializationType, name,
264-
content.getBytes(SCHEMA_REGISTRY_SERVICE_ENCODING));
265-
266-
schemaStringCache.putIfAbsent(
267-
getSchemaStringCacheKey(groupName, name, content), properties);
268-
idCache.putIfAbsent(schemaId.getId(), properties);
269-
270-
logger.verbose("Cached schema string. Group: '{}', name: '{}'", groupName, name);
271-
272221
SimpleResponse<String> schemaIdResponse = new SimpleResponse<>(
273222
response.getRequest(), response.getStatusCode(),
274223
response.getHeaders(), schemaId.getId());
224+
275225
sink.next(schemaIdResponse);
276226
});
277227
}
278228

279-
private static String getSchemaStringCacheKey(String groupName, String name, String content) {
280-
return groupName + name + content;
229+
/**
230+
* Gets the matching implementation class serialization type.
231+
*
232+
* @param serializationType Model serialization type.
233+
*
234+
* @return Implementation serialization type.
235+
*
236+
* @throws UnsupportedOperationException if the serialization type is not supported.
237+
*/
238+
private static com.azure.data.schemaregistry.implementation.models.SerializationType getSerialization(
239+
SerializationType serializationType) {
240+
if (serializationType == SerializationType.AVRO) {
241+
return com.azure.data.schemaregistry.implementation.models.SerializationType.AVRO;
242+
} else {
243+
throw new UnsupportedOperationException("Serialization type is not supported: " + serializationType);
244+
}
281245
}
282246
}

sdk/schemaregistry/azure-data-schemaregistry/src/main/java/com/azure/data/schemaregistry/SchemaRegistryClientBuilder.java

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737
import java.util.List;
3838
import java.util.Map;
3939
import java.util.Objects;
40-
import java.util.concurrent.ConcurrentSkipListMap;
41-
import java.util.function.Function;
4240

4341
/**
4442
* Fluent builder for interacting with the Schema Registry service via {@link SchemaRegistryAsyncClient} and
@@ -54,11 +52,8 @@
5452
* <p><strong>Instantiating with custom retry policy and HTTP log options</strong></p>
5553
* {@codesnippet com.azure.data.schemaregistry.schemaregistryasyncclient.retrypolicy.instantiation}
5654
*/
57-
@ServiceClientBuilder(serviceClients = SchemaRegistryAsyncClient.class)
55+
@ServiceClientBuilder(serviceClients = {SchemaRegistryAsyncClient.class, SchemaRegistryClient.class})
5856
public class SchemaRegistryClientBuilder {
59-
static final int MAX_SCHEMA_MAP_SIZE_DEFAULT = 1000;
60-
static final int MAX_SCHEMA_MAP_SIZE_MINIMUM = 10;
61-
6257
private final ClientLogger logger = new ClientLogger(SchemaRegistryClientBuilder.class);
6358

6459
private static final String DEFAULT_SCOPE = "https://eventhubs.azure.net/.default";
@@ -69,8 +64,6 @@ public class SchemaRegistryClientBuilder {
6964
private static final AddHeadersPolicy API_HEADER_POLICY = new AddHeadersPolicy(new HttpHeaders()
7065
.set("api-version", "2020-09-01-preview"));
7166

72-
private final ConcurrentSkipListMap<String, Function<String, Object>> typeParserMap;
73-
7467
private final List<HttpPipelinePolicy> perCallPolicies = new ArrayList<>();
7568
private final List<HttpPipelinePolicy> perRetryPolicies = new ArrayList<>();
7669

@@ -80,7 +73,6 @@ public class SchemaRegistryClientBuilder {
8073
private String endpoint;
8174
private String host;
8275
private HttpClient httpClient;
83-
private Integer maxSchemaMapSize;
8476
private TokenCredential credential;
8577
private ClientOptions clientOptions;
8678
private HttpLogOptions httpLogOptions;
@@ -93,8 +85,6 @@ public class SchemaRegistryClientBuilder {
9385
*/
9486
public SchemaRegistryClientBuilder() {
9587
this.httpLogOptions = new HttpLogOptions();
96-
this.maxSchemaMapSize = null;
97-
this.typeParserMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
9888
this.httpClient = null;
9989
this.credential = null;
10090
this.retryPolicy = new RetryPolicy("retry-after-ms", ChronoUnit.MILLIS);
@@ -132,24 +122,6 @@ public SchemaRegistryClientBuilder endpoint(String endpoint) {
132122
return this;
133123
}
134124

135-
/**
136-
* Sets schema cache size limit. If limit is exceeded on any cache, all caches are recycled.
137-
*
138-
* @param maxCacheSize max size for internal schema caches in {@link SchemaRegistryAsyncClient}
139-
* @return The updated {@link SchemaRegistryClientBuilder} object.
140-
* @throws IllegalArgumentException on invalid maxCacheSize value
141-
*/
142-
SchemaRegistryClientBuilder maxCacheSize(int maxCacheSize) {
143-
if (maxCacheSize < MAX_SCHEMA_MAP_SIZE_MINIMUM) {
144-
throw logger.logExceptionAsError(new IllegalArgumentException(
145-
String.format("Schema map size must be greater than %s entries",
146-
MAX_SCHEMA_MAP_SIZE_MINIMUM)));
147-
}
148-
149-
this.maxSchemaMapSize = maxCacheSize;
150-
return this;
151-
}
152-
153125
/**
154126
* Sets the HTTP client to use for sending and receiving requests to and from the service.
155127
*
@@ -333,11 +305,7 @@ public SchemaRegistryAsyncClient buildAsyncClient() {
333305
.pipeline(buildPipeline)
334306
.buildClient();
335307

336-
int buildMaxSchemaMapSize = (maxSchemaMapSize == null)
337-
? MAX_SCHEMA_MAP_SIZE_DEFAULT
338-
: maxSchemaMapSize;
339-
340-
return new SchemaRegistryAsyncClient(restService, buildMaxSchemaMapSize, typeParserMap);
308+
return new SchemaRegistryAsyncClient(restService);
341309
}
342310

343311
/**

sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryAsyncClientTests.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -222,35 +222,6 @@ public void registerBadRequest() {
222222
}).verify();
223223
}
224224

225-
/**
226-
* Verifies that we can register a schema and then get it by its schemaId.
227-
*/
228-
@Test
229-
public void registerAndGetCachedSchema() {
230-
// Arrange
231-
final String schemaName = testResourceNamer.randomName("sch", RESOURCE_LENGTH);
232-
final SchemaRegistryAsyncClient client1 = builder.buildAsyncClient();
233-
234-
final AtomicReference<String> schemaId = new AtomicReference<>();
235-
236-
// Act & Assert
237-
StepVerifier.create(client1.registerSchema(schemaGroup, schemaName, SCHEMA_CONTENT, SerializationType.AVRO))
238-
.assertNext(response -> {
239-
assertSchemaProperties(response, null, schemaName, SCHEMA_CONTENT);
240-
schemaId.set(response.getSchemaId());
241-
}).verifyComplete();
242-
243-
// Assert that we can get a schema based on its id. We registered a schema with client1 and its response is
244-
// cached, so it won't make a network call when getting the schema. client2 will not have this information.
245-
final String schemaIdToGet = schemaId.get();
246-
assertNotNull(schemaIdToGet);
247-
248-
// Act & Assert
249-
StepVerifier.create(client1.getSchema(schemaIdToGet))
250-
.assertNext(schema -> assertSchemaProperties(schema, schemaIdToGet, schemaName, SCHEMA_CONTENT))
251-
.verifyComplete();
252-
}
253-
254225
/**
255226
* Verifies that we get 404 when non-existent schema returned.
256227
*/

sdk/schemaregistry/azure-data-schemaregistry/src/test/java/com/azure/data/schemaregistry/SchemaRegistryClientTests.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -184,29 +184,6 @@ public void registerBadRequest() {
184184
assertEquals(400, exception.getResponse().getStatusCode());
185185
}
186186

187-
/**
188-
* Verifies that we can register a schema and then get it by its schemaId.
189-
*/
190-
@Test
191-
public void registerAndGetCachedSchema() {
192-
// Arrange
193-
final String schemaName = testResourceNamer.randomName("sch", RESOURCE_LENGTH);
194-
final SchemaRegistryClient client1 = builder.buildClient();
195-
196-
// Act & Assert
197-
final SchemaProperties response = client1.registerSchema(schemaGroup, schemaName, SCHEMA_CONTENT,
198-
SerializationType.AVRO);
199-
assertSchemaProperties(response, null, schemaName, SCHEMA_CONTENT);
200-
201-
// Assert that we can get a schema based on its id. We registered a schema with client1 and its response is
202-
// cached, so it won't make a network call when getting the schema.
203-
final String schemaIdToGet = response.getSchemaId();
204-
205-
// Act & Assert
206-
final SchemaProperties response2 = client1.getSchema(schemaIdToGet);
207-
assertSchemaProperties(response2, schemaIdToGet, schemaName, SCHEMA_CONTENT);
208-
}
209-
210187
/**
211188
* Verifies that we get 404 when non-existent schema returned.
212189
*/

sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryAsyncClientTests.registerAndGetCachedSchema.json

Lines changed: 0 additions & 29 deletions
This file was deleted.

sdk/schemaregistry/azure-data-schemaregistry/src/test/resources/session-records/SchemaRegistryClientTests.registerAndGetCachedSchema.json

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)