Skip to content

Commit 79d7f0b

Browse files
author
Komal Yadav
committed
updated
updated
1 parent 5eb05cc commit 79d7f0b

File tree

3 files changed

+64
-39
lines changed

3 files changed

+64
-39
lines changed

cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataFormatter.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.Set;
3535
import java.util.stream.Collectors;
3636
import java.util.stream.Stream;
37-
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
4039

@@ -78,10 +77,10 @@ public MetadataFormatter(MetadataEntity entity, Metadata metadata) throws IOExce
7877

7978
this.metadataProps = new HashSet<>();
8079
this.metadataProps.add(new Property(MetadataScope.SYSTEM.name(), this.type, this.name));
81-
this.metadataProps.addAll(reformatedProperties);
8280
this.metadataProps.addAll(reformatedTags);
81+
this.metadataProps.addAll(reformatedProperties);
8382
this.userText = buildText(reformatedProperties, tags, MetadataScope.USER);
84-
this.systemText = buildText(reformatedProperties, tags, MetadataScope.SYSTEM)+type;
83+
this.systemText = buildText(reformatedProperties, tags, MetadataScope.SYSTEM) + type;
8584
this.created = validateCreationTime(reformatedProperties).orElse(null);
8685
}
8786

@@ -131,13 +130,15 @@ private Set<Property> reformatProperties(Map<ScopedName, String> properties) thr
131130
ScopedName key = entry.getKey();
132131
String name = key.getName().toLowerCase();
133132
String scope = key.getScope().name();
133+
String value = entry.getValue();
134134

135135
// If it's a schema key, reformat it.
136-
String valueForProperty = MetadataConstants.SCHEMA_KEY.equals(name) ?
137-
reformatSchemaProperty(entry.getValue()) :
138-
entry.getValue();
136+
if (MetadataConstants.SCHEMA_KEY.equals(name)) {
137+
extracted.addAll(reformatSchemaProperty(value));
138+
continue;
139+
}
139140

140-
extracted.add(new Property(scope, name, valueForProperty));
141+
extracted.add(new Property(scope, name, value));
141142
propertyNames.add(name);
142143
}
143144

@@ -207,18 +208,36 @@ private Optional<Long> validateCreationTime(Set<Property> properties) {
207208
* @return A formatted string (e.g., "schemaname:TYPE field1:TYPE1 field2:TYPE2").
208209
* @throws IOException if the schema string cannot be parsed.
209210
*/
210-
private String reformatSchemaProperty(String schemaStr) throws IOException {
211+
private List<Property> reformatSchemaProperty(String schemaStr) throws IOException {
212+
List<Property> schemaProperties = new ArrayList<>();
211213
Schema schema = Schema.parseJson(schemaStr);
214+
String schemaName = Objects.requireNonNull(schema.getRecordName()).toLowerCase();
215+
String schemaType = schema.getType().toString().toLowerCase();
216+
schemaProperties.add(new Property(
217+
MetadataScope.SYSTEM.name(),
218+
schemaName,
219+
schemaType));
220+
212221
List<String> formattedFields = new ArrayList<>();
213222
SchemaWalker.walk(schema, (fieldName, fieldSchema) -> {
214223
if (fieldName != null) {
215224
Schema nonNullableSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
216225
String typeName = nonNullableSchema.getType().toString();
226+
schemaProperties.add(new Property(
227+
MetadataScope.SYSTEM.name(),
228+
fieldName.toLowerCase(),
229+
typeName));
217230
formattedFields.add(fieldName.toLowerCase() + ":" + typeName.toLowerCase());
218231
}
219232
});
220233

221-
return formattedFields.isEmpty() ? "" : String.join(" ", formattedFields);
234+
String schemaAndFieldNames = formattedFields.isEmpty() ? "" : String.join(" ", formattedFields);
235+
schemaProperties.add(new Property(
236+
MetadataScope.SYSTEM.name(),
237+
"schema",
238+
schemaAndFieldNames));
239+
240+
return schemaProperties;
222241
}
223242

224243
/**

cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataMutator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
5151

52+
/**
53+
* Generates Spanner mutations from metadata changes.
54+
*/
5255
public class MetadataMutator {
5356

5457
private static final Logger LOG = LoggerFactory.getLogger(MetadataMutator.class);
@@ -145,11 +148,14 @@ private static ChangeRequest create(VersionedMetadata before, MetadataMutation.C
145148
// These variables describe the property's state change.
146149
boolean propertyWasChanged = !existingValue.equals(newValue);
147150
boolean propertyWasRemoved = newValue == null;
148-
if ((directive == MetadataDirective.PRESERVE && propertyWasChanged) ||
149-
(directive == MetadataDirective.KEEP && propertyWasRemoved)) {
151+
if ((directive == MetadataDirective.PRESERVE && propertyWasChanged)
152+
|| (directive == MetadataDirective.KEEP && propertyWasRemoved)) {
150153
finalProperties.put(scopedName, existingValue);
151154
}
152155
break;
156+
157+
default:
158+
throw new IllegalArgumentException("Unknown or unhandled MetadataKind: " + key.getKind());
153159
}
154160
});
155161

cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorage.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,23 @@ private void getCreateTableDDLStatement() throws IOException {
170170
*/
171171
private String getCreateMetadataTableDDLStatement() {
172172
return String.format(
173-
"CREATE TABLE IF NOT EXISTS %s (" + // metadata
174-
"%s STRING(MAX) NOT NULL," + // metadata_id
175-
"%s STRING(MAX) NOT NULL," + // namespace
176-
"%s STRING(MAX) NOT NULL," + // entity_type
177-
"%s STRING(MAX) NOT NULL," + // name
178-
"%s INT64," + // create_time
179-
"%s STRING(MAX)," + // user
180-
"%s STRING(MAX)," + // system
181-
"%s JSON," + // metadata_column
182-
"%s INT64 NOT NULL," + // version
183-
"user_tokens TOKENLIST AS " + // user_tokens list
184-
"(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," +
185-
"system_tokens TOKENLIST AS " + // system_tokens list
186-
"(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," +
187-
"text_tokens TOKENLIST AS " + // text_tokens list
188-
"(TOKENLIST_CONCAT([User_Tokens, System_Tokens])) HIDDEN," +
189-
") PRIMARY KEY (%s) ", // metadata_id
173+
"CREATE TABLE IF NOT EXISTS %s (" // metadata
174+
+ "%s STRING(MAX) NOT NULL," // metadata_id
175+
+ "%s STRING(MAX) NOT NULL," // namespace
176+
+ "%s STRING(MAX) NOT NULL," // entity_type
177+
+ "%s STRING(MAX) NOT NULL," // name
178+
+ "%s INT64," // create_time
179+
+ "%s STRING(MAX)," // user
180+
+ "%s STRING(MAX)," // system
181+
+ "%s JSON," // metadata_column
182+
+ "%s INT64 NOT NULL," // version
183+
+ "user_tokens TOKENLIST AS " // user_tokens list
184+
+ "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN,"
185+
+ "system_tokens TOKENLIST AS " // system_tokens list
186+
+ "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN,"
187+
+ "text_tokens TOKENLIST AS " // text_tokens list
188+
+ "(TOKENLIST_CONCAT([User_Tokens, System_Tokens])) HIDDEN,"
189+
+ ") PRIMARY KEY (%s) ", // metadata_id
190190
METADATA_TABLE,
191191
Tables.Metadata.METADATA_ID_FIELD,
192192
Tables.Metadata.NAMESPACE_FIELD,
@@ -229,17 +229,17 @@ private String getCreateMetadataTableDDLStatement() {
229229
*/
230230
private String getCreateMetadataPropsTableDDLStatement() {
231231
return String.format(
232-
"CREATE TABLE IF NOT EXISTS %s (" +
233-
"%s STRING(MAX) NOT NULL," + // metadata_id
234-
"%s STRING(MAX) NOT NULL," + // namespace
235-
"%s STRING(MAX) NOT NULL," + // entity_type
236-
"%s STRING(MAX) NOT NULL," + // name
237-
"%s STRING(MAX)," + // scope
238-
"%s STRING(MAX)," + // value
239-
"value_tokens TOKENLIST AS " + // value_tokens list
240-
"(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," +
241-
") PRIMARY KEY (%s, %s, %s) ," + // metadata_id, name, scope
242-
"INTERLEAVE IN PARENT %s ON DELETE CASCADE",
232+
"CREATE TABLE IF NOT EXISTS %s ("
233+
+ "%s STRING(MAX) NOT NULL," // metadata_id
234+
+ "%s STRING(MAX) NOT NULL," // namespace
235+
+ "%s STRING(MAX) NOT NULL," // entity_type
236+
+ "%s STRING(MAX) NOT NULL," // name
237+
+ "%s STRING(MAX)," // scope
238+
+ "%s STRING(MAX)," // value
239+
+ "value_tokens TOKENLIST AS " // value_tokens list
240+
+ "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN,"
241+
+ ") PRIMARY KEY (%s, %s, %s) ," // metadata_id, name, scope
242+
+ "INTERLEAVE IN PARENT %s ON DELETE CASCADE",
243243
METADATA_PROPS_TABLE,
244244
Tables.MetadataProps.METADATA_ID_FIELD,
245245
Tables.MetadataProps.NAMESPACE_FIELD,

0 commit comments

Comments
 (0)