Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
a4ae3d1
fix show columns
RemHero Jun 4, 2024
a406f2f
test
RemHero Jun 4, 2024
918a760
test
RemHero Jun 4, 2024
e083c74
test
RemHero Jun 5, 2024
4a4c798
fix pom
RemHero Jun 5, 2024
621cf38
add interface
RemHero Jun 6, 2024
f065562
fix
RemHero Jun 6, 2024
e16e2ae
fix
RemHero Jun 6, 2024
aa69181
format
RemHero Jun 6, 2024
4c73402
add test
RemHero Jun 7, 2024
7acf6a8
remove test
RemHero Jun 7, 2024
aa3b103
feat(redis): show columns with pattern and tagFilter
aqni Jun 7, 2024
4c08d43
feat(mongodb): show columns with pattern and tagFilter
aqni Jun 7, 2024
2dc19cb
feat(parquet): show columns with pattern and tagFilter
aqni Jun 7, 2024
1d23dfc
Merge pull request #152 from aqni/feat/redis/show_columns_prefix_filt…
RemHero Jun 7, 2024
9360aae
Merge pull request #153 from aqni/feat/mongodb/show_columns_prefix_fi…
RemHero Jun 7, 2024
23ae9f6
Merge pull request #154 from aqni/feat/parquet/show_columns_prefix_fi…
RemHero Jun 7, 2024
f0b7fc5
influxdb get columns by pattern and tag
shinyano Jun 8, 2024
fe7a188
Merge pull request #156 from shinyano/fix/get-columns
RemHero Jun 8, 2024
0a0d7b5
relational get columns by pattern and tag
shinyano Jun 8, 2024
7c5e8c9
Merge pull request #157 from shinyano/fix/get-columns
RemHero Jun 8, 2024
0e368fb
fix influxdb & relational
shinyano Jun 8, 2024
d104944
Merge pull request #158 from shinyano/fix/get-columns
RemHero Jun 10, 2024
4836912
add test
RemHero Jun 10, 2024
9122833
fix mongo
aqni Jun 8, 2024
4cad9c4
fix mongodb
aqni Jun 11, 2024
cae24f7
fix
aqni Jun 8, 2024
48ca3fd
test parquet
aqni Jun 11, 2024
366cd86
fix parquet
aqni Jun 11, 2024
54562b3
Revert "test parquet"
aqni Jun 11, 2024
2eb9735
Merge pull request #160 from aqni/feat/mongodb/show_columns_prefix_fi…
RemHero Jun 11, 2024
fb01b6b
Merge pull request #161 from aqni/feat/parquet/show_columns_prefix_fi…
RemHero Jun 11, 2024
55d84e2
format
RemHero Jun 11, 2024
f23bd5d
fix parquet
RemHero Jun 11, 2024
e258635
fix parquet
RemHero Jun 11, 2024
39e11a7
add test
RemHero Jun 11, 2024
155f95b
test
RemHero Jun 11, 2024
039a7b4
test
RemHero Jun 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/DB-CE.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ on:
description: "The database to run the test on"
type: string
required: false
default: '["FileSystem", "IoTDB12", "InfluxDB", "PostgreSQL", "Redis", "MongoDB", "Parquet", "MySQL"]'
default: '[ "Parquet"]'

env:
VERSION: 0.6.0-SNAPSHOT
Expand Down
40 changes: 20 additions & 20 deletions .github/workflows/standard-test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@ concurrency:
cancel-in-progress: true

jobs:
unit-test:
uses: ./.github/workflows/unit-test.yml
unit-mds:
uses: ./.github/workflows/unit-mds.yml
case-regression:
uses: ./.github/workflows/case-regression.yml
with:
metadata-matrix: '["zookeeper"]'
standalone-test:
uses: ./.github/workflows/standalone-test.yml
with:
metadata-matrix: '["zookeeper"]'
standalone-test-pushdown:
uses: ./.github/workflows/standalone-test-pushdown.yml
with:
metadata-matrix: '["zookeeper"]'
# unit-test:
# uses: ./.github/workflows/unit-test.yml
# unit-mds:
# uses: ./.github/workflows/unit-mds.yml
# case-regression:
# uses: ./.github/workflows/case-regression.yml
# with:
# metadata-matrix: '["zookeeper"]'
# standalone-test:
# uses: ./.github/workflows/standalone-test.yml
# with:
# metadata-matrix: '["zookeeper"]'
# standalone-test-pushdown:
# uses: ./.github/workflows/standalone-test-pushdown.yml
# with:
# metadata-matrix: '["zookeeper"]'
db-ce:
uses: ./.github/workflows/DB-CE.yml
with:
metadata-matrix: '["zookeeper"]'
remote-test:
uses: ./.github/workflows/remote-test.yml
with:
metadata-matrix: '["zookeeper"]'
# remote-test:
# uses: ./.github/workflows/remote-test.yml
# with:
# metadata-matrix: '["zookeeper"]'
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface IStorage {
/** 对非叠加分片查询数据 */
Expand All @@ -55,7 +57,7 @@ TaskExecuteResult executeProjectDummyWithSelect(
TaskExecuteResult executeInsert(Insert insert, DataArea dataArea);

/** 获取所有列信息 */
List<Column> getColumns() throws PhysicalException;
List<Column> getColumns(Set<String> pattern, TagFilter tagFilter) throws PhysicalException;

/** 获取指定前缀的数据边界 */
Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix) throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
Expand Down Expand Up @@ -316,64 +315,67 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) {

public TaskExecuteResult executeShowColumns(ShowColumns showColumns) {
List<StorageEngineMeta> storageList = metaManager.getStorageEngineList();
Set<Column> columnSet = new HashSet<>();
TreeSet<Column> columnSetAfterFilter =
new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (StorageEngineMeta storage : storageList) {
long id = storage.getId();
Pair<IStorage, ThreadPoolExecutor> pair = storageManager.getStorage(id);
if (pair == null) {
continue;
}
try {
List<Column> columnList = pair.k.getColumns();
// fix the schemaPrefix
Set<String> patternSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

List<Column> columnList = pair.k.getColumns(patternSet, tagFilter);

// fix the schemaPrefix and dataPrefix
String schemaPrefix = storage.getSchemaPrefix();
if (schemaPrefix != null) {
String dataPrefixRegex =
storage.getDataPrefix() == null
? null
: StringUtils.reformatPath(storage.getDataPrefix() + ".*");
if (tagFilter == null) {
for (Column column : columnList) {
if (column.isDummy()) {
column.setPath(schemaPrefix + "." + column.getPath());
if (dataPrefixRegex == null || Pattern.matches(dataPrefixRegex, column.getPath())) {
if (schemaPrefix != null) {
column.setPath(schemaPrefix + "." + column.getPath());
boolean isMatch = patternSet.isEmpty();
for (String pathRegex : patternSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isMatch = true;
break;
}
}
if (isMatch) {
columnSetAfterFilter.add(column);
}
} else {
columnSetAfterFilter.add(column);
}
}
} else {
columnSetAfterFilter.add(column);
}
}
} else {
columnSetAfterFilter.addAll(columnList);
}
columnSet.addAll(columnList);
} catch (PhysicalException e) {
return new TaskExecuteResult(e);
}
}

Set<String> pathRegexSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

TreeSet<Column> tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (Column column : columnSet) {
boolean isTarget = true;
if (!pathRegexSet.isEmpty()) {
isTarget = false;
for (String pathRegex : pathRegexSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isTarget = true;
break;
}
}
}
if (tagFilter != null) {
if (!TagKVUtils.match(column.getTags(), tagFilter)) {
isTarget = false;
}
}
if (isTarget) {
tsSetAfterFilter.add(column);
}
}

int limit = showColumns.getLimit();
int offset = showColumns.getOffset();
if (limit == Integer.MAX_VALUE && offset == 0) {
return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter));
return new TaskExecuteResult(Column.toRowStream(columnSetAfterFilter));
} else {
// only need part of data.
List<Column> tsList = new ArrayList<>();
int cur = 0, size = tsSetAfterFilter.size();
for (Iterator<Column> iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) {
int cur = 0, size = columnSetAfterFilter.size();
for (Iterator<Column> iter = columnSetAfterFilter.iterator(); iter.hasNext(); cur++) {
if (cur >= size || cur - offset >= limit) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filesystem.exec.Executor;
import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor;
import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor;
Expand All @@ -43,6 +44,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -149,8 +151,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) {
}

@Override
public List<Column> getColumns() throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD);
public List<Column> getColumns(Set<String> pattern, TagFilter tagFilter)
throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD, pattern, tagFilter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface Executor {

Expand All @@ -26,7 +27,8 @@ TaskExecuteResult executeProjectTask(
TaskExecuteResult executeDeleteTask(
List<String> paths, List<KeyRange> keyRanges, TagFilter tagFilter, String storageUnit);

List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException;
List<Column> getColumnsOfStorageUnit(String storageUnit, Set<String> pattern, TagFilter tagFilter)
throws PhysicalException;

Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.KeyRange;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
Expand All @@ -29,11 +30,14 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -308,36 +312,54 @@ public TaskExecuteResult executeDeleteTask(
return new TaskExecuteResult(null, null);
}

boolean isPathMatchPattern(String path, Set<String> pattern) {
if (pattern.isEmpty()) {
return true;
}
for (String pathRegex : pattern) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) {
return true;
}
}
return false;
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, TagFilter tagFilter) throws PhysicalException {
List<Column> columns = new ArrayList<>();
if (root != null) {
File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null));
for (File file : fileSystemManager.getAllFiles(directory, false)) {
FileMeta meta = fileSystemManager.getFileMeta(file);
String columnPath =
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit);
if (meta == null) {
throw new PhysicalException(
String.format(
"encounter error when getting columns of storage unit because file meta %s is null",
file.getAbsolutePath()));
}
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit),
meta.getDataType(),
meta.getTags(),
false));
// get columns by pattern
if (!isPathMatchPattern(columnPath, pattern)) {
continue;
}
// get columns by tag filter
if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) {
continue;
}
columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false));
}
}
if (hasData && dummyRoot != null) {
// get columns from dummy storage unit
if (hasData && dummyRoot != null && tagFilter == null) {
for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) {
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(
dummyRoot, file.getAbsolutePath(), storageUnit),
DataType.BINARY,
null,
true));
String dummyPath =
FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit);
if (!isPathMatchPattern(dummyPath, pattern)) {
continue;
}
columns.add(new Column(dummyPath, DataType.BINARY, null, true));
}
}
return columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.ThriftConnPool;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down Expand Up @@ -207,11 +204,13 @@ public TaskExecuteResult executeDeleteTask(
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, TagFilter tagFilter) throws PhysicalException {
try {
TTransport transport = thriftConnPool.borrowTransport();
Client client = new Client(new TBinaryProtocol(transport));
GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit);
GetColumnsOfStorageUnitResp resp =
client.getColumnsOfStorageUnit(storageUnit, pattern, constructRawTagFilter(tagFilter));
thriftConnPool.returnTransport(transport);
List<Column> columns = new ArrayList<>();
resp.getPathList()
Expand Down Expand Up @@ -254,6 +253,9 @@ public void close() {

private RawTagFilter constructRawTagFilter(TagFilter tagFilter) {
RawTagFilter filter = null;
if (tagFilter == null) {
return null;
}
switch (tagFilter.getType()) {
case Base:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,12 @@ public Status executeDelete(DeleteReq req) throws TException {
}

@Override
public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(String storageUnit) throws TException {
public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, RawTagFilter tagFilter) throws TException {
List<FSColumn> ret = new ArrayList<>();
try {
List<Column> columns = executor.getColumnsOfStorageUnit(storageUnit);
List<Column> columns =
executor.getColumnsOfStorageUnit(storageUnit, pattern, resolveRawTagFilter(tagFilter));
columns.forEach(
column -> {
FSColumn fsColumn =
Expand Down
Loading