diff --git a/.github/workflows/DB-CE.yml b/.github/workflows/DB-CE.yml index a6a941df54..bb294b5df3 100644 --- a/.github/workflows/DB-CE.yml +++ b/.github/workflows/DB-CE.yml @@ -27,7 +27,7 @@ on: description: "The database to run the test on" type: string required: false - default: '["FileSystem", "IoTDB12", "InfluxDB", "PostgreSQL", "Redis", "MongoDB", "Parquet", "MySQL"]' + default: '["IoTDB12"]' env: VERSION: 0.6.0-SNAPSHOT diff --git a/.github/workflows/standard-test-suite.yml b/.github/workflows/standard-test-suite.yml index a4f8234894..f5b7a22f3b 100644 --- a/.github/workflows/standard-test-suite.yml +++ b/.github/workflows/standard-test-suite.yml @@ -11,27 +11,27 @@ concurrency: cancel-in-progress: true jobs: - unit-test: - uses: ./.github/workflows/unit-test.yml - unit-mds: - uses: ./.github/workflows/unit-mds.yml - case-regression: - uses: ./.github/workflows/case-regression.yml - with: - metadata-matrix: '["zookeeper"]' - standalone-test: - uses: ./.github/workflows/standalone-test.yml - with: - metadata-matrix: '["zookeeper"]' - standalone-test-pushdown: - uses: ./.github/workflows/standalone-test-pushdown.yml - with: - metadata-matrix: '["zookeeper"]' +# unit-test: +# uses: ./.github/workflows/unit-test.yml +# unit-mds: +# uses: ./.github/workflows/unit-mds.yml +# case-regression: +# uses: ./.github/workflows/case-regression.yml +# with: +# metadata-matrix: '["zookeeper"]' +# standalone-test: +# uses: ./.github/workflows/standalone-test.yml +# with: +# metadata-matrix: '["zookeeper"]' +# standalone-test-pushdown: +# uses: ./.github/workflows/standalone-test-pushdown.yml +# with: +# metadata-matrix: '["zookeeper"]' db-ce: uses: ./.github/workflows/DB-CE.yml with: metadata-matrix: '["zookeeper"]' - remote-test: - uses: ./.github/workflows/remote-test.yml - with: - metadata-matrix: '["zookeeper"]' +# remote-test: +# uses: ./.github/workflows/remote-test.yml +# with: +# metadata-matrix: '["zookeeper"]' diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java index b2d83e47c7..dda2f745f7 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/IStorage.java @@ -26,10 +26,12 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.Insert; import cn.edu.tsinghua.iginx.engine.shared.operator.Project; import cn.edu.tsinghua.iginx.engine.shared.operator.Select; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval; import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.List; +import java.util.Set; public interface IStorage { /** 对非叠加分片查询数据 */ @@ -55,7 +57,7 @@ TaskExecuteResult executeProjectDummyWithSelect( TaskExecuteResult executeInsert(Insert insert, DataArea dataArea); /** 获取所有列信息 */ - List getColumns() throws PhysicalException; + List getColumns(Set pattern, TagFilter tagFilter) throws PhysicalException; /** 获取指定前缀的数据边界 */ Pair getBoundaryOfStorage(String prefix) throws PhysicalException; diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java index 074e5f1834..658e615973 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/storage/execute/StoragePhysicalTaskExecutor.java @@ -31,7 +31,6 @@ import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea; import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue; -import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask; import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask; @@ -316,7 +315,8 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) { public TaskExecuteResult executeShowColumns(ShowColumns showColumns) { List storageList = metaManager.getStorageEngineList(); - Set columnSet = new HashSet<>(); + TreeSet columnSetAfterFilter = + new TreeSet<>(Comparator.comparing(Column::getPhysicalPath)); for (StorageEngineMeta storage : storageList) { long id = storage.getId(); Pair pair = storageManager.getStorage(id); @@ -324,56 +324,55 @@ public TaskExecuteResult executeShowColumns(ShowColumns showColumns) { continue; } try { - List columnList = pair.k.getColumns(); - // fix the schemaPrefix + Set patternSet = showColumns.getPathRegexSet(); + TagFilter tagFilter = showColumns.getTagFilter(); + + List columnList = pair.k.getColumns(patternSet, tagFilter); + + // fix the schemaPrefix and dataPrefix String schemaPrefix = storage.getSchemaPrefix(); - if (schemaPrefix != null) { + String dataPrefixRegex = storage.getDataPrefix() == null ? null : StringUtils.reformatPath(storage.getDataPrefix() + ".*"); + if (tagFilter == null) { for (Column column : columnList) { if (column.isDummy()) { - column.setPath(schemaPrefix + "." + column.getPath()); + if (dataPrefixRegex == null || Pattern.matches(dataPrefixRegex, column.getPath())) { + if (schemaPrefix != null) { + column.setPath(schemaPrefix + "." + column.getPath()); + boolean isMatch = patternSet.isEmpty(); + for (String pathRegex : patternSet) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) { + isMatch = true; + break; + } + } + if (isMatch) { + columnSetAfterFilter.add(column); + } + } else { + columnSetAfterFilter.add(column); + } + } + } else { + columnSetAfterFilter.add(column); } } + } else { + columnSetAfterFilter.addAll(columnList); } - columnSet.addAll(columnList); } catch (PhysicalException e) { return new TaskExecuteResult(e); } } - Set pathRegexSet = showColumns.getPathRegexSet(); - TagFilter tagFilter = showColumns.getTagFilter(); - - TreeSet tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath)); - for (Column column : columnSet) { - boolean isTarget = true; - if (!pathRegexSet.isEmpty()) { - isTarget = false; - for (String pathRegex : pathRegexSet) { - if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) { - isTarget = true; - break; - } - } - } - if (tagFilter != null) { - if (!TagKVUtils.match(column.getTags(), tagFilter)) { - isTarget = false; - } - } - if (isTarget) { - tsSetAfterFilter.add(column); - } - } - int limit = showColumns.getLimit(); int offset = showColumns.getOffset(); if (limit == Integer.MAX_VALUE && offset == 0) { - return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter)); + return new TaskExecuteResult(Column.toRowStream(columnSetAfterFilter)); } else { // only need part of data. List tsList = new ArrayList<>(); - int cur = 0, size = tsSetAfterFilter.size(); - for (Iterator iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) { + int cur = 0, size = columnSetAfterFilter.size(); + for (Iterator iter = columnSetAfterFilter.iterator(); iter.hasNext(); cur++) { if (cur >= size || cur - offset >= limit) { break; } diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java index 9c32e4fd9d..6f31d9e666 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/FileSystemStorage.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.filesystem.exec.Executor; import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor; import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor; @@ -43,6 +44,7 @@ import cn.edu.tsinghua.iginx.utils.Pair; import java.util.Arrays; import java.util.List; +import java.util.Set; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,8 +151,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) { } @Override - public List getColumns() throws PhysicalException { - return executor.getColumnsOfStorageUnit(WILDCARD); + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { + return executor.getColumnsOfStorageUnit(WILDCARD, pattern, tagFilter); } @Override diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java index 5c59a40c50..b5b71fa32a 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/Executor.java @@ -11,6 +11,7 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.utils.Pair; import java.util.List; +import java.util.Set; public interface Executor { @@ -26,7 +27,8 @@ TaskExecuteResult executeProjectTask( TaskExecuteResult executeDeleteTask( List paths, List keyRanges, TagFilter tagFilter, String storageUnit); - List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException; + List getColumnsOfStorageUnit(String storageUnit, Set pattern, TagFilter tagFilter) + throws PhysicalException; Pair getBoundaryOfStorage(String dataPrefix) throws PhysicalException; diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java index e7510fb036..d7f2160a2a 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/LocalExecutor.java @@ -7,6 +7,7 @@ import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException; import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream; import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column; +import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils; import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult; import cn.edu.tsinghua.iginx.engine.shared.KeyRange; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; @@ -29,11 +30,14 @@ import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval; import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.utils.Pair; +import cn.edu.tsinghua.iginx.utils.StringUtils; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -308,36 +312,54 @@ public TaskExecuteResult executeDeleteTask( return new TaskExecuteResult(null, null); } + boolean isPathMatchPattern(String path, Set pattern) { + if (pattern.isEmpty()) { + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + return true; + } + } + return false; + } + @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { List columns = new ArrayList<>(); if (root != null) { File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null)); for (File file : fileSystemManager.getAllFiles(directory, false)) { FileMeta meta = fileSystemManager.getFileMeta(file); + String columnPath = + FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit); if (meta == null) { throw new PhysicalException( String.format( "encounter error when getting columns of storage unit because file meta %s is null", file.getAbsolutePath())); } - columns.add( - new Column( - FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit), - meta.getDataType(), - meta.getTags(), - false)); + // get columns by pattern + if (!isPathMatchPattern(columnPath, pattern)) { + continue; + } + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) { + continue; + } + columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false)); } } - if (hasData && dummyRoot != null) { + // get columns from dummy storage unit + if (hasData && dummyRoot != null && tagFilter == null) { for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) { - columns.add( - new Column( - FilePathUtils.convertAbsolutePathToPath( - dummyRoot, file.getAbsolutePath(), storageUnit), - DataType.BINARY, - null, - true)); + String dummyPath = + FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit); + if (!isPathMatchPattern(dummyPath, pattern)) { + continue; + } + columns.add(new Column(dummyPath, DataType.BINARY, null, true)); } } return columns; diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java index 32c9c31087..21e2dacd23 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/exec/RemoteExecutor.java @@ -28,10 +28,7 @@ import cn.edu.tsinghua.iginx.utils.Pair; import cn.edu.tsinghua.iginx.utils.ThriftConnPool; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -207,11 +204,13 @@ public TaskExecuteResult executeDeleteTask( } @Override - public List getColumnsOfStorageUnit(String storageUnit) throws PhysicalException { + public List getColumnsOfStorageUnit( + String storageUnit, Set pattern, TagFilter tagFilter) throws PhysicalException { try { TTransport transport = thriftConnPool.borrowTransport(); Client client = new Client(new TBinaryProtocol(transport)); - GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit); + GetColumnsOfStorageUnitResp resp = + client.getColumnsOfStorageUnit(storageUnit, pattern, constructRawTagFilter(tagFilter)); thriftConnPool.returnTransport(transport); List columns = new ArrayList<>(); resp.getPathList() @@ -254,6 +253,9 @@ public void close() { private RawTagFilter constructRawTagFilter(TagFilter tagFilter) { RawTagFilter filter = null; + if (tagFilter == null) { + return null; + } switch (tagFilter.getType()) { case Base: { diff --git a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java index 73383ae362..fb44573bf6 100644 --- a/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java +++ b/dataSources/filesystem/src/main/java/cn/edu/tsinghua/iginx/filesystem/server/FileSystemWorker.java @@ -198,10 +198,12 @@ public Status executeDelete(DeleteReq req) throws TException { } @Override - public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(String storageUnit) throws TException { + public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit( + String storageUnit, Set pattern, RawTagFilter tagFilter) throws TException { List ret = new ArrayList<>(); try { - List columns = executor.getColumnsOfStorageUnit(storageUnit); + List columns = + executor.getColumnsOfStorageUnit(storageUnit, pattern, resolveRawTagFilter(tagFilter)); columns.forEach( column -> { FSColumn fsColumn = diff --git a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java index ece91da52a..ca623f630b 100644 --- a/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java +++ b/dataSources/influxdb/src/main/java/cn/edu/tsinghua/iginx/influxdb/InfluxDBStorage.java @@ -230,7 +230,7 @@ private String findExtremeRecordPath( } @Override - public List getColumns() { + public List getColumns(Set pattern, TagFilter tagFilter) { List timeseries = new ArrayList<>(); for (Bucket bucket : diff --git a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java index 74be91517d..dff0ff4b4d 100644 --- a/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java +++ b/dataSources/iotdb12/src/main/java/cn/edu/tsinghua/iginx/iotdb/IoTDBStorage.java @@ -192,13 +192,34 @@ public void release() throws PhysicalException { } @Override - public List getColumns() throws PhysicalException { + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { List columns = new ArrayList<>(); - getColumns2StorageUnit(columns, null); + getColumns2StorageUnit(columns, null, pattern, tagFilter); return columns; } - private void getColumns2StorageUnit(List columns, Map columns2StorageUnit) + boolean isPathMatchPattern(String path, Set pattern) { + LOGGER.info("LHZ-DEBUG: path: {}, pattern: {}", path, pattern); + if (pattern.isEmpty()) { + LOGGER.info("return true"); + return true; + } + for (String pathRegex : pattern) { + if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) { + LOGGER.info("match success"); + return true; + } + } + LOGGER.info("match failure"); + return false; + } + + private void getColumns2StorageUnit( + List columns, + Map columns2StorageUnit, + Set pattern, + TagFilter tagFilter) throws PhysicalException { try { SessionDataSetWrapper dataSet = sessionPool.executeQueryStatement(SHOW_TIMESERIES); @@ -215,12 +236,24 @@ private void getColumns2StorageUnit(List columns, Map co isDummy = false; } Pair> pair = TagKVUtils.splitFullName(path); + LOGGER.info("LHZ-DEBUG: path: {}, tag: {}", pair.k, pair.v); String dataTypeName = record.getFields().get(3).getStringValue(); String fragment = isDummy ? "" : record.getFields().get(2).getStringValue().substring(5); if (columns2StorageUnit != null) { columns2StorageUnit.put(pair.k, fragment); } + // get columns by pattern + if (!isPathMatchPattern(pair.k, pattern)) { + continue; + } + LOGGER.info("LHZ-DEBUG: path: {}, tag: {}", pair.k, pair.v); + // get columns by tag filter + if (tagFilter != null && !TagKVUtils.match(pair.v, tagFilter)) { + LOGGER.info("tagFIlter {}, tag{}", tagFilter, pair.v); + continue; + } + LOGGER.info("next"); switch (dataTypeName) { case "BOOLEAN": @@ -790,7 +823,7 @@ private List determineDeletePathList(String storageUnit, Delete delete) } else { List patterns = delete.getPatterns(); TagFilter tagFilter = delete.getTagFilter(); - List timeSeries = getColumns(); + List timeSeries = getColumns(new HashSet<>(), null); List pathList = new ArrayList<>(); for (Column ts : timeSeries) { @@ -859,7 +892,7 @@ private String getFilterString(Filter filter, String storageUnit) throws Physica if (filterStr.contains("*")) { List columns = new ArrayList<>(); Map columns2Fragment = new HashMap<>(); - getColumns2StorageUnit(columns, columns2Fragment); + getColumns2StorageUnit(columns, columns2Fragment, new HashSet<>(), null); filterStr = FilterTransformer.toString( expandFilterWildcard(filter.copy(), columns, columns2Fragment, storageUnit)); diff --git a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java index 2dde3d2ca8..ca8b450d6b 100644 --- a/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java +++ b/dataSources/mongodb/src/main/java/cn/edu/tsinghua/iginx/mongodb/MongoDBStorage.java @@ -295,7 +295,7 @@ private static long getDuplicateKey(WriteError error) { } @Override - public List getColumns() { + public List getColumns(Set pattern, TagFilter tagFilter) { List columns = new ArrayList<>(); for (String dbName : getDatabaseNames(this.client)) { MongoDatabase db = this.client.getDatabase(dbName); diff --git a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java index d39b62d931..c2bc012f09 100644 --- a/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java +++ b/dataSources/parquet/src/main/java/cn/edu/tsinghua/iginx/parquet/ParquetStorage.java @@ -32,6 +32,7 @@ import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter; import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op; +import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter; import cn.edu.tsinghua.iginx.metadata.entity.*; import cn.edu.tsinghua.iginx.parquet.exec.Executor; import cn.edu.tsinghua.iginx.parquet.exec.LocalExecutor; @@ -44,6 +45,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,7 +176,8 @@ public TaskExecuteResult executeInsert(Insert insert, DataArea dataArea) { } @Override - public List getColumns() throws PhysicalException { + public List getColumns(Set pattern, TagFilter tagFilter) + throws PhysicalException { return executor.getColumnsOfStorageUnit("*"); } diff --git a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java index 831a7b9c1c..37ecff2dc2 100644 --- a/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java +++ b/dataSources/redis/src/main/java/cn/edu/tsinghua/iginx/redis/RedisStorage.java @@ -467,7 +467,7 @@ public TaskExecuteResult executeInsert(Insert insert, DataArea dataArea) { } @Override - public List getColumns() { + public List getColumns(Set pattern, TagFilter tagFilter) { List ret = new ArrayList<>(); getIginxColumns(ret::add); getDummyColumns(ret::add); diff --git a/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java b/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java index 484814477d..1084b6e69c 100644 --- a/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java +++ b/dataSources/relational/src/main/java/cn/edu/tsinghua/iginx/relational/RelationalStorage.java @@ -335,7 +335,8 @@ private boolean filterContainsType(List types, Filter filter) { } @Override - public List getColumns() throws RelationalTaskExecuteFailureException { + public List getColumns(Set pattern, TagFilter tagFilter) + throws RelationalTaskExecuteFailureException { List columns = new ArrayList<>(); Map extraParams = meta.getExtraParams(); try { @@ -1833,7 +1834,7 @@ private void executeBatchInsert( private List> determineDeletedPaths( List paths, TagFilter tagFilter) { try { - List columns = getColumns(); + List columns = getColumns(null, null); List> deletedPaths = new ArrayList<>(); for (Column column : columns) { diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java index 1577c7587b..ce2df11604 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java @@ -3,8 +3,7 @@ import static cn.edu.tsinghua.iginx.integration.controller.Controller.SUPPORT_KEY; import static cn.edu.tsinghua.iginx.integration.expansion.constant.Constant.*; import static cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools.executeShellScript; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; @@ -14,14 +13,17 @@ import cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools; import cn.edu.tsinghua.iginx.integration.tool.ConfLoader; import cn.edu.tsinghua.iginx.session.ClusterInfo; +import cn.edu.tsinghua.iginx.session.Column; import cn.edu.tsinghua.iginx.session.QueryDataSet; import cn.edu.tsinghua.iginx.session.Session; +import cn.edu.tsinghua.iginx.thrift.DataType; import cn.edu.tsinghua.iginx.thrift.RemovedStorageEngineInfo; import cn.edu.tsinghua.iginx.thrift.StorageEngineType; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -323,6 +325,16 @@ protected void queryExtendedKeyDummy() { statement = "select wf05.wt01.status, wf05.wt01.temperature from tm;"; SQLTestTools.executeAndContainValue( session, statement, READ_ONLY_PATH_LIST, READ_ONLY_EXTEND_VALUES_LIST); + + // test show columns + testShowColumns( + Arrays.asList( + new Column("mn.wf01.wt01.temperature", DataType.DOUBLE), + new Column("mn.wf01.wt01.status", DataType.LONG), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE), + new Column("tm.wf05.wt01.status", DataType.LONG), + new Column("tm.wf05.wt01.temperature", DataType.DOUBLE))); } protected void queryExtendedColDummy() { @@ -455,6 +467,25 @@ private void queryAllNewData() { SQLTestTools.executeAndCompare(session, statement, expect); } + private void testShowColumns(List expectColumns) { + try { + List columns = session.showColumns(); + LOGGER.info("show columns: {}", columns); + + // 对期望列表和实际列表中的Column对象按路径排序 + List sortedExpectPaths = + expectColumns.stream().map(Column::getPath).sorted().collect(Collectors.toList()); + + List sortedActualPaths = + columns.stream().map(Column::getPath).sorted().collect(Collectors.toList()); + + // 检查排序后的路径列表是否相同 + assertArrayEquals(sortedExpectPaths.toArray(), sortedActualPaths.toArray()); + } catch (SessionException e) { + LOGGER.error("show columns error: ", e); + } + } + private void testAddAndRemoveStorageEngineWithPrefix() { String dataPrefix1 = "nt.wf03"; String dataPrefix2 = "nt.wf04"; @@ -465,9 +496,31 @@ private void testAddAndRemoveStorageEngineWithPrefix() { List> valuesList = EXP_VALUES_LIST1; + testShowColumns( + Arrays.asList( + new Column("b.b.b", DataType.LONG), + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE), + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + DataType.LONG))); + // 添加不同 schemaPrefix,相同 dataPrefix addStorageEngine(expPort, true, true, dataPrefix1, schemaPrefix1, extraParams); + testShowColumns( + Arrays.asList( + new Column("b.b.b", DataType.LONG), + new Column("ln.wf02.status", DataType.BOOLEAN), + new Column("ln.wf02.version", DataType.BINARY), + new Column("nt.wf03.wt01.status2", DataType.LONG),new Column("p1.nt.wf03.wt01.status2", DataType.LONG), + new Column("nt.wf04.wt01.temperature", DataType.DOUBLE),new Column("p1.nt.wf04.wt01.temperature", DataType.DOUBLE), + new Column( + "zzzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzz.zzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + DataType.LONG))); + // 添加节点 dataPrefix = dataPrefix1 && schemaPrefix = p1 后查询 String statement = "select status2 from *;"; List pathList = Arrays.asList("nt.wf03.wt01.status2", "p1.nt.wf03.wt01.status2"); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java similarity index 99% rename from test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java rename to test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java index aede81abd9..a004fbdadd 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/datasource/DataSourceIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/filesystem/datasource/DataSourceIT.java @@ -1,4 +1,4 @@ -package cn.edu.tsinghua.iginx.integration.datasource; +package cn.edu.tsinghua.iginx.integration.expansion.filesystem.datasource; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; diff --git a/thrift/src/main/proto/filesystem.thrift b/thrift/src/main/proto/filesystem.thrift index 51a6a5f180..384d407ff6 100644 --- a/thrift/src/main/proto/filesystem.thrift +++ b/thrift/src/main/proto/filesystem.thrift @@ -157,7 +157,7 @@ service FileSystemService { Status executeDelete(1: DeleteReq req); - GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit); + GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(1: string storageUnit, 2: set patterns, 3: RawTagFilter tagFilter); GetBoundaryOfStorageResp getBoundaryOfStorage(1: string dataPrefix);