diff --git a/.github/workflows/DB-CE.yml b/.github/workflows/DB-CE.yml index a6a941df54..f1a26934dc 100644 --- a/.github/workflows/DB-CE.yml +++ b/.github/workflows/DB-CE.yml @@ -27,7 +27,7 @@ on: description: "The database to run the test on" type: string required: false - default: '["FileSystem", "IoTDB12", "InfluxDB", "PostgreSQL", "Redis", "MongoDB", "Parquet", "MySQL"]' + default: '[ "Parquet"]' env: VERSION: 0.6.0-SNAPSHOT diff --git a/.github/workflows/standard-test-suite.yml b/.github/workflows/standard-test-suite.yml index a4f8234894..f5b7a22f3b 100644 --- a/.github/workflows/standard-test-suite.yml +++ b/.github/workflows/standard-test-suite.yml @@ -11,27 +11,27 @@ concurrency: cancel-in-progress: true jobs: - unit-test: - uses: ./.github/workflows/unit-test.yml - unit-mds: - uses: ./.github/workflows/unit-mds.yml - case-regression: - uses: ./.github/workflows/case-regression.yml - with: - metadata-matrix: '["zookeeper"]' - standalone-test: - uses: ./.github/workflows/standalone-test.yml - with: - metadata-matrix: '["zookeeper"]' - standalone-test-pushdown: - uses: ./.github/workflows/standalone-test-pushdown.yml - with: - metadata-matrix: '["zookeeper"]' +# unit-test: +# uses: ./.github/workflows/unit-test.yml +# unit-mds: +# uses: ./.github/workflows/unit-mds.yml +# case-regression: +# uses: ./.github/workflows/case-regression.yml +# with: +# metadata-matrix: '["zookeeper"]' +# standalone-test: +# uses: ./.github/workflows/standalone-test.yml +# with: +# metadata-matrix: '["zookeeper"]' +# standalone-test-pushdown: +# uses: ./.github/workflows/standalone-test-pushdown.yml +# with: +# metadata-matrix: '["zookeeper"]' db-ce: uses: ./.github/workflows/DB-CE.yml with: metadata-matrix: '["zookeeper"]' - remote-test: - uses: ./.github/workflows/remote-test.yml - with: - metadata-matrix: '["zookeeper"]' +# remote-test: +# uses: ./.github/workflows/remote-test.yml +# with: +# metadata-matrix: '["zookeeper"]' diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java index b2d83e47c7..dda2f745f7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java @@ -26,10 +26,12 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; import cn.edu.tsinghua.iginx.engine.shared.operator.Project; import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.List; +import java.util.Set; public interface IStorage { /** 对非叠加分片查询数据 */ @@ -55,7 +57,7 @@ TaskExecuteResult executeProjectDummyWithSelect( TaskExecuteResult executeInsert(Insert insert, DataArea dataArea); /** 获取所有列信息 */ - List getColumns() throws PhysicalException; + List getColumns(Set pattern, TagFilter tagFilter) throws PhysicalException; /** 获取指定前缀的数据边界 */ Pair getBoundaryOfStorage(String prefix) throws PhysicalException; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java index 074e5f1834..b6fbea2da0 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java @@ -31,7 +31,6 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue; -import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask; @@ -316,7 +315,8 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) { public TaskExecuteResult executeShowColumns(ShowColumns showColumns) { List storageList = metaManager.getStorageEngineList(); - Set columnSet = new HashSet<>(); + TreeSet columnSetAfterFilter = + new TreeSet<>(Comparator.comparing(Column::getPhysicalPath)); for (StorageEngineMeta storage : storageList) { long id = storage.getId(); Pair pair = storageManager.getStorage(id); @@ -324,56 +324,58 @@ public TaskExecuteResult executeShowColumns(ShowColumns showColumns) { continue; } try { - List columnList = pair.k.getColumns(); - // fix the schemaPrefix + Set patternSet = showColumns.getPathRegexSet(); + TagFilter tagFilter = showColumns.getTagFilter(); + + List columnList = pair.k.getColumns(patternSet, tagFilter); + + // fix the schemaPrefix and dataPrefix String schemaPrefix = storage.getSchemaPrefix(); - if (schemaPrefix != null) { + String dataPrefixRegex = + storage.getDataPrefix() == null + ? null + : StringUtils.reformatPath(storage.getDataPrefix() + ".*"); + if (tagFilter == null) { for (Column column : columnList) { if (column.isDummy()) { - column.setPath(schemaPrefix + "." + column.getPath()); + if (dataPrefixRegex == null || Pattern.matches(dataPrefixRegex, column.getPath())) { + if (schemaPrefix != null) { + column.setPath(schemaPrefix + "." + column.getPath()); + boolean isMatch = patternSet.isEmpty(); + for (String pathRegex : patternSet) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) { + isMatch = true; + break; + } + } + if (isMatch) { + columnSetAfterFilter.add(column); + } + } else { + columnSetAfterFilter.add(column); + } + } + } else { + columnSetAfterFilter.add(column); } } + } else { + columnSetAfterFilter.addAll(columnList); } - columnSet.addAll(columnList); } catch (PhysicalException e) { return new TaskExecuteResult(e); } } - Set pathRegexSet = showColumns.getPathRegexSet(); - TagFilter tagFilter = showColumns.getTagFilter(); - - TreeSet tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath)); - for (Column column : columnSet) { - boolean isTarget = true; - if (!pathRegexSet.isEmpty()) { - isTarget = false; - for (String pathRegex : pathRegexSet) { - if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) { - isTarget = true; - break; - } - } - } - if (tagFilter != null) { - if (!TagKVUtils.match(column.getTags(), tagFilter)) { - isTarget = false; - } - } - if (isTarget) { - tsSetAfterFilter.add(column); - } - } - int limit = showColumns.getLimit(); int offset = showColumns.getOffset(); if (limit == Integer.MAX_VALUE && offset == 0) { - return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter)); + return new TaskExecuteResult(Column.toRowStream(columnSetAfterFilter)); } else { // only need part of data. List tsList = new ArrayList<>(); - int cur = 0, size = tsSetAfterFilter.size(); - for (Iterator iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) { + int cur = 0, size = columnSetAfterFilter.size(); + for (Iterator iter = columnSetAfterFilter.iterator(); iter.hasNext(); cur++) { if (cur >= size || cur - offset >= limit) { break; } diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java index 9c32e4fd9d..6f31d9e666 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.filesystem.exec.Executor; import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor; import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor; @@ -43,6 +44,7 @@ import cn.edu.tsinghua.iginx.utils.Pair; import java.util.Arrays; import java.util.List; +import java.util.Set; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,8 +151,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) { } @Override - public List getColumns() throws PhysicalException { - return executor.getColumnsOfStorageUnit(WILDCARD); + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { + return executor.getColumnsOfStorageUnit(WILDCARD, pattern, tagFilter); } @Override diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java index 5c59a40c50..b5b71fa32a 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java @@ -11,6 +11,7 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.List; +import java.util.Set; public interface Executor { @@ -26,7 +27,8 @@ TaskExecuteResult executeProjectTask( TaskExecuteResult executeDeleteTask( List paths, List keyRanges, TagFilter tagFilter, String storageUnit); - List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException; + List getColumnsOfStorageUnit(String storageUnit, Set pattern, TagFilter tagFilter) + throws PhysicalException; Pair getBoundaryOfStorage(String dataPrefix) throws PhysicalException; diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java index e7510fb036..d7f2160a2a 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java @@ -7,6 +7,7 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; +import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; @@ -29,11 +30,14 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; +import cn.edu.tsinghua.iginx.utils.StringUtils; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -308,36 +312,54 @@ public TaskExecuteResult executeDeleteTask( return new TaskExecuteResult(null, null); } + boolean isPathMatchPattern(String path, Set pattern) { + if (pattern.isEmpty()) { + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + return true; + } + } + return false; + } + @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { List columns = new ArrayList<>(); if (root != null) { File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null)); for (File file : fileSystemManager.getAllFiles(directory, false)) { FileMeta meta = fileSystemManager.getFileMeta(file); + String columnPath = + FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit); if (meta == null) { throw new PhysicalException( String.format( "encounter error when getting columns of storage unit because file meta %s is null", file.getAbsolutePath())); } - columns.add( - new Column( - FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit), - meta.getDataType(), - meta.getTags(), - false)); + // get columns by pattern + if (!isPathMatchPattern(columnPath, pattern)) { + continue; + } + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) { + continue; + } + columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false)); } } - if (hasData && dummyRoot != null) { + // get columns from dummy storage unit + if (hasData && dummyRoot != null && tagFilter == null) { for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) { - columns.add( - new Column( - FilePathUtils.convertAbsolutePathToPath( - dummyRoot, file.getAbsolutePath(), storageUnit), - DataType.BINARY, - null, - true)); + String dummyPath = + FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit); + if (!isPathMatchPattern(dummyPath, pattern)) { + continue; + } + columns.add(new Column(dummyPath, DataType.BINARY, null, true)); } } return columns; diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java index 32c9c31087..21e2dacd23 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java @@ -28,10 +28,7 @@ import cn.edu.tsinghua.iginx.utils.Pair; import cn.edu.tsinghua.iginx.utils.ThriftConnPool; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -207,11 +204,13 @@ public TaskExecuteResult executeDeleteTask( } @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { try { TTransport transport = thriftConnPool.borrowTransport(); Client client = new Client(new TBinaryProtocol(transport)); - GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit); + GetColumnsOfStorageUnitResp resp = + client.getColumnsOfStorageUnit(storageUnit, pattern, constructRawTagFilter(tagFilter)); thriftConnPool.returnTransport(transport); List columns = new ArrayList<>(); resp.getPathList() @@ -254,6 +253,9 @@ public void close() { private RawTagFilter constructRawTagFilter(TagFilter tagFilter) { RawTagFilter filter = null; + if (tagFilter == null) { + return null; + } switch (tagFilter.getType()) { case Base: { diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java index 73383ae362..fb44573bf6 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java @@ -198,10 +198,12 @@ public Status executeDelete(DeleteReq req) throws TException { } @Override - public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(String storageUnit) throws TException { + public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit( + String storageUnit, Set pattern, RawTagFilter tagFilter) throws TException { List ret = new ArrayList<>(); try { - List columns = executor.getColumnsOfStorageUnit(storageUnit); + List columns = + executor.getColumnsOfStorageUnit(storageUnit, pattern, resolveRawTagFilter(tagFilter)); columns.forEach( column -> { FSColumn fsColumn = diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java index ece91da52a..0f91947422 100644 --- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java +++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java @@ -27,6 +27,7 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; +import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; import cn.edu.tsinghua.iginx.engine.shared.data.write.BitmapView; @@ -230,7 +231,7 @@ private String findExtremeRecordPath( } @Override - public List getColumns() { + public List getColumns(Set pattern, TagFilter tagFilter) { List timeseries = new ArrayList<>(); for (Bucket bucket : @@ -242,7 +243,10 @@ public List getColumns() { boolean isDummy = meta.isHasData() && (meta.getDataPrefix() == null - || bucket.getName().startsWith(meta.getDataPrefix())); + || bucket + .getName() + .startsWith( + meta.getDataPrefix().substring(0, meta.getDataPrefix().indexOf(".")))); if (bucket.getType() == Bucket.TypeEnum.SYSTEM || (!isUnit && !isDummy)) { continue; } @@ -263,6 +267,17 @@ public List getColumns() { String val = (String) table.getRecords().get(0).getValues().get(key); tag.put(key, val); } + if (isDummy && !isUnit) { + path = bucket.getName() + "." + path; + } + // get columns by pattern + if (!isPathMatchPattern(path, pattern)) { + continue; + } + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(tag, tagFilter)) { + continue; + } DataType dataType; switch (column.get(5).getDataType()) { // the index 1 is the type of the data @@ -289,16 +304,25 @@ public List getColumns() { LOGGER.warn("DataType don't match and default is String"); break; } - if (isDummy && !isUnit) { - path = bucket.getName() + "." + path; - } - timeseries.add(new Column(path, dataType, tag)); + timeseries.add(new Column(path, dataType, tag, isDummy)); } } return timeseries; } + boolean isPathMatchPattern(String path, Set pattern) { + if (pattern == null || pattern.isEmpty()) { + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + return true; + } + } + return false; + } + @Override public void release() throws PhysicalException { client.close(); diff --git a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java index 74be91517d..092f4bcd6c 100644 --- a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java +++ b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java @@ -192,13 +192,30 @@ public void release() throws PhysicalException { } @Override - public List getColumns() throws PhysicalException { + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { List columns = new ArrayList<>(); - getColumns2StorageUnit(columns, null); + getColumns2StorageUnit(columns, null, pattern, tagFilter); return columns; } - private void getColumns2StorageUnit(List columns, Map columns2StorageUnit) + boolean isPathMatchPattern(String path, Set pattern) { + if (pattern.isEmpty()) { + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + return true; + } + } + return false; + } + + private void getColumns2StorageUnit( + List columns, + Map columns2StorageUnit, + Set pattern, + TagFilter tagFilter) throws PhysicalException { try { SessionDataSetWrapper dataSet = sessionPool.executeQueryStatement(SHOW_TIMESERIES); @@ -221,6 +238,14 @@ private void getColumns2StorageUnit(List columns, Map co if (columns2StorageUnit != null) { columns2StorageUnit.put(pair.k, fragment); } + // get columns by pattern + if (!isPathMatchPattern(pair.k, pattern)) { + continue; + } + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(pair.v, tagFilter)) { + continue; + } switch (dataTypeName) { case "BOOLEAN": @@ -790,7 +815,7 @@ private List determineDeletePathList(String storageUnit, Delete delete) } else { List patterns = delete.getPatterns(); TagFilter tagFilter = delete.getTagFilter(); - List timeSeries = getColumns(); + List timeSeries = getColumns(new HashSet<>(), null); List pathList = new ArrayList<>(); for (Column ts : timeSeries) { @@ -859,7 +884,7 @@ private String getFilterString(Filter filter, String storageUnit) throws Physica if (filterStr.contains("*")) { List columns = new ArrayList<>(); Map columns2Fragment = new HashMap<>(); - getColumns2StorageUnit(columns, columns2Fragment); + getColumns2StorageUnit(columns, columns2Fragment, new HashSet<>(), null); filterStr = FilterTransformer.toString( expandFilterWildcard(filter.copy(), columns, columns2Fragment, storageUnit)); diff --git a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java index 2dde3d2ca8..4178ea2fab 100644 --- a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java +++ b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java @@ -295,7 +295,11 @@ private static long getDuplicateKey(WriteError error) { } @Override - public List getColumns() { + public List getColumns(Set pattern, TagFilter tagFilter) { + List patternList = new ArrayList<>(pattern); + if (patternList.isEmpty()) { + patternList.add("*"); + } List columns = new ArrayList<>(); for (String dbName : getDatabaseNames(this.client)) { MongoDatabase db = this.client.getDatabase(dbName); @@ -303,7 +307,9 @@ public List getColumns() { try { if (dbName.startsWith("unit")) { Field field = NameUtils.parseCollectionName(collectionName); - columns.add(new Column(field.getName(), field.getType(), field.getTags(), false)); + if (NameUtils.match(field.getName(), field.getTags(), patternList, tagFilter)) { + columns.add(new Column(field.getName(), field.getType(), field.getTags(), false)); + } continue; } } catch (Exception ignored) { @@ -315,7 +321,9 @@ public List getColumns() { Map sampleSchema = new SchemaSample(schemaSampleSize).query(collection, true); for (Map.Entry entry : sampleSchema.entrySet()) { - columns.add(new Column(entry.getKey(), entry.getValue(), null, true)); + if (NameUtils.match(entry.getKey(), Collections.emptyMap(), patternList, null)) { + columns.add(new Column(entry.getKey(), entry.getValue(), null, true)); + } } continue; } diff --git a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/tools/NameUtils.java b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/tools/NameUtils.java index 5871b82b20..14c9901eb9 100644 --- a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/tools/NameUtils.java +++ b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/tools/NameUtils.java @@ -63,19 +63,26 @@ public static List match( Iterable fieldList, Iterable patterns, TagFilter tagFilter) { List fields = new ArrayList<>(); for (Field field : fieldList) { - if (tagFilter != null && !TagKVUtils.match(field.getTags(), tagFilter)) { - continue; - } - for (String pattern : patterns) { - if (Pattern.matches(StringUtils.reformatPath(pattern), field.getName())) { - fields.add(field); - break; - } + if (match(field.getName(), field.getTags(), patterns, tagFilter)) { + fields.add(field); } } return fields; } + public static boolean match( + String columnName, Map tags, Iterable patterns, TagFilter tagFilter) { + if (tagFilter != null && !TagKVUtils.match(tags, tagFilter)) { + return false; + } + for (String pattern : patterns) { + if (Pattern.matches(StringUtils.reformatPath(pattern), columnName)) { + return true; + } + } + return false; + } + public static boolean isWildcard(String node) { return node.contains("*"); } diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java index d39b62d931..6bc2298e71 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.*; import cn.edu.tsinghua.iginx.parquet.exec.Executor; import cn.edu.tsinghua.iginx.parquet.exec.LocalExecutor; @@ -44,6 +45,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,8 +176,9 @@ public TaskExecuteResult executeInsert(Insert insert, DataArea dataArea) { } @Override - public List getColumns() throws PhysicalException { - return executor.getColumnsOfStorageUnit("*"); + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { + return executor.getColumnsOfStorageUnit("*", pattern, tagFilter); } @Override diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java index 16ed980af1..672f75c8b1 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/Executor.java @@ -27,6 +27,7 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.List; +import java.util.Set; public interface Executor { @@ -42,7 +43,8 @@ TaskExecuteResult executeProjectTask( TaskExecuteResult executeDeleteTask( List paths, List keyRanges, TagFilter tagFilter, String storageUnit); - List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException; + List getColumnsOfStorageUnit(String storageUnit, Set pattern, TagFilter tagFilter) + throws PhysicalException; Pair getBoundaryOfStorage(String dataPrefix) throws PhysicalException; diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java index 175811369a..697c2fec91 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/LocalExecutor.java @@ -49,6 +49,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -285,11 +286,16 @@ public TaskExecuteResult executeDeleteTask( } @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { + List patternList = new ArrayList<>(pattern); + if (patternList.isEmpty()) { + patternList.add("*"); + } if (storageUnit.equals("*")) { List columns = new ArrayList<>(); for (Manager manager : getAllManagers()) { - columns.addAll(manager.getColumns()); + columns.addAll(manager.getColumns(patternList, tagFilter)); } return columns; } else { @@ -303,7 +309,7 @@ public Pair getBoundaryOfStorage(String dataPrefix List paths = new ArrayList<>(); long start = Long.MAX_VALUE, end = Long.MIN_VALUE; for (Manager manager : getAllManagers()) { - for (Column column : manager.getColumns()) { + for (Column column : manager.getColumns(Collections.singletonList("*"), null)) { paths.add(column.getPath()); } KeyInterval interval = manager.getKeyInterval(); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java index 2bae22e064..6fdccafcc9 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/exec/RemoteExecutor.java @@ -29,28 +29,17 @@ import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; import cn.edu.tsinghua.iginx.engine.shared.data.write.RawDataType; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.AndTagFilter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.BasePreciseTagFilter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.BaseTagFilter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.OrTagFilter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.PreciseTagFilter; -import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.*; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.parquet.server.FilterTransformer; import cn.edu.tsinghua.iginx.parquet.thrift.*; import cn.edu.tsinghua.iginx.parquet.thrift.ParquetService.Client; +import cn.edu.tsinghua.iginx.parquet.thrift.TagFilterType; import cn.edu.tsinghua.iginx.thrift.DataType; -import cn.edu.tsinghua.iginx.utils.Bitmap; -import cn.edu.tsinghua.iginx.utils.ByteUtils; -import cn.edu.tsinghua.iginx.utils.DataTypeUtils; -import cn.edu.tsinghua.iginx.utils.Pair; -import cn.edu.tsinghua.iginx.utils.ThriftConnPool; +import cn.edu.tsinghua.iginx.utils.*; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TTransport; @@ -277,6 +266,9 @@ public TaskExecuteResult executeDeleteTask( } private RawTagFilter constructRawTagFilter(TagFilter tagFilter) { + if(tagFilter == null) { + return null; + } switch (tagFilter.getType()) { case Base: { @@ -335,11 +327,13 @@ private RawTagFilter constructRawTagFilter(TagFilter tagFilter) { } @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { try { TTransport transport = thriftConnPool.borrowTransport(); Client client = new Client(new TBinaryProtocol(transport)); - GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit); + GetColumnsOfStorageUnitResp resp = + client.getColumnsOfStorageUnit(storageUnit, pattern, constructRawTagFilter(tagFilter)); thriftConnPool.returnTransport(transport); List columnList = new ArrayList<>(); resp.getTsList() diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/Manager.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/Manager.java index e072f50b2b..3156376ec3 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/Manager.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/Manager.java @@ -36,7 +36,7 @@ RowStream project(List paths, TagFilter tagFilter, Filter filter) void delete(List paths, List keyRanges, TagFilter tagFilter) throws PhysicalException;; - List getColumns() throws PhysicalException; + List getColumns(List paths, TagFilter tagFilter) throws PhysicalException; KeyInterval getKeyInterval() throws PhysicalException; } diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/data/DataManager.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/data/DataManager.java index bbe4a16527..38b94363c7 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/data/DataManager.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/data/DataManager.java @@ -18,6 +18,7 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; +import cn.edu.tsinghua.iginx.engine.physical.storage.domain.ColumnKey; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; import cn.edu.tsinghua.iginx.engine.shared.data.write.DataView; @@ -31,6 +32,7 @@ import cn.edu.tsinghua.iginx.parquet.db.util.iterator.Scanner; import cn.edu.tsinghua.iginx.parquet.manager.Manager; import cn.edu.tsinghua.iginx.parquet.manager.utils.RangeUtils; +import cn.edu.tsinghua.iginx.parquet.manager.utils.TagKVUtils; import cn.edu.tsinghua.iginx.parquet.util.Constants; import cn.edu.tsinghua.iginx.parquet.util.Shared; import cn.edu.tsinghua.iginx.parquet.util.exception.StorageException; @@ -126,12 +128,17 @@ public void delete(List paths, List keyRanges, TagFilter tagFi } @Override - public List getColumns() throws StorageException { + public List getColumns(List paths, TagFilter tagFilter) throws StorageException { List columns = new ArrayList<>(); for (Map.Entry entry : db.schema().entrySet()) { Map.Entry> pathWithTags = DataViewWrapper.parseFieldName(entry.getKey()); - columns.add(new Column(pathWithTags.getKey(), entry.getValue(), pathWithTags.getValue())); + DataType dataType = entry.getValue(); + ColumnKey columnKey = new ColumnKey(pathWithTags.getKey(), pathWithTags.getValue()); + if (!TagKVUtils.match(columnKey, paths, tagFilter)) { + continue; + } + columns.add(new Column(columnKey.getPath(), dataType, columnKey.getTags())); } return columns; } diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/DummyManager.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/DummyManager.java index 24afeaddee..325ad78353 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/DummyManager.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/DummyManager.java @@ -29,7 +29,6 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.parquet.manager.Manager; import cn.edu.tsinghua.iginx.parquet.manager.utils.TagKVUtils; -import cn.edu.tsinghua.iginx.utils.StringUtils; import java.io.IOException; import java.nio.file.Files; import java.nio.file.NoSuchFileException; @@ -103,21 +102,9 @@ private List determinePathList( Set paths, List patterns, TagFilter tagFilter) { List ret = new ArrayList<>(); for (String path : paths) { - for (String pattern : patterns) { - ColumnKey columnKey = TagKVUtils.splitFullName(path); - if (tagFilter == null) { - if (StringUtils.match(columnKey.getPath(), pattern)) { - ret.add(path); - break; - } - } else { - if (StringUtils.match(columnKey.getPath(), pattern) - && cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils.match( - columnKey.getTags(), tagFilter)) { - ret.add(path); - break; - } - } + ColumnKey columnKey = TagKVUtils.splitFullName(path); + if (TagKVUtils.match(columnKey, patterns, tagFilter)) { + ret.add(path); } } return ret; @@ -135,13 +122,16 @@ public void delete(List paths, List keyRanges, TagFilter tagFi } @Override - public List getColumns() throws PhysicalException { + public List getColumns(List paths, TagFilter tagFilter) throws PhysicalException { List columns = new ArrayList<>(); for (Path path : getFilePaths()) { try { List fields = new Loader(path).getHeader(); for (Field field : fields) { ColumnKey columnKey = TagKVUtils.splitFullName(field.getName()); + if (!TagKVUtils.match(columnKey, paths, tagFilter)) { + continue; + } Column column = new Column( prefix + "." + columnKey.getPath(), field.getType(), columnKey.getTags(), true); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/EmptyManager.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/EmptyManager.java index cbc6daf1b6..fd3507740e 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/EmptyManager.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/dummy/EmptyManager.java @@ -71,7 +71,7 @@ public void delete(List paths, List keyRanges, TagFilter tagFi } @Override - public List getColumns() throws PhysicalException { + public List getColumns(List paths, TagFilter tagFilter) throws PhysicalException { return Collections.emptyList(); } diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/utils/TagKVUtils.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/utils/TagKVUtils.java index 36dca6da5d..b4bfc19ef4 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/utils/TagKVUtils.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/manager/utils/TagKVUtils.java @@ -2,10 +2,13 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.domain.ColumnKey; import cn.edu.tsinghua.iginx.engine.physical.storage.utils.ColumnKeyTranslator; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.utils.Escaper; +import cn.edu.tsinghua.iginx.utils.StringUtils; import java.text.ParseException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class TagKVUtils { @@ -35,4 +38,21 @@ public static ColumnKey splitFullName(String fullName) { throw new IllegalStateException("Failed to parse identifier: " + fullName, e); } } + + public static boolean match(ColumnKey columnKey, List patterns, TagFilter tagFilter) { + for (String pattern : patterns) { + if (tagFilter == null) { + if (StringUtils.match(columnKey.getPath(), pattern)) { + return true; + } + } else { + if (StringUtils.match(columnKey.getPath(), pattern) + && cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils.match( + columnKey.getTags(), tagFilter)) { + return true; + } + } + } + return false; + } } diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java index 3201bbf694..5f5f829a68 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/server/ParquetWorker.java @@ -44,11 +44,7 @@ import cn.edu.tsinghua.iginx.utils.DataTypeUtils; import cn.edu.tsinghua.iginx.utils.Pair; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -283,10 +279,12 @@ private TagFilter resolveRawTagFilter(RawTagFilter rawTagFilter) { } @Override - public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(String storageUnit) throws TException { + public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit( + String storageUnit, Set pattern, RawTagFilter tagFilter) throws TException { List ret = new ArrayList<>(); try { - List tsList = executor.getColumnsOfStorageUnit(storageUnit); + List tsList = + executor.getColumnsOfStorageUnit(storageUnit, pattern, resolveRawTagFilter(tagFilter)); tsList.forEach( timeseries -> { TS ts = new TS(timeseries.getPath(), timeseries.getDataType().toString()); diff --git a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java index 831a7b9c1c..abd7d16e78 100644 --- a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java +++ b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java @@ -467,50 +467,66 @@ public TaskExecuteResult executeInsert(Insert insert, DataArea dataArea) { } @Override - public List getColumns() { - List ret = new ArrayList<>(); - getIginxColumns(ret::add); - getDummyColumns(ret::add); - return ret; + public List getColumns(Set pattern, TagFilter tagFilter) { + try { + List ret = new ArrayList<>(); + getIginxColumns(ret::add, pattern, tagFilter); + getDummyColumns(ret::add, pattern); + return ret; + } catch (PhysicalException e) { + throw new IllegalStateException("get columns error", e); + } } - private void getIginxColumns(Consumer ret) { + private void getIginxColumns(Consumer ret, Set pattern, TagFilter tagFilter) + throws PhysicalException { + List patternList = new ArrayList<>(pattern); + if (patternList.isEmpty()) { + patternList.add("*"); + } + List allPaths = determinePathList("*", patternList, tagFilter); try (Jedis jedis = getDataConnection()) { - Map pathsAndTypes = jedis.hgetAll(KEY_DATA_TYPE); - pathsAndTypes.forEach( - (k, v) -> { - DataType type = DataTransformer.fromStringDataType(v); - Pair> pair = TagKVUtils.splitFullName(k); - ret.accept(new Column(pair.k, type, pair.v)); - }); + for (String path : allPaths) { + String typeStr = jedis.hget(KEY_DATA_TYPE, path); + if (typeStr == null) { + continue; + } + DataType type = DataTransformer.fromStringDataType(typeStr); + Pair> pair = TagKVUtils.splitFullName(path); + ret.accept(new Column(pair.k, type, pair.v)); + } } } - private void getDummyColumns(Consumer ret) { + private void getDummyColumns(Consumer ret, Set patterns) { + List patternList = new ArrayList<>(patterns); + if (patternList.isEmpty()) { + patternList.add("*"); + } try (Jedis jedis = getDummyConnection()) { - String pattern = STAR; - if (dataPrefix != null) { - pattern = dataPrefix + "." + pattern; - } - Set keys = jedis.keys(pattern); - for (String key : keys) { - String type = jedis.type(key); - switch (type) { - case "string": - case "list": - case "set": - case "zset": - ret.accept(new Column(key, DataType.BINARY, Collections.emptyMap(), true)); - break; - case "hash": - ret.accept(new Column(key + SUFFIX_KEY, DataType.BINARY, Collections.emptyMap(), true)); - ret.accept( - new Column(key + SUFFIX_VALUE, DataType.BINARY, Collections.emptyMap(), true)); - break; - case "none": - LOGGER.warn("key {} not exists", key); - default: - LOGGER.warn("unknown key type, type={}", type); + for (String pattern : patternList) { + String redisPattern = TagKVUtils.escapeRedisSpecialCharInPattern(pattern); + Set keys = jedis.keys(redisPattern); + for (String key : keys) { + String type = jedis.type(key); + switch (type) { + case "string": + case "list": + case "set": + case "zset": + ret.accept(new Column(key, DataType.BINARY, Collections.emptyMap(), true)); + break; + case "hash": + ret.accept( + new Column(key + SUFFIX_KEY, DataType.BINARY, Collections.emptyMap(), true)); + ret.accept( + new Column(key + SUFFIX_VALUE, DataType.BINARY, Collections.emptyMap(), true)); + break; + case "none": + LOGGER.warn("key {} not exists", key); + default: + LOGGER.warn("unknown key type, type={}", type); + } } } } diff --git a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/tools/TagKVUtils.java b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/tools/TagKVUtils.java index c42ffba5d3..d1ac9055e5 100644 --- a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/tools/TagKVUtils.java +++ b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/tools/TagKVUtils.java @@ -67,7 +67,11 @@ public static Pair> splitFullName(String fullName) { public static String getPattern(String name) { String escaped = COLUMN_KEY_TRANSLATOR.getEscaper().escape(name); - return escaped.replaceAll("[?^{}\\[\\]\\\\]", "\\\\$0"); + return escapeRedisSpecialCharInPattern(escaped); + } + + public static String escapeRedisSpecialCharInPattern(String name) { + return name.replaceAll("[?^{}\\[\\]\\\\]", "\\\\$0"); } public static boolean match(Map tags, TagFilter tagFilter) { diff --git a/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java b/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java index 484814477d..f730b3733b 100644 --- a/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java +++ b/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java @@ -335,7 +335,8 @@ private boolean filterContainsType(List types, Filter filter) { } @Override - public List getColumns() throws RelationalTaskExecuteFailureException { + public List getColumns(Set pattern, TagFilter tagFilter) + throws RelationalTaskExecuteFailureException { List columns = new ArrayList<>(); Map extraParams = meta.getExtraParams(); try { @@ -344,6 +345,9 @@ public List getColumns() throws RelationalTaskExecuteFailureException { && !databaseName.startsWith(DATABASE_PREFIX)) { continue; } + boolean isDummy = + extraParams.get("has_data") != null + && extraParams.get("has_data").equalsIgnoreCase("true"); List tables = getTables(databaseName, "%"); for (String tableName : tables) { @@ -356,18 +360,25 @@ public List getColumns() throws RelationalTaskExecuteFailureException { } Pair> nameAndTags = splitFullName(columnName); if (databaseName.startsWith(DATABASE_PREFIX)) { - columns.add( - new Column( - tableName + SEPARATOR + nameAndTags.k, - relationalMeta.getDataTypeTransformer().fromEngineType(typeName), - nameAndTags.v)); + columnName = tableName + SEPARATOR + nameAndTags.k; } else { - columns.add( - new Column( - databaseName + SEPARATOR + tableName + SEPARATOR + nameAndTags.k, - relationalMeta.getDataTypeTransformer().fromEngineType(typeName), - nameAndTags.v)); + columnName = databaseName + SEPARATOR + tableName + SEPARATOR + nameAndTags.k; } + + // get columns by pattern + if (!isPathMatchPattern(columnName, pattern)) { + continue; + } + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(nameAndTags.v, tagFilter)) { + continue; + } + columns.add( + new Column( + columnName, + relationalMeta.getDataTypeTransformer().fromEngineType(typeName), + nameAndTags.v, + isDummy)); } } } @@ -377,6 +388,18 @@ public List getColumns() throws RelationalTaskExecuteFailureException { return columns; } + boolean isPathMatchPattern(String path, Set pattern) { + if (pattern == null || pattern.isEmpty()) { + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + return true; + } + } + return false; + } + @Override public TaskExecuteResult executeProject(Project project, DataArea dataArea) { KeyInterval keyInterval = dataArea.getKeyInterval(); @@ -1833,7 +1856,7 @@ private void executeBatchInsert( private List> determineDeletedPaths( List paths, TagFilter tagFilter) { try { - List columns = getColumns(); + List columns = getColumns(null, null); List> deletedPaths = new ArrayList<>(); for (Column column : columns) { diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java index 1577c7587b..29bd00b176 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java @@ -1,32 +1,41 @@ package cn.edu.tsinghua.iginx.integration.expansion; -import static cn.edu.tsinghua.iginx.integration.controller.Controller.SUPPORT_KEY; -import static cn.edu.tsinghua.iginx.integration.expansion.constant.Constant.*; -import static cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools.executeShellScript; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; import cn.edu.tsinghua.iginx.integration.expansion.filesystem.FileSystemCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.influxdb.InfluxDBCapacityExpansionIT; +import cn.edu.tsinghua.iginx.integration.expansion.mongodb.MongoDBCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.parquet.ParquetCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools; import cn.edu.tsinghua.iginx.integration.tool.ConfLoader; import cn.edu.tsinghua.iginx.session.ClusterInfo; +import cn.edu.tsinghua.iginx.session.Column; import cn.edu.tsinghua.iginx.session.QueryDataSet; import cn.edu.tsinghua.iginx.session.Session; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.thrift.RemovedStorageEngineInfo; import cn.edu.tsinghua.iginx.thrift.StorageEngineType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.junit.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.stream.Collectors; -/** 原始节点相关的变量命名统一用 ori 扩容节点相关的变量命名统一用 exp */ +import static cn.edu.tsinghua.iginx.integration.controller.Controller.SUPPORT_KEY; +import static cn.edu.tsinghua.iginx.integration.expansion.constant.Constant.*; +import static cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools.executeShellScript; +import static org.junit.Assert.*; + +/** + * 原始节点相关的变量命名统一用 ori 扩容节点相关的变量命名统一用 exp + */ public abstract class BaseCapacityExpansionIT { private static final Logger LOGGER = LoggerFactory.getLogger(BaseCapacityExpansionIT.class); @@ -85,7 +94,8 @@ protected String addStorageEngine( if (IS_PARQUET_OR_FILE_SYSTEM) { statement.append(String.format(", dummy_dir:%s/", DBCE_PARQUET_FS_TEST_DIR)); statement.append(PORT_TO_ROOT.get(port)); - statement.append(String.format(", dir:%s/iginx_", DBCE_PARQUET_FS_TEST_DIR)); + statement.append( + String.format(", dir:%s/" + IGINX_DATA_PATH_PREFIX_NAME, DBCE_PARQUET_FS_TEST_DIR)); statement.append(PORT_TO_ROOT.get(port)); statement.append(", iginx_port:" + oriPortIginx); } @@ -340,7 +350,8 @@ protected void queryExtendedColDummy() { SQLTestTools.executeAndCompare(session, statement, new ArrayList<>(), new ArrayList<>()); } - protected void testQuerySpecialHistoryData() {} + protected void testQuerySpecialHistoryData() { + } private void testQueryHistoryDataOriHasData() { String statement = "select wf01.wt01.status, wf01.wt01.temperature from mn;"; @@ -455,6 +466,52 @@ private void queryAllNewData() { SQLTestTools.executeAndCompare(session, statement, expect); } + protected void testShowAllColumnsInExpansion(boolean before) { + if (before) { + testShowColumns( + Arrays.asList( + new Column("b.b.b", DataType.LONG), + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE), + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + DataType.LONG))); + } else { + testShowColumns( + Arrays.asList( + new Column("b.b.b", DataType.LONG), + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("p1.nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE), + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + DataType.LONG))); + } + } + + protected void testShowColumns(List expectColumns) { + try { + List columns = session.showColumns(); + LOGGER.info("show columns: {}", columns); + + // 对期望列表和实际列表中的Column对象按路径排序 + List sortedExpectPaths = + expectColumns.stream().map(Column::getPath).sorted().collect(Collectors.toList()); + + List sortedActualPaths = + columns.stream().map(Column::getPath).sorted().collect(Collectors.toList()); + + // 检查排序后的路径列表是否相同 + assertArrayEquals(sortedExpectPaths.toArray(), sortedActualPaths.toArray()); + } catch (SessionException e) { + LOGGER.error("show columns error: ", e); + } + } + private void testAddAndRemoveStorageEngineWithPrefix() { String dataPrefix1 = "nt.wf03"; String dataPrefix2 = "nt.wf04"; @@ -465,9 +522,13 @@ private void testAddAndRemoveStorageEngineWithPrefix() { List> valuesList = EXP_VALUES_LIST1; + testShowAllColumnsInExpansion(true); + // 添加不同 schemaPrefix,相同 dataPrefix addStorageEngine(expPort, true, true, dataPrefix1, schemaPrefix1, extraParams); + testShowAllColumnsInExpansion(false); + // 添加节点 dataPrefix = dataPrefix1 && schemaPrefix = p1 后查询 String statement = "select status2 from *;"; List pathList = Arrays.asList("nt.wf03.wt01.status2", "p1.nt.wf03.wt01.status2"); @@ -680,8 +741,8 @@ private void testSameKeyWarning() { QueryDataSet res = session.executeQuery(statement); if ((res.getWarningMsg() == null - || res.getWarningMsg().isEmpty() - || !res.getWarningMsg().contains("The query results contain overlapped keys.")) + || res.getWarningMsg().isEmpty() + || !res.getWarningMsg().contains("The query results contain overlapped keys.")) && SUPPORT_KEY.get(testConf.getStorageType())) { LOGGER.error("未抛出重叠key的警告"); fail(); @@ -746,7 +807,7 @@ protected void startStorageEngineWithIginx(int port, boolean hasData, boolean is hasData ? DBCE_PARQUET_FS_TEST_DIR + "/" + PORT_TO_ROOT.get(port) : DBCE_PARQUET_FS_TEST_DIR + "/" + INIT_PATH_LIST.get(0).replace(".", "/"), - DBCE_PARQUET_FS_TEST_DIR + "/iginx_" + PORT_TO_ROOT.get(port), + DBCE_PARQUET_FS_TEST_DIR + "/" + IGINX_DATA_PATH_PREFIX_NAME + PORT_TO_ROOT.get(port), String.valueOf(hasData), String.valueOf(isReadOnly), "core/target/iginx-core-*/conf/config.properties", diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/constant/Constant.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/constant/Constant.java index 7ef5adbc14..5031b2ef49 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/constant/Constant.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/constant/Constant.java @@ -13,6 +13,8 @@ public class Constant { public static final String EXP_PORT_NAME = "exp_port"; public static final String READ_ONLY_PORT_NAME = "read_only_port"; + public static final String IGINX_DATA_PATH_PREFIX_NAME = "iginx_"; + // port public static int oriPort = 6667; diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemCapacityExpansionIT.java index 657bf4de45..32868945f2 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemCapacityExpansionIT.java @@ -4,6 +4,9 @@ import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools; +import cn.edu.tsinghua.iginx.session.Column; +import cn.edu.tsinghua.iginx.thrift.DataType; +import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +25,26 @@ protected void testInvalidDummyParams( LOGGER.info("filesystem skips test for wrong dummy engine params."); } + @Override + protected void testShowAllColumnsInExpansion(boolean before) { + if (before) { + testShowColumns( + Arrays.asList( + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE))); + } else { + testShowColumns( + Arrays.asList( + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("p1.nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE))); + } + } + @Override public void testShowColumns() { String statement = "SHOW COLUMNS mn.*;"; diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemHistoryDataGenerator.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemHistoryDataGenerator.java index 0b3ebfea51..64a04d429e 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemHistoryDataGenerator.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/FileSystemHistoryDataGenerator.java @@ -43,14 +43,23 @@ public void writeHistoryData( @Override public void clearHistoryDataForGivenPort(int port) { - Path rootPath = Paths.get(PORT_TO_ROOT.get(port)); - if (!Files.exists(rootPath)) { - return; - } - try (Stream walk = Files.walk(rootPath)) { - walk.sorted(Comparator.reverseOrder()).forEach(this::deleteDirectoryStream); - } catch (IOException e) { - LOGGER.error("delete {} failure", rootPath); + Path rootPath; + for (int i = 0; i < 2; i++) { + if (i == 0) { + rootPath = Paths.get(PORT_TO_ROOT.get(port)); + } else { + rootPath = Paths.get(IGINX_DATA_PATH_PREFIX_NAME + PORT_TO_ROOT.get(port)); + } + LOGGER.info("clear path {}", rootPath.toFile().getAbsolutePath()); + if (!Files.exists(rootPath)) { + LOGGER.info("path {} does not exist", rootPath.toFile().getAbsolutePath()); + continue; + } + try (Stream walk = Files.walk(rootPath)) { + walk.sorted(Comparator.reverseOrder()).forEach(this::deleteDirectoryStream); + } catch (IOException e) { + LOGGER.error("delete {} failure", rootPath); + } } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java similarity index 99% rename from test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java rename to test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java index aede81abd9..a004fbdadd 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java @@ -1,4 +1,4 @@ -package cn.edu.tsinghua.iginx.integration.datasource; +package cn.edu.tsinghua.iginx.integration.expansion.filesystem.datasource; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java index 603db64f97..74d184a42a 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java @@ -8,6 +8,10 @@ import cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools; import cn.edu.tsinghua.iginx.integration.tool.ConfLoader; import cn.edu.tsinghua.iginx.integration.tool.DBConf; +import cn.edu.tsinghua.iginx.session.Column; +import cn.edu.tsinghua.iginx.thrift.DataType; +import java.util.ArrayList; +import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +27,33 @@ public MongoDBCapacityExpansionIT() { Constant.readOnlyPort = dbConf.getDBCEPortMap().get(Constant.READ_ONLY_PORT_NAME); } + @Override + protected void testShowAllColumnsInExpansion(boolean before) { + List columns = new ArrayList<>(); + columns.add(new Column("b.b._id", DataType.BINARY)); + columns.add(new Column("nt.wf03._id", DataType.BINARY)); + columns.add(new Column("nt.wf04._id", DataType.BINARY)); + columns.add( + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz._id", DataType.BINARY)); + columns.add(new Column("b.b.b", DataType.LONG)); + columns.add(new Column("ln.wf02.status", DataType.BOOLEAN)); + columns.add(new Column("ln.wf02.version", DataType.BINARY)); + columns.add(new Column("nt.wf03.wt01.status2", DataType.LONG)); + columns.add(new Column("nt.wf04.wt01.temperature", DataType.DOUBLE)); + columns.add( + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + DataType.LONG)); + + if (!before) { + columns.add(new Column("p1.nt.wf03._id", DataType.BINARY)); + columns.add(new Column("p1.nt.wf03.wt01.status2", DataType.LONG)); + } + + testShowColumns(columns); + } + @Override protected void testQuerySpecialHistoryData() { testProject(); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetCapacityExpansionIT.java index 178847bcd6..f8b739a08f 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetCapacityExpansionIT.java @@ -3,6 +3,9 @@ import static cn.edu.tsinghua.iginx.thrift.StorageEngineType.parquet; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; +import cn.edu.tsinghua.iginx.session.Column; +import cn.edu.tsinghua.iginx.thrift.DataType; +import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,4 +23,24 @@ protected void testInvalidDummyParams( int port, boolean hasData, boolean isReadOnly, String dataPrefix, String schemaPrefix) { LOGGER.info("parquet skips test for wrong dummy engine params."); } + + @Override + protected void testShowAllColumnsInExpansion(boolean before) { + if (before) { + testShowColumns( + Arrays.asList( + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE))); + } else { + testShowColumns( + Arrays.asList( + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("p1.nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE))); + } + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetHistoryDataGenerator.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetHistoryDataGenerator.java index fb8edc8443..64990f94f4 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetHistoryDataGenerator.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/parquet/ParquetHistoryDataGenerator.java @@ -11,10 +11,7 @@ import java.io.IOException; import java.nio.file.*; import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.List; -import java.util.SortedMap; -import java.util.TreeMap; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nonnull; @@ -148,17 +145,24 @@ public void clearHistoryDataForGivenPort(int port) { "delete {}/{} error: does not exist or is not a file.", dir, file.getAbsoluteFile()); } + List pathList = Arrays.asList( + IT_DATA_DIR, + IGINX_DATA_PATH_PREFIX_NAME + PARQUET_PARAMS.get(port).get(0)); // delete the normal IT data - dir = DBCE_PARQUET_FS_TEST_DIR + System.getProperty("file.separator") + IT_DATA_DIR; - parquetPath = Paths.get("../" + dir); - - try { - Files.walkFileTree(parquetPath, new DeleteFileVisitor()); - } catch (NoSuchFileException e) { - LOGGER.warn( - "no such file or directory: {}", new File(parquetPath.toString()).getAbsoluteFile()); - } catch (IOException e) { - LOGGER.warn("delete {} error: ", new File(parquetPath.toString()).getAbsoluteFile(), e); + for(String path : pathList) { + Path dataPath = Paths.get(path); + if (Files.exists(dataPath)) { + try { + Files.walkFileTree(dataPath, new DeleteFileVisitor()); + } catch (NoSuchFileException e) { + LOGGER.warn( + "no such file or directory: {}", new File(dataPath.toString()).getAbsoluteFile()); + } catch (IOException e) { + LOGGER.warn("delete {} error: ", new File(dataPath.toString()).getAbsoluteFile(), e); + } + } else { + LOGGER.warn("delete {} error: does not exist.", new File(dataPath.toString()).getAbsoluteFile()); + } } } diff --git a/thrift/src/main/proto/filesystem.thrift b/thrift/src/main/proto/filesystem.thrift index 51a6a5f180..384d407ff6 100644 --- a/thrift/src/main/proto/filesystem.thrift +++ b/thrift/src/main/proto/filesystem.thrift @@ -157,7 +157,7 @@ service FileSystemService { Status executeDelete(1: DeleteReq req); - GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit); + GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit, 2: set patterns, 3: RawTagFilter tagFilter); GetBoundaryOfStorageResp getBoundaryOfStorage(1: string dataPrefix); diff --git a/thrift/src/main/proto/parquet.thrift b/thrift/src/main/proto/parquet.thrift index da88565a8f..f8d1818f43 100644 --- a/thrift/src/main/proto/parquet.thrift +++ b/thrift/src/main/proto/parquet.thrift @@ -155,7 +155,7 @@ service ParquetService { Status executeDelete(1: DeleteReq req); - GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit); + GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit, 2: set patterns, 3: RawTagFilter tagFilter); GetStorageBoundaryResp getBoundaryOfStorage(1: string dataPrefix);