Skip to content

Commit de60c7c

Browse files
author
Komal Yadav
committed
[CDAP-21172] Implement CRUD Operations for Metadata Tables
[CDAP-21172] Implement CRUD operations for metadata tables: add FormattedMetadata helper to flatten properties, tags, and schemas for Spanner storage; resolve checkstyle warnings and apply formatting.
1 parent 05432b1 commit de60c7c

File tree

4 files changed

+693
-50
lines changed

4 files changed

+693
-50
lines changed
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.cdap.metadata.spanner;
18+
19+
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.data.schema.SchemaWalker;
import io.cdap.cdap.api.metadata.MetadataEntity;
import io.cdap.cdap.api.metadata.MetadataScope;
import io.cdap.cdap.spi.metadata.Metadata;
import io.cdap.cdap.spi.metadata.MetadataConstants;
import io.cdap.cdap.spi.metadata.ScopedName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
40+
41+
/**
42+
* A helper class to process and prepare metadata fields for optimized storage.
43+
* This class encapsulates the logic for flattening text fields, parsing schemas,
44+
* and extracting specific properties for relational storage.
45+
*/
46+
public class FormattedMetadata {
47+
48+
private static final Logger LOG = LoggerFactory.getLogger(FormattedMetadata.class);
49+
50+
// Fields directly mapped to metadata table columns
51+
private final String namespace;
52+
private final String type;
53+
private final String name;
54+
private final Long created;
55+
private final String userText;
56+
private final String systemText;
57+
private final Set<Property> metadataProps;
58+
59+
/**
60+
* Creates an instance of FormattedMetadata from a MetadataEntity and Metadata.
61+
* This is the public entry point for creating formatted metadata.
62+
*
63+
* @param entity the metadata entity being processed.
64+
* @param metadata the metadata to be associated with the entity.
65+
* @return a new instance of FormattedMetadata.
66+
* @throws IOException if schema parsing fails.
67+
*/
68+
public static FormattedMetadata from(MetadataEntity entity, Metadata metadata) throws IOException {
69+
return new FormattedMetadata(entity, metadata);
70+
}
71+
72+
private FormattedMetadata(MetadataEntity entity, Metadata metadata) throws IOException {
73+
this.namespace = entity.getValue("namespace");
74+
this.type = entity.getType().toLowerCase();
75+
this.name = Objects.requireNonNull(entity.getValue(entity.getType())).toLowerCase();
76+
77+
Map<ScopedName, String> properties = metadata.getProperties();
78+
String schemaJson = properties.get(new ScopedName(MetadataScope.SYSTEM, MetadataConstants.SCHEMA_KEY));
79+
Map<String, Set<Property>> schemaProperty = reformatSchemaProperty(schemaJson);
80+
Map<String, Set<Property>> reformatedProperties = reformatProperties(properties);
81+
Set<Property> reformatedPropertiesWithValue = reformatedProperties
82+
.getOrDefault("extractedProperties", Collections.emptySet());
83+
reformatedPropertiesWithValue.addAll(schemaProperty.getOrDefault("schemaAndFieldNames",
84+
Collections.emptySet()));
85+
Set<ScopedName> tags = metadata.getTags();
86+
Set<Property> reformatedTags = reformatTags(tags);
87+
88+
this.metadataProps = new HashSet<>();
89+
this.metadataProps.addAll(reformatedPropertiesWithValue);
90+
this.metadataProps.addAll(reformatedTags);
91+
this.metadataProps.add(new Property(MetadataScope.SYSTEM.name(), this.type, this.name));
92+
this.systemText = buildText(reformatedPropertiesWithValue, tags, MetadataScope.SYSTEM) + " " + type;
93+
this.metadataProps.addAll(reformatedProperties.getOrDefault("propertyNames", Collections.emptySet()));
94+
this.metadataProps.addAll(schemaProperty.getOrDefault("fieldProperties", Collections.emptySet()));
95+
this.userText = buildText(reformatedPropertiesWithValue, tags, MetadataScope.USER);
96+
this.created = parseCreationTime(reformatedPropertiesWithValue).orElse(null);
97+
}
98+
99+
public String getNamespace() {
100+
return namespace;
101+
}
102+
103+
public String getType() {
104+
return type;
105+
}
106+
107+
public String getName() {
108+
return name;
109+
}
110+
111+
public Optional<Long> getCreated() {
112+
return Optional.ofNullable(created);
113+
}
114+
115+
public String getUserText() {
116+
return userText;
117+
}
118+
119+
public String getSystemText() {
120+
return systemText;
121+
}
122+
123+
public Set<Property> getMetadataProps() {
124+
return metadataProps;
125+
}
126+
127+
/**
128+
* Processes properties and extract property names into a Set.
129+
*
130+
* @param properties The original properties from the metadata object.
131+
* @return A Set containing Property objects.
132+
*/
133+
private Map<String, Set<Property>> reformatProperties(Map<ScopedName, String> properties) {
134+
if (properties == null || properties.isEmpty()) {
135+
return Collections.emptyMap();
136+
}
137+
138+
Set<Property> extracted = new HashSet<>(properties.size());
139+
Set<String> propertyNames = new HashSet<>();
140+
for (Map.Entry<ScopedName, String> entry : properties.entrySet()) {
141+
ScopedName key = entry.getKey();
142+
String name = key.getName().toLowerCase();
143+
String scope = key.getScope().name();
144+
String value = entry.getValue().toLowerCase();
145+
146+
// If it's a schema key, reformat it.
147+
if (MetadataConstants.SCHEMA_KEY.equals(name)) {
148+
continue;
149+
}
150+
151+
extracted.add(new Property(scope, name, value));
152+
propertyNames.add(name);
153+
}
154+
155+
String allPropertiesValue = String.join(" ", propertyNames);
156+
Set<Property> propertyValue = new HashSet<>();
157+
propertyValue.add(new Property(MetadataScope.SYSTEM.name(),
158+
MetadataConstants.PROPERTIES_KEY, allPropertiesValue));
159+
160+
Map<String, Set<Property>> result = new HashMap<>();
161+
result.put("extractedProperties", extracted);
162+
result.put("propertyNames", propertyValue);
163+
164+
return result;
165+
166+
}
167+
168+
/**
169+
* Extracts tags into the set format.
170+
*/
171+
private Set<Property> reformatTags(Set<ScopedName> tags) {
172+
return tags.stream()
173+
.map(tag -> new Property(tag.getScope().name(), MetadataConstants.TAGS_KEY, tag.getName()))
174+
.collect(Collectors.toSet());
175+
}
176+
177+
/**
178+
* Builds a single, space-delimited string of text from properties and tags
179+
* for a given scope.
180+
*/
181+
private String buildText(Set<Property> properties, Set<ScopedName> tags, MetadataScope scope) {
182+
String scopeProperties = properties.stream()
183+
.filter(property -> Objects.equals(property.getScope(), scope.toString()))
184+
.map(property -> property.getValue().toLowerCase())
185+
.collect(Collectors.joining(" "));
186+
187+
String scopeTags = tags.stream()
188+
.filter(tag -> tag.getScope() == scope)
189+
.map(tag -> tag.getName().toLowerCase())
190+
.collect(Collectors.joining(" "));
191+
192+
return Stream.of(scopeProperties, scopeTags)
193+
.filter(s -> !s.isEmpty())
194+
.collect(Collectors.joining(" "));
195+
}
196+
197+
/**
198+
* Finds and parses the creation time from the processed properties.
199+
*/
200+
private Optional<Long> parseCreationTime(Set<Property> properties) {
201+
if (properties == null || properties.isEmpty()) {
202+
return Optional.empty();
203+
}
204+
205+
String expectedScope = MetadataScope.SYSTEM.name();
206+
String expectedName = MetadataConstants.CREATION_TIME_KEY;
207+
for (Property property : properties) {
208+
if (expectedScope.equalsIgnoreCase(property.getScope()) && expectedName.equalsIgnoreCase(property.getName())) {
209+
try {
210+
return Optional.of(Long.parseLong(property.getValue()));
211+
} catch (NumberFormatException e) {
212+
LOG.warn("Unable to parse property value '{}' as a long for the creation time key. Skipping.",
213+
property.getValue(), e);
214+
return Optional.empty();
215+
}
216+
}
217+
}
218+
219+
return Optional.empty();
220+
}
221+
222+
/**
223+
* Parses a schema JSON string into a concise, human-readable format.
224+
*
225+
* @param schemaStr The raw JSON string representing the schema.
226+
* @return A formatted string (e.g., "schemaname:TYPE field1:TYPE1 field2:TYPE2").
227+
* @throws IOException if the schema string cannot be parsed.
228+
*/
229+
private Map<String, Set<Property>> reformatSchemaProperty(String schemaStr) throws IOException {
230+
if (schemaStr == null) {
231+
return Collections.emptyMap();
232+
}
233+
234+
Set<Property> fieldProperties = new HashSet<>();
235+
Set<Property> schemaAndFieldNames = new HashSet<>();
236+
Schema schema = Schema.parseJson(schemaStr);
237+
238+
List<String> formattedFields = new ArrayList<>();
239+
SchemaWalker.walk(schema, (fieldName, fieldSchema) -> {
240+
if (fieldName != null) {
241+
Schema nonNullableSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
242+
String typeName = nonNullableSchema.getType().toString().toLowerCase();
243+
fieldProperties.add(new Property(
244+
MetadataScope.SYSTEM.name(),
245+
fieldName.toLowerCase(),
246+
typeName));
247+
formattedFields.add(fieldName.toLowerCase() + ":" + typeName.toLowerCase());
248+
}
249+
});
250+
251+
String schemaProperties = formattedFields.isEmpty() ? "" : String.join(" ", formattedFields);
252+
schemaAndFieldNames.add(new Property(
253+
MetadataScope.SYSTEM.name(),
254+
"schema",
255+
schemaProperties));
256+
257+
Map<String, Set<Property>> result = new HashMap<>();
258+
result.put("fieldProperties", fieldProperties);
259+
result.put("schemaAndFieldNames", schemaAndFieldNames);
260+
261+
return result;
262+
}
263+
264+
/**
265+
* Represents a property for the Spanner metadata_props table.
266+
*/
267+
static class Property {
268+
private final String scope;
269+
private final String name;
270+
private final String value;
271+
272+
Property(String scope, String name, String value) {
273+
this.scope = scope;
274+
this.name = name;
275+
this.value = value;
276+
}
277+
278+
public String getScope() {
279+
return scope;
280+
}
281+
282+
public String getName() {
283+
return name;
284+
}
285+
286+
public String getValue() {
287+
return value;
288+
}
289+
290+
/**
291+
* Checks if this Property is equal to another object.
292+
*/
293+
public boolean equals(Object o) {
294+
if (this == o) {
295+
return true;
296+
}
297+
if (o == null || getClass() != o.getClass()) {
298+
return false;
299+
}
300+
301+
Property property = (Property) o;
302+
return Objects.equals(scope, property.scope)
303+
&& Objects.equals(name, property.name)
304+
&& Objects.equals(value, property.value);
305+
}
306+
307+
public int hashCode() {
308+
return Objects.hash(scope, name, value);
309+
}
310+
311+
public String toString() {
312+
return scope + ':' + name + ':' + value;
313+
}
314+
}
315+
}
316+

0 commit comments

Comments
 (0)