diff --git a/cdap-metadata-ext-spanner/pom.xml b/cdap-metadata-ext-spanner/pom.xml index 06af96381b57..7d3d5e9e43fe 100644 --- a/cdap-metadata-ext-spanner/pom.xml +++ b/cdap-metadata-ext-spanner/pom.xml @@ -67,6 +67,19 @@ junit test + + io.cdap.cdap + cdap-metadata-spi + ${project.version} + test-jar + test + + + io.cdap.cdap + cdap-common-unit-test + ${project.version} + test + diff --git a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/FormattedMetadata.java b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/FormattedMetadata.java index a45360a329ac..f46c6f0f9911 100644 --- a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/FormattedMetadata.java +++ b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/FormattedMetadata.java @@ -70,7 +70,8 @@ public static FormattedMetadata from(MetadataEntity entity, Metadata metadata) t } private FormattedMetadata(MetadataEntity entity, Metadata metadata) throws IOException { - this.namespace = entity.getValue("namespace"); + this.namespace = Optional.ofNullable(entity.getValue("namespace")) + .orElse("default"); this.type = entity.getType().toLowerCase(); this.name = Objects.requireNonNull(entity.getValue(entity.getType())).toLowerCase(); @@ -142,12 +143,6 @@ private Map> reformatProperties(Map pr String name = key.getName().toLowerCase(); String scope = key.getScope().name(); String value = entry.getValue().toLowerCase(); - - // If it's a schema key, reformat it. - if (MetadataConstants.SCHEMA_KEY.equals(name)) { - continue; - } - extracted.add(new Property(scope, name, value)); propertyNames.add(name); } diff --git a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataMutator.java b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataMutator.java index 0cd5d32f9c44..c66ce480c8f3 100644 --- a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataMutator.java +++ b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/MetadataMutator.java @@ -189,10 +189,6 @@ private static Set determineAffectedScopes(Metadata metadata) { private static ChangeRequest drop(MetadataEntity entity, VersionedMetadata before) { List mutations = new ArrayList<>(); String metadataId = toMetadataId(entity); - if (before.getVersion() == null) { - throw new IllegalArgumentException("existingVersion cannot be null when deleting a " - + "specific version of a metadata entity."); - } mutations.add(Mutation.delete(METADATA_TABLE, Key.of(metadataId))); return new ChangeRequest(mutations, new MetadataChange(entity, before.getMetadata(), Metadata.EMPTY)); } @@ -288,8 +284,6 @@ private static List createMetadataPropsTableMutations(MetadataEntity e for (FormattedMetadata.Property prop : formattedMetadata.getMetadataProps()) { propMutations.add(Mutation.newInsertOrUpdateBuilder(METADATA_PROPS_TABLE) .set(Tables.MetadataProps.METADATA_ID_FIELD).to(entityId) - .set(Tables.MetadataProps.NAMESPACE_FIELD).to(formattedMetadata.getNamespace()) - .set(Tables.MetadataProps.TYPE_FIELD).to(formattedMetadata.getType()) .set(Tables.MetadataProps.NESTED_SCOPE_FIELD).to(prop.getScope()) .set(Tables.MetadataProps.NESTED_NAME_FIELD).to(prop.getName()) .set(Tables.MetadataProps.NESTED_VALUE_FIELD).to(prop.getValue()) diff --git a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorage.java b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorage.java index f60568a861b9..8d583aada4ac 100644 --- a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorage.java +++ b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorage.java @@ -33,9 +33,12 @@ import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.Value; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; @@ -44,11 +47,14 @@ import com.google.gson.JsonParseException; import io.cdap.cdap.api.metadata.MetadataEntity; import io.cdap.cdap.api.metadata.MetadataScope; +import io.cdap.cdap.common.metadata.Cursor; import io.cdap.cdap.common.metadata.MetadataUtil; import io.cdap.cdap.spi.metadata.Metadata; import io.cdap.cdap.spi.metadata.MetadataChange; +import io.cdap.cdap.spi.metadata.MetadataConstants; import io.cdap.cdap.spi.metadata.MetadataKind; import io.cdap.cdap.spi.metadata.MetadataMutation; +import io.cdap.cdap.spi.metadata.MetadataRecord; import io.cdap.cdap.spi.metadata.MetadataStorage; import io.cdap.cdap.spi.metadata.MetadataStorageContext; import io.cdap.cdap.spi.metadata.MutationOptions; @@ -58,18 +64,24 @@ import io.cdap.cdap.spi.metadata.ScopedNameTypeAdapter; import io.cdap.cdap.spi.metadata.SearchRequest; import io.cdap.cdap.spi.metadata.SearchResponse; +import io.cdap.cdap.spi.metadata.Sorting; import io.cdap.cdap.spi.metadata.VersionedMetadata; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +113,17 @@ public class SpannerMetadataStorage implements MetadataStorage { private DatabaseClient dbClient; private DatabaseAdminClient adminClient; + // Define the wildcard characters for regex matching + private static final String SQL_WILDCARD_ANY_STRING = "*"; + private static final String SQL_WILDCARD_ANY_CHAR = "?"; + + private static final Map SORT_KEY_MAP = ImmutableMap.of( + "entity-name", Tables.Metadata.NAME_FIELD, + "creation-time", Tables.Metadata.CREATED_FIELD + ); + + private static final Pattern SPACE_SEPARATOR_PATTERN = Pattern.compile("\\s+"); + @Override public void initialize(MetadataStorageContext context) throws Exception { Map properties = context.getProperties(); @@ -180,12 +203,12 @@ private String getCreateMetadataTableDDLStatement() { + "%s STRING(MAX)," // system + "%s JSON," // metadata_column + "%s INT64 NOT NULL," // version - + "user_tokens TOKENLIST AS " // user_tokens list + + "%s TOKENLIST AS " // user_tokens list + "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," - + "system_tokens TOKENLIST AS " // system_tokens list + + "%s TOKENLIST AS " // system_tokens list + "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," - + "text_tokens TOKENLIST AS " // text_tokens list - + "(TOKENLIST_CONCAT([User_Tokens, System_Tokens])) HIDDEN," + + "%s TOKENLIST AS " // text_tokens list + + "(TOKENLIST_CONCAT([%s, %s])) HIDDEN," + ") PRIMARY KEY (%s) ", // metadata_id METADATA_TABLE, Tables.Metadata.METADATA_ID_FIELD, @@ -197,8 +220,13 @@ private String getCreateMetadataTableDDLStatement() { Tables.Metadata.SYSTEM_FIELD, Tables.Metadata.METADATA_COLUMN_FIELD, Tables.Metadata.VERSION, + Tables.Metadata.USER_TOKEN_FIELD, Tables.Metadata.USER_FIELD, + Tables.Metadata.SYSTEM_TOKEN_FIELD, Tables.Metadata.SYSTEM_FIELD, + Tables.Metadata.TEXT_TOKEN_FIELD, + Tables.Metadata.USER_TOKEN_FIELD, + Tables.Metadata.SYSTEM_TOKEN_FIELD, Tables.Metadata.METADATA_ID_FIELD ); } @@ -231,22 +259,19 @@ private String getCreateMetadataPropsTableDDLStatement() { return String.format( "CREATE TABLE IF NOT EXISTS %s (" + "%s STRING(MAX) NOT NULL," // metadata_id - + "%s STRING(MAX) NOT NULL," // namespace - + "%s STRING(MAX) NOT NULL," // entity_type + "%s STRING(MAX) NOT NULL," // name + "%s STRING(MAX)," // scope + "%s STRING(MAX)," // value - + "value_tokens TOKENLIST AS " // value_tokens list + + "%s TOKENLIST AS " // value_tokens list + "(TOKENIZE_SUBSTRING(%s, support_relative_search=>TRUE)) HIDDEN," + ") PRIMARY KEY (%s, %s, %s) ," // metadata_id, name, scope + "INTERLEAVE IN PARENT %s ON DELETE CASCADE", METADATA_PROPS_TABLE, Tables.MetadataProps.METADATA_ID_FIELD, - Tables.MetadataProps.NAMESPACE_FIELD, - Tables.MetadataProps.TYPE_FIELD, Tables.MetadataProps.NESTED_NAME_FIELD, Tables.MetadataProps.NESTED_SCOPE_FIELD, Tables.MetadataProps.NESTED_VALUE_FIELD, + Tables.MetadataProps.NESTED_VALUE_TOKEN_FIELD, Tables.MetadataProps.NESTED_VALUE_FIELD, Tables.MetadataProps.METADATA_ID_FIELD, Tables.MetadataProps.NESTED_NAME_FIELD, @@ -441,6 +466,8 @@ public List batch(List mutations, Mu changes.add(change); } return null; + + }); } catch (SpannerException e) { throw new IOException("Error applying batch mutations to Spanner", e); @@ -487,10 +514,435 @@ public static String toMetadataId(MetadataEntity entity) { } @Override - public SearchResponse search(SearchRequest request) throws IOException { - throw new IOException("NOT IMPLEMENTED"); + public SearchResponse search(SearchRequest request) { + Cursor cursor = Strings.isNullOrEmpty(request.getCursor()) ? null : Cursor.fromString(request.getCursor()); + return doSearch(request, cursor); + } + + /** + * Executes a metadata search query against Spanner, handling query building, + * parameter binding, and result mapping. + * + * @param request The {@link SearchRequest} containing search criteria. + * @param requestCursor An optional {@link Cursor} for pagination, providing the start point for the results. + * @return A {@link SearchResponse} containing the search results and a cursor for the next page, if any. + */ + private SearchResponse doSearch(SearchRequest request, + @Nullable Cursor requestCursor) { + try (ReadOnlyTransaction transaction = dbClient.readOnlyTransaction()) { + QueryBuildResult queryResult = buildQuery(request, requestCursor); + String sqlTemplate = queryResult.getSql(); + Map params = queryResult.getParams(); + List sortColumns = queryResult.getSortColumns(); + Statement.Builder statementBuilder = Statement.newBuilder(sqlTemplate); + params.forEach((key, value) -> statementBuilder.bind(key).to(value)); + Statement statement = statementBuilder.build(); + + LOG.info("Executing Spanner SQL Template: {} With Parameters: {}", statement.getSql(), + statement.getParameters()); + + ResultSet resultSet = transaction.executeQuery(statement); + List results = new ArrayList<>(); + String nextActualCursor = null; + while (resultSet.next()) { + results.add(mapResult(resultSet)); + nextActualCursor = createNextCursorKey(resultSet, sortColumns); + } + + LOG.info("Found {} results.", results.size()); + + return createSearchResponse(request, results, nextActualCursor); + } + } + + /** + * Main query builder that orchestrates calls to helper methods. + */ + private QueryBuildResult buildQuery(SearchRequest request, @Nullable Cursor requestCursor) { + StringBuilder sql = new StringBuilder("SELECT * FROM metadata"); + MetadataScope scope = request.getScope(); + Map params = new HashMap<>(); + SortDetailsResult sortDetails = getSortDetails(request); + List sortColumns = sortDetails.getColumns(); + List sortOrders = sortDetails.getOrders(); + List allSearchConditions = new ArrayList<>(); + Iterable terms = Splitter.on(SPACE_SEPARATOR_PATTERN) + .omitEmptyStrings().trimResults().split(request.getQuery()); + + if (request.getNamespaces() != null && !request.getNamespaces().isEmpty()) { + allSearchConditions.add("namespace IN UNNEST(@namespaces)"); + params.put("namespaces", Value.stringArray(request.getNamespaces())); + } + + if (request.getTypes() != null && !request.getTypes().isEmpty()) { + allSearchConditions.add("entity_type IN UNNEST(@types)"); + params.put("types", Value.stringArray(request.getTypes())); + } + + // Add standard filter conditions (namespaces, types, etc.) to the WHERE clause. + for (String searchTerm : terms) { + List termConditions = appendFilterConditions(searchTerm, params, scope); + if (!termConditions.isEmpty()) { + allSearchConditions.add("(" + String.join(" OR ", termConditions) + ")"); + } + } + + // Add the special WHERE condition for keyset pagination if a cursor exists. + appendCursorCondition(requestCursor, sortColumns, sortOrders, allSearchConditions, params); + if (!allSearchConditions.isEmpty()) { + sql.append(" WHERE ").append(String.join(" AND ", allSearchConditions)); + } + + // Add the ORDER BY clause. + List orderByClauses = new ArrayList<>(); + for (int i = 0; i < sortColumns.size(); i++) { + orderByClauses.add(sortColumns.get(i) + " " + sortOrders.get(i).name()); + } + sql.append(" ORDER BY ").append(String.join(", ", orderByClauses)); + + // Add the LIMIT clause with a parameter. + sql.append(" LIMIT @limit"); + params.put("limit", Value.int64(request.getLimit())); + + return new QueryBuildResult(sql.toString(), params, sortColumns); + } + + /** + * Determines the final list of columns and directions for the ORDER BY clause, + * ensuring a unique tie-breaker is always present. + * @returns A type-safe SortDetailsResult. + */ + private SortDetailsResult getSortDetails(SearchRequest request) { + List columns = new ArrayList<>(); + List orders = new ArrayList<>(); + + if (request.getSorting() != null) { + columns.add(mapSortKey(request.getSorting().getKey())); + orders.add(request.getSorting().getOrder()); + } else { + // Default sort order if none is provided in the request + columns.add(Tables.Metadata.NAME_FIELD); + orders.add(Sorting.Order.ASC); + } + + // VITAL: Add a unique tie-breaker to prevent inconsistent ordering and broken pagination. + String primaryKey = Tables.Metadata.METADATA_ID_FIELD; + if (!columns.contains(primaryKey)) { + columns.add(primaryKey); + orders.add(Sorting.Order.ASC); + } + + return new SortDetailsResult(columns, orders); + } + + /** + * Appends standard WHERE conditions and their parameters for filtering the search. + */ + private List appendFilterConditions(String term, Map params , MetadataScope scope) { + List conditions = new ArrayList<>(); + if (term.isEmpty() || term.equals("*")) { + return conditions; + } + + if (term.contains(":")) { + conditions.add(buildKeyValueSearchCondition(term, params)); + } else { + conditions.add(buildScopedSearchCondition(term, scope, params)); + } + + return conditions; + } + + /** + * Builds the condition for a key:value search (e.g., "tags:retail"). + * Now uses the precise SEARCH_SUBSTRING syntax for the value part. + * + * @param term The full query string, like "a:b" + * @param params The map to add bind parameters to. + * @return The parameterized SQL condition string. + */ + private String buildKeyValueSearchCondition(String term, Map params) { + String[] parts = term.split(MetadataConstants.KEYVALUE_SEPARATOR, 2); + String key = parts[0].trim(); + String value = parts[1].trim(); + + String keyParam = "propKey_" + params.size(); + params.put(keyParam, Value.string(key)); + if (isWildcardPattern(value)) { + String regexPattern = convertToRegexpPattern(value); + String valueParam = "propValueRegex_" + params.size(); + params.put(valueParam, Value.string(regexPattern)); + + return String.format( + "EXISTS (SELECT 1 FROM %s " + + "WHERE %s = %s.%s " + + "AND %s = @%s " + + "AND REGEXP_CONTAINS(%s, @%s))", + METADATA_PROPS_TABLE, + Tables.MetadataProps.METADATA_ID_FIELD, + METADATA_TABLE, + Tables.Metadata.METADATA_ID_FIELD, + Tables.MetadataProps.NESTED_NAME_FIELD, + keyParam, + Tables.MetadataProps.NESTED_VALUE_FIELD, + valueParam + ); + } else { + String valueParam = "propValueExact_" + params.size(); + params.put(valueParam, Value.string(value)); + return String.format( + "EXISTS (SELECT 1 FROM %s " + + "WHERE %s = %s.%s " + + "AND %s = @%s " + + "AND SEARCH_SUBSTRING(%s, @%s, relative_search_type=>'word_prefix'))", + METADATA_PROPS_TABLE, + Tables.MetadataProps.METADATA_ID_FIELD, + METADATA_TABLE, + Tables.Metadata.METADATA_ID_FIELD, + Tables.MetadataProps.NESTED_NAME_FIELD, + keyParam, + Tables.MetadataProps.NESTED_VALUE_TOKEN_FIELD, + valueParam + ); + } + } + + /** + * Checks if a given string contains SQL-style wildcard characters. + * + * @param s The string to check. + * @return true if '*' or '?' are present, false otherwise. + */ + private boolean isWildcardPattern(String s) { + return s.contains(SQL_WILDCARD_ANY_STRING) || s.contains(SQL_WILDCARD_ANY_CHAR); + } + + /** + * Converts a search pattern containing SQL-style wildcards into a format usable by regular expression matching. + * This means: + *
    + *
  • '*' (matches any sequence of characters) becomes '.*'
  • + *
  • '?' (matches any single character) becomes '.'
  • + *
+ * Other special characters in the pattern are treated as literal text. + * + * @param sqlWildcardPattern The input pattern (e.g., "la*si?"). + * @return The converted regular expression (e.g., "la.*si."). + */ + private String convertToRegexpPattern(String sqlWildcardPattern) { + if (sqlWildcardPattern == null) { + return null; + } + StringBuilder re2Pattern = new StringBuilder(); + for (char c : sqlWildcardPattern.toCharArray()) { + switch (c) { + case '*': + re2Pattern.append(".*"); + break; + case '?': + re2Pattern.append("."); + break; + default: + re2Pattern.append(c); + break; + } + } + return re2Pattern.toString(); + } + + /** + * Builds the condition for a simple term search based on the provided scope. + * + * @param searchTerm The term to search for. + * @param scope The scope from the search request (USER, SYSTEM, or null). + * @param params The map to add bind parameters to. + * @return The parameterized SQL condition string. + */ + private String buildScopedSearchCondition(String searchTerm, @Nullable MetadataScope scope, + Map params) { + String searchColumn; + + if (scope == MetadataScope.USER) { + searchColumn = Tables.Metadata.USER_TOKEN_FIELD; + } else if (scope == MetadataScope.SYSTEM) { + searchColumn = Tables.Metadata.SYSTEM_TOKEN_FIELD; + } else { + searchColumn = Tables.Metadata.TEXT_TOKEN_FIELD; + } + + String termParam = "searchTerm_" + params.size(); + params.put(termParam, Value.string(searchTerm)); + + return String.format( + "SEARCH_SUBSTRING(%s, @%s, relative_search_type=>'word_prefix')", + searchColumn, + termParam + ); + } + + /** + * Appends the complex WHERE condition for keyset pagination if a cursor is provided. + */ + private void appendCursorCondition(@Nullable Cursor requestCursor, List sortColumns, + List sortOrders, List conditions, + Map params) { + if (requestCursor == null || requestCursor.getActualCursor() == null) { + return; + } + String[] cursorValues = requestCursor.getActualCursor().split(",", -1); + if (cursorValues.length != sortColumns.size()) { + LOG.warn("Cursor values count ({}) does not match sort columns count ({}). Ignoring cursor.", + cursorValues.length, sortColumns.size()); + return; + } + + // Build the OR clauses for each level of sorting declaratively. + List combinedRowConditions = IntStream.range(0, sortColumns.size()) + .mapToObj(i -> { + // For each sort column, create a clause like: (colA = valA AND colB > valB) + List clauseParts = new ArrayList<>(); + + // Add equality conditions for all preceding sort columns (the "prefix") + IntStream.range(0, i) + .forEach(j -> clauseParts.add( + buildCursorSubCondition(sortColumns.get(j), "=", cursorValues[j], params))); + + // Add the boundary condition ('>' or '<') for the current sort column + String operator = (sortOrders.get(i) == Sorting.Order.ASC) ? ">" : "<"; + clauseParts.add( + buildCursorSubCondition(sortColumns.get(i), operator, cursorValues[i], params)); + + return "(" + String.join(" AND ", clauseParts) + ")"; + }) + .collect(Collectors.toList()); + + if (!combinedRowConditions.isEmpty()) { + conditions.add("(" + String.join(" OR ", combinedRowConditions) + ")"); + } + } + + /** + * Helper method to create a single parameterized SQL condition for the cursor. + * Example: "name = @cursor_param_name" or "created > @cursor_param_created" + */ + private String buildCursorSubCondition(String column, String operator, String value, Map params) { + String paramName = "cursor_param_" + column + "_" + params.size(); + params.put(paramName, Value.string(value)); + return String.format("%s %s @%s", column, operator, paramName); + } + + /** + * Creates the "actualCursor" part of the next cursor. + */ + private String createNextCursorKey(ResultSet resultSet, List sortColumns) { + String[] cursorValues = new String[sortColumns.size()]; + + for (int i = 0; i < sortColumns.size(); i++) { + String columnName = sortColumns.get(i); + if (columnName.equals("create_time")) { + cursorValues[i] = String.valueOf(resultSet.getLong(columnName)); + } else { + cursorValues[i] = resultSet.getString(columnName); + } + } + return String.join(",", cursorValues); + } + + /** + * Creates the final SearchResponse, packaging the next cursor string. + */ + private SearchResponse createSearchResponse(SearchRequest request, List results, + String nextActualCursor) { + String finalCursorString = (nextActualCursor != null) ? + getCursor(request, results, nextActualCursor).toString() : null; + return new SearchResponse(request, finalCursorString, request.getOffset(), + request.getLimit(), results.size(), results); + } + + /** + * Creates a pagination cursor for the next set of search results. + * + * @param request The original search request. + * @param results The list of metadata records returned in the current page. + * @param nextActualCursor The database-specific cursor string for the next page. + * @return A {@link Cursor} object representing the next page's starting point. + */ + private Cursor getCursor(SearchRequest request, List results, String nextActualCursor) { + int nextOffset = request.getOffset() + results.size(); + String sortingString = Optional.ofNullable(request.getSorting()) + .map(sorting -> sorting.getKey() + MetadataConstants.KEYVALUE_SEPARATOR + sorting.getOrder().name()) + .orElse(null); + + return new Cursor( + nextOffset, + request.getLimit(), + false, + request.getScope(), + request.getNamespaces(), + request.getTypes(), + sortingString, + nextActualCursor, + request.getQuery() + ); } + /** + * Maps a Spanner {@link ResultSet} row to a {@link MetadataRecord}. + * Extracts the metadata entity ID and JSON metadata from the result set. + * + * @param resultSet The Spanner result set, positioned at the current row. + * @return A {@link MetadataRecord} representing the current row's data. + */ + private MetadataRecord mapResult(ResultSet resultSet) { + String metadataId = resultSet.getString(Tables.Metadata.METADATA_ID_FIELD); + Struct row = resultSet.getCurrentRowAsStruct(); + String metadataJson = row.getJson(Tables.Metadata.METADATA_COLUMN_FIELD); + Metadata metadata = GSON.fromJson(metadataJson, Metadata.class); + MetadataEntity entity = toMetadataEntity(metadataId); + return new MetadataRecord(entity, metadata); + } + + /** + * Maps a user-provided sort key string to its corresponding Spanner column name. + * + * @param key The user-provided sort key (e.g., "name", "namespace", "entity_type"). + * @return The Spanner database column name for the given sort key. + * @throws IllegalArgumentException if the sort key is not supported. + */ + private static String mapSortKey(String key) { + String newKey = SORT_KEY_MAP.get(key); + if (newKey != null) { + return newKey; + } + + throw new IllegalArgumentException("Unsupported sort key: " + key); + } + + /** + * Translate a metadata id in the index into a metadata entity. + */ + private static MetadataEntity toMetadataEntity(String metadataId) { + int index = metadataId.indexOf(':'); + if (index < 0) { + throw new IllegalArgumentException( + "Metadata Id must be of the form 'type:k=v,...' but is " + metadataId); + } + String type = metadataId.substring(0, index); + MetadataEntity.Builder builder = MetadataEntity.builder(); + for (String part : metadataId.substring(index + 1).split(",")) { + String[] parts = part.split("=", 2); + if (parts[0].equals(type)) { + builder.appendAsType(parts[0], parts[1]); + } else { + builder.append(parts[0], parts[1]); + } + } + + // if it is a versioned entity then add the default version + return MetadataUtil.addVersionIfNeeded(builder.build()); + } + + @Override public synchronized void close() { if (spanner != null) { @@ -498,5 +950,49 @@ public synchronized void close() { spanner = null; } } + + // Define a class to encapsulate the results of buildQuery + static class QueryBuildResult { + private final String sql; + private final Map params; + private final List sortColumns; + + public QueryBuildResult(String sql, Map params, List sortColumns) { + this.sql = sql; + this.params = params; + this.sortColumns = sortColumns; + } + + public String getSql() { + return sql; + } + + public Map getParams() { + return params; + } + + public List getSortColumns() { + return sortColumns; + } + } + + // Define a class to encapsulate the results of getSortDetails + static class SortDetailsResult { + private final List columns; + private final List orders; + + public SortDetailsResult(List columns, List orders) { + this.columns = columns; + this.orders = orders; + } + + public List getColumns() { + return columns; + } + + public List getOrders() { + return orders; + } + } } diff --git a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/Tables.java b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/Tables.java index d1c6a88f199e..84a36c286ccd 100644 --- a/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/Tables.java +++ b/cdap-metadata-ext-spanner/src/main/java/io/cdap/cdap/metadata/spanner/Tables.java @@ -34,6 +34,10 @@ public static class Metadata { public static final String CREATED_FIELD = "create_time"; public static final String USER_FIELD = "user"; public static final String SYSTEM_FIELD = "system"; + public static final String USER_TOKEN_FIELD = "user_tokens"; + public static final String SYSTEM_TOKEN_FIELD = "system_tokens"; + public static final String TEXT_TOKEN_FIELD = "text_tokens"; + } /** @@ -42,10 +46,9 @@ public static class Metadata { */ public static class MetadataProps { public static final String METADATA_ID_FIELD = "metadata_id"; - public static final String NAMESPACE_FIELD = "namespace"; - public static final String TYPE_FIELD = "entity_type"; public static final String NESTED_NAME_FIELD = "name"; public static final String NESTED_SCOPE_FIELD = "scope"; public static final String NESTED_VALUE_FIELD = "value"; + public static final String NESTED_VALUE_TOKEN_FIELD = "value_tokens"; } } diff --git a/cdap-metadata-ext-spanner/src/test/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorageTest.java b/cdap-metadata-ext-spanner/src/test/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorageTest.java index 2765f8142301..e684deaa1b82 100644 --- a/cdap-metadata-ext-spanner/src/test/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorageTest.java +++ b/cdap-metadata-ext-spanner/src/test/java/io/cdap/cdap/metadata/spanner/SpannerMetadataStorageTest.java @@ -16,12 +16,6 @@ package io.cdap.cdap.metadata.spanner; -import static io.cdap.cdap.metadata.spanner.SpannerMetadataStorage.toMetadataId; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -32,30 +26,28 @@ import com.google.cloud.spanner.InstanceId; import com.google.cloud.spanner.InstanceInfo; import com.google.cloud.spanner.InstanceNotFoundException; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerOptions; -import com.google.cloud.spanner.Statement; -import io.cdap.cdap.api.metadata.MetadataEntity; +import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.metadata.MetadataScope; +import io.cdap.cdap.common.metadata.Cursor; import io.cdap.cdap.spi.metadata.Metadata; +import io.cdap.cdap.spi.metadata.MetadataKind; +import io.cdap.cdap.spi.metadata.MetadataStorage; import io.cdap.cdap.spi.metadata.MetadataStorageContext; +import io.cdap.cdap.spi.metadata.MetadataStorageTest; import io.cdap.cdap.spi.metadata.ScopedName; -import io.cdap.cdap.spi.metadata.VersionedMetadata; +import io.cdap.cdap.spi.metadata.ScopedNameOfKind; import java.io.IOException; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; + import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -93,7 +85,7 @@ * {@link SpannerMetadataStorage} implementation is not yet feature-complete. Once all * required methods are implemented, this class will be updated to ensure full test coverage. */ -public class SpannerMetadataStorageTest { +public class SpannerMetadataStorageTest extends MetadataStorageTest { private static final String PROJECT_ID = "test-project"; private static final String INSTANCE_ID = "test-instance"; @@ -105,10 +97,6 @@ public class SpannerMetadataStorageTest { private static InstanceAdminClient adminClient; private static Spanner spanner; - // Metadata table names - private static final String METADATA_TABLE = "metadata"; - private static final String METADATA_PROPS_TABLE = "metadata_props"; - // Indicates whether the Spanner emulator is active, guiding cleanup decisions. private static boolean isEmulatorRunning; @@ -176,7 +164,7 @@ public static void cleanUp() { if (dbAdminClient != null) { try { dbAdminClient.dropDatabase(INSTANCE_ID, DATABASE_ID); - } catch (DatabaseNotFoundException e) { + } catch (DatabaseNotFoundException ignored) { } catch (Exception e) { throw new RuntimeException("Failed to drop database during @AfterClass cleanup", e); } @@ -185,7 +173,7 @@ public static void cleanUp() { if (adminClient != null) { try { adminClient.deleteInstance(INSTANCE_ID); - } catch (InstanceNotFoundException e) { + } catch (InstanceNotFoundException ignored) { } catch (Exception e) { throw new RuntimeException("Failed to delete instance during @AfterClass cleanup", e); } @@ -198,46 +186,6 @@ public static void cleanUp() { } } - @Before - public void beforeTest() throws IOException { - spannerMetadataStorage.createIndex(); - } - - /** - * Helper method to simulate initial metadata creation by directly inserting into Spanner. - * This is used because `create` and `writeToSpanner` in `SpannerMetadataStorage` are "NOT IMPLEMENTED". - * In a real scenario, these would be part of the `MetadataStorage` implementation. - */ - private void simulateInitialMetadata(MetadataEntity entity, Metadata metadata, long version) throws IOException { - try { - dbClient.readWriteTransaction().run(transaction -> { - long currentTime = System.currentTimeMillis(); - String metadataJson = SpannerMetadataStorage.GSON.toJson(metadata); - - Mutation mutation = Mutation.newInsertOrUpdateBuilder(METADATA_TABLE) - .set(Tables.Metadata.METADATA_ID_FIELD).to(toMetadataId(entity)) - .set(Tables.Metadata.NAMESPACE_FIELD).to(entity.getValue(MetadataEntity.NAMESPACE)) - .set(Tables.Metadata.TYPE_FIELD).to(entity.getType()) - .set(Tables.Metadata.NAME_FIELD).to(entity.getValue(entity.getType())) - .set(Tables.Metadata.CREATED_FIELD).to(currentTime) - .set(Tables.Metadata.USER_FIELD).to(metadata.getTags().stream() - .filter(s -> false).map(ScopedName::getName) - .collect(Collectors.joining(","))) - .set(Tables.Metadata.SYSTEM_FIELD).to(metadata.getTags().stream() - .filter(s -> false) - .map(ScopedName::getName) - .collect(Collectors.joining(","))) - .set(Tables.Metadata.METADATA_COLUMN_FIELD).to(metadataJson) - .set(Tables.Metadata.VERSION).to(version) - .build(); - transaction.buffer(mutation); - return null; // The callable must return something, null is fine for side effects - }); - } catch (Exception e) { - throw new IOException("Failed to setup initial metadata", e); - } - } - /** * Tests the `createIndex` method. * Purpose: Verify that the necessary tables (`metadata`, `metadata_props`) and search indexes @@ -245,107 +193,240 @@ private void simulateInitialMetadata(MetadataEntity entity, Metadata metadata, l * This test will explicitly call `createIndex` and then query the `INFORMATION_SCHEMA` using `dbClient` * to verify table and index existence. */ - @Test - public void testCreateIndex() { - try (ReadOnlyTransaction tx = dbClient.readOnlyTransaction()) { - Set tables = new HashSet<>(); - Statement combinedTableStatement = Statement.newBuilder( - "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " - + "WHERE TABLE_SCHEMA = '' AND TABLE_NAME IN (@metadataTable, @metadataPropsTable)") - .bind("metadataTable").to(METADATA_TABLE) - .bind("metadataPropsTable").to(METADATA_PROPS_TABLE) - .build(); - - try (ResultSet resultSet = tx.executeQuery(combinedTableStatement)) { - while (resultSet.next()) { - String tableName = resultSet.getString(0); - tables.add(tableName); - } - } - assertTrue("Metadata table should exist", tables.contains(METADATA_TABLE)); - assertTrue("Metadata properties table should exist", tables.contains(METADATA_PROPS_TABLE)); - - Set metadataTableIndexes = new HashSet<>(); - Set metadataPropsTableIndexes = new HashSet<>(); - Statement combinedIndexStatement = Statement.newBuilder( - "SELECT TABLE_NAME, INDEX_NAME FROM INFORMATION_SCHEMA.INDEXES " - + "WHERE TABLE_SCHEMA = '' AND TABLE_NAME IN (@metadataTable, @metadataPropsTable)") - .bind("metadataTable").to(METADATA_TABLE) - .bind("metadataPropsTable").to(METADATA_PROPS_TABLE) - .build(); - - try (ResultSet resultSet = tx.executeQuery(combinedIndexStatement)) { - while (resultSet.next()) { - String tableName = resultSet.getString("TABLE_NAME"); - String indexName = resultSet.getString("INDEX_NAME"); - - if (METADATA_TABLE.equals(tableName)) { - metadataTableIndexes.add(indexName); - } else if (METADATA_PROPS_TABLE.equals(tableName)) { - metadataPropsTableIndexes.add(indexName); - } - } - } - - assertTrue("UserNgramIndex should exist", metadataTableIndexes.contains("UserNgramIndex")); - assertTrue("SystemNgramIndex should exist", metadataTableIndexes.contains("SystemNgramIndex")); - assertTrue("TextNgramIndex should exist", metadataTableIndexes.contains("TextNgramIndex")); - assertTrue("ValueNgramIndex should exist", metadataPropsTableIndexes.contains("ValueNgramIndex")); - } + @BeforeClass + public static void testCreateIndex() throws IOException { + spannerMetadataStorage.createIndex(); } - - /** - * Tests that reading metadata for an existing entity retrieves the correct data and version. - */ @Test - public void testReadVersionedMetadata_existingEntity() throws IOException { - MetadataEntity existingEntity = MetadataEntity.builder() - .append(MetadataEntity.NAMESPACE, "default") - .appendAsType(MetadataEntity.DATASET, "my_dataset_for_read_test") - .build(); - - Metadata expectedMetadata = new Metadata( - Collections.singleton(new ScopedName(MetadataScope.USER, "read_tag")), - Collections.singletonMap(new ScopedName(MetadataScope.USER, "read_prop"), "read_value") - ); - long expectedVersion = 10L; - - // Pre-populate the database with test data - simulateInitialMetadata(existingEntity, expectedMetadata, expectedVersion); - - VersionedMetadata actualVersionedMetadata; - try (ReadOnlyTransaction readOnlyTx = dbClient.readOnlyTransaction()) { - actualVersionedMetadata = spannerMetadataStorage.readVersionedMetadata(existingEntity, readOnlyTx); - } - - assertNotNull("Returned VersionedMetadata should not be null", actualVersionedMetadata); - assertNotEquals("Returned VersionedMetadata should not be NONE", VersionedMetadata.NONE, - actualVersionedMetadata); - assertEquals("Metadata tags should match", expectedMetadata.getTags(), - actualVersionedMetadata.getMetadata().getTags()); - assertEquals("Metadata properties should match", expectedMetadata.getProperties(), - actualVersionedMetadata.getMetadata().getProperties()); - assertEquals("Version should match", expectedVersion, (long) actualVersionedMetadata.getVersion()); + public void testFiltering() { + ScopedName sys = new ScopedName(MetadataScope.SYSTEM, "s"); + ScopedName user = new ScopedName(MetadataScope.USER, "u"); + String sval = "S"; + String uval = "U"; + Metadata before = new Metadata(tags(sys, user), props(sys, sval, user, uval)); + + // test selection to remove + Assert.assertEquals(new Metadata(tags(sys), props(user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.NONE, + MetadataScope.NONE, + ImmutableSet.of(new ScopedNameOfKind(MetadataKind.TAG, user), + new ScopedNameOfKind(MetadataKind.PROPERTY, sys)))); + + // test selection is not affected by scopes or kinds + Assert.assertEquals(new Metadata(tags(sys), props(user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.ALL, + MetadataScope.ALL, + ImmutableSet.of(new ScopedNameOfKind(MetadataKind.TAG, user), + new ScopedNameOfKind(MetadataKind.PROPERTY, sys)))); + + // test selection to keep + Assert.assertEquals(new Metadata(tags(user), props(sys, sval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.NONE, + MetadataScope.NONE, + ImmutableSet.of(new ScopedNameOfKind(MetadataKind.TAG, user), + new ScopedNameOfKind(MetadataKind.PROPERTY, sys)))); + + // test selection is not affected by scopes or kinds + Assert.assertEquals(new Metadata(tags(user), props(sys, sval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.ALL, + MetadataScope.ALL, + ImmutableSet.of(new ScopedNameOfKind(MetadataKind.TAG, user), + new ScopedNameOfKind(MetadataKind.PROPERTY, sys)))); + + // test removing nothing + Assert.assertEquals(before, + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.NONE, + MetadataScope.NONE, + null)); + Assert.assertEquals(before, + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.NONE, + MetadataScope.ALL, + null)); + Assert.assertEquals(before, + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.ALL, + MetadataScope.NONE, + null)); + + // test keeping all + Assert.assertEquals(before, + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.ALL, + MetadataScope.ALL, + null)); + + // test removing all + Assert.assertEquals(Metadata.EMPTY, + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.ALL, + MetadataScope.ALL, + null)); + + // test keeping nothing + Assert.assertEquals(Metadata.EMPTY, + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.NONE, + MetadataScope.NONE, + null)); + // test keeping nothing + Assert.assertEquals(Metadata.EMPTY, + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.ALL, + MetadataScope.NONE, + null)); + // test keeping nothing + Assert.assertEquals(Metadata.EMPTY, + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.NONE, + MetadataScope.ALL, + null)); + + // test removing all SYSTEM + Assert.assertEquals(new Metadata(tags(user), props(user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.ALL, + Collections.singleton(MetadataScope.SYSTEM), + null)); + // test removing all USER + Assert.assertEquals(new Metadata(tags(sys), props(sys, sval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + MetadataKind.ALL, + Collections.singleton(MetadataScope.USER), + null)); + // test keeping all SYSTEM + Assert.assertEquals(new Metadata(tags(sys), props(sys, sval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.ALL, + Collections.singleton(MetadataScope.SYSTEM), + null)); + // test keeping all USER + Assert.assertEquals(new Metadata(tags(user), props(user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + MetadataKind.ALL, + Collections.singleton(MetadataScope.USER), + null)); + + // test removing all tags + Assert.assertEquals(new Metadata(tags(), props(sys, sval, user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + Collections.singleton(MetadataKind.TAG), + MetadataScope.ALL, + null)); + + // test removing all properties + Assert.assertEquals(new Metadata(tags(sys, user), props()), + SpannerMetadataStorage.filterMetadata( + before, + false, + Collections.singleton(MetadataKind.PROPERTY), + MetadataScope.ALL, + null)); + + // test keeping all tags + Assert.assertEquals(new Metadata(tags(sys, user), props()), + SpannerMetadataStorage.filterMetadata( + before, + true, + Collections.singleton(MetadataKind.TAG), + MetadataScope.ALL, + null)); + + // test keeping all properties + Assert.assertEquals(new Metadata(tags(), props(sys, sval, user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + Collections.singleton(MetadataKind.PROPERTY), + MetadataScope.ALL, + null)); + + // test removing all tags in SYSTEM scope + Assert.assertEquals(new Metadata(tags(user), props(sys, sval, user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + Collections.singleton(MetadataKind.TAG), + Collections.singleton(MetadataScope.SYSTEM), + null)); + + // test removing all properties in USER scope + Assert.assertEquals(new Metadata(tags(sys, user), props(sys, sval)), + SpannerMetadataStorage.filterMetadata( + before, + false, + Collections.singleton(MetadataKind.PROPERTY), + Collections.singleton(MetadataScope.USER), + null)); + + // test keeping all tags in SYSTEM scope + Assert.assertEquals(new Metadata(tags(sys), props()), + SpannerMetadataStorage.filterMetadata( + before, + true, + Collections.singleton(MetadataKind.TAG), + Collections.singleton(MetadataScope.SYSTEM), + null)); + + // test keeping all properties in USER scope + Assert.assertEquals(new Metadata(tags(), props(user, uval)), + SpannerMetadataStorage.filterMetadata( + before, + true, + Collections.singleton(MetadataKind.PROPERTY), + Collections.singleton(MetadataScope.USER), + null)); } - /** - * Tests that reading metadata for a non-existent entity correctly returns VersionedMetadata.NONE. - */ - @Test - public void testReadVersionedMetadata_nonExistentEntity() throws IOException { - MetadataEntity nonExistentEntity = MetadataEntity.builder() - .append(MetadataEntity.NAMESPACE, "default") - .appendAsType("application", "non_existent_app") - .build(); - - VersionedMetadata actualVersionedMetadata; - try (ReadOnlyTransaction readOnlyTx = dbClient.readOnlyTransaction()) { - actualVersionedMetadata = spannerMetadataStorage.readVersionedMetadata(nonExistentEntity, readOnlyTx); - } + @Override + protected MetadataStorage getMetadataStorage() { + return spannerMetadataStorage; + } - assertNotNull("Returned VersionedMetadata should not be null", actualVersionedMetadata); - assertEquals("Expected VersionedMetadata.NONE for a non-existent entity", - VersionedMetadata.NONE, actualVersionedMetadata); + @Override + protected void validateCursor(String cursor, int expectedOffset, int expectedPageSize) { + Cursor c = Cursor.fromString(cursor); + Assert.assertEquals(expectedOffset, c.getOffset()); + Assert.assertEquals(expectedPageSize, c.getLimit()); } private static final class MockMetadataStorageContext implements MetadataStorageContext {