Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/DB-CE.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ on:
description: "The database to run the test on"
type: string
required: false
default: '["FileSystem", "IoTDB12", "InfluxDB", "PostgreSQL", "Redis", "MongoDB", "Parquet", "MySQL"]'
default: '["IoTDB12"]'

env:
VERSION: 0.6.0-SNAPSHOT
Expand Down
40 changes: 20 additions & 20 deletions .github/workflows/standard-test-suite.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@ concurrency:
cancel-in-progress: true

jobs:
unit-test:
uses: ./.github/workflows/unit-test.yml
unit-mds:
uses: ./.github/workflows/unit-mds.yml
case-regression:
uses: ./.github/workflows/case-regression.yml
with:
metadata-matrix: '["zookeeper"]'
standalone-test:
uses: ./.github/workflows/standalone-test.yml
with:
metadata-matrix: '["zookeeper"]'
standalone-test-pushdown:
uses: ./.github/workflows/standalone-test-pushdown.yml
with:
metadata-matrix: '["zookeeper"]'
# unit-test:
# uses: ./.github/workflows/unit-test.yml
# unit-mds:
# uses: ./.github/workflows/unit-mds.yml
# case-regression:
# uses: ./.github/workflows/case-regression.yml
# with:
# metadata-matrix: '["zookeeper"]'
# standalone-test:
# uses: ./.github/workflows/standalone-test.yml
# with:
# metadata-matrix: '["zookeeper"]'
# standalone-test-pushdown:
# uses: ./.github/workflows/standalone-test-pushdown.yml
# with:
# metadata-matrix: '["zookeeper"]'
db-ce:
uses: ./.github/workflows/DB-CE.yml
with:
metadata-matrix: '["zookeeper"]'
remote-test:
uses: ./.github/workflows/remote-test.yml
with:
metadata-matrix: '["zookeeper"]'
# remote-test:
# uses: ./.github/workflows/remote-test.yml
# with:
# metadata-matrix: '["zookeeper"]'
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.Insert;
import cn.edu.tsinghua.iginx.engine.shared.operator.Project;
import cn.edu.tsinghua.iginx.engine.shared.operator.Select;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.metadata.entity.ColumnsInterval;
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface IStorage {
/** 对非叠加分片查询数据 */
Expand All @@ -55,7 +57,7 @@ TaskExecuteResult executeProjectDummyWithSelect(
TaskExecuteResult executeInsert(Insert insert, DataArea dataArea);

/** 获取所有列信息 */
List<Column> getColumns() throws PhysicalException;
List<Column> getColumns(Set<String> pattern, TagFilter tagFilter) throws PhysicalException;

/** 获取指定前缀的数据边界 */
Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String prefix) throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
import cn.edu.tsinghua.iginx.engine.physical.storage.queue.StoragePhysicalTaskQueue;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.GlobalPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.MemoryPhysicalTask;
import cn.edu.tsinghua.iginx.engine.physical.task.StoragePhysicalTask;
Expand Down Expand Up @@ -316,64 +315,64 @@ public TaskExecuteResult executeGlobalTask(GlobalPhysicalTask task) {

public TaskExecuteResult executeShowColumns(ShowColumns showColumns) {
List<StorageEngineMeta> storageList = metaManager.getStorageEngineList();
Set<Column> columnSet = new HashSet<>();
TreeSet<Column> columnSetAfterFilter =
new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (StorageEngineMeta storage : storageList) {
long id = storage.getId();
Pair<IStorage, ThreadPoolExecutor> pair = storageManager.getStorage(id);
if (pair == null) {
continue;
}
try {
List<Column> columnList = pair.k.getColumns();
// fix the schemaPrefix
Set<String> patternSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

List<Column> columnList = pair.k.getColumns(patternSet, tagFilter);

// fix the schemaPrefix and dataPrefix
String schemaPrefix = storage.getSchemaPrefix();
if (schemaPrefix != null) {
String dataPrefixRegex = storage.getDataPrefix() == null ? null : StringUtils.reformatPath(storage.getDataPrefix() + ".*");
if (tagFilter == null) {
for (Column column : columnList) {
if (column.isDummy()) {
column.setPath(schemaPrefix + "." + column.getPath());
if (dataPrefixRegex == null || Pattern.matches(dataPrefixRegex, column.getPath())) {
if (schemaPrefix != null) {
column.setPath(schemaPrefix + "." + column.getPath());
boolean isMatch = patternSet.isEmpty();
for (String pathRegex : patternSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isMatch = true;
break;
}
}
if (isMatch) {
columnSetAfterFilter.add(column);
}
} else {
columnSetAfterFilter.add(column);
}
}
} else {
columnSetAfterFilter.add(column);
}
}
} else {
columnSetAfterFilter.addAll(columnList);
}
columnSet.addAll(columnList);
} catch (PhysicalException e) {
return new TaskExecuteResult(e);
}
}

Set<String> pathRegexSet = showColumns.getPathRegexSet();
TagFilter tagFilter = showColumns.getTagFilter();

TreeSet<Column> tsSetAfterFilter = new TreeSet<>(Comparator.comparing(Column::getPhysicalPath));
for (Column column : columnSet) {
boolean isTarget = true;
if (!pathRegexSet.isEmpty()) {
isTarget = false;
for (String pathRegex : pathRegexSet) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), column.getPath())) {
isTarget = true;
break;
}
}
}
if (tagFilter != null) {
if (!TagKVUtils.match(column.getTags(), tagFilter)) {
isTarget = false;
}
}
if (isTarget) {
tsSetAfterFilter.add(column);
}
}

int limit = showColumns.getLimit();
int offset = showColumns.getOffset();
if (limit == Integer.MAX_VALUE && offset == 0) {
return new TaskExecuteResult(Column.toRowStream(tsSetAfterFilter));
return new TaskExecuteResult(Column.toRowStream(columnSetAfterFilter));
} else {
// only need part of data.
List<Column> tsList = new ArrayList<>();
int cur = 0, size = tsSetAfterFilter.size();
for (Iterator<Column> iter = tsSetAfterFilter.iterator(); iter.hasNext(); cur++) {
int cur = 0, size = columnSetAfterFilter.size();
for (Iterator<Column> iter = columnSetAfterFilter.iterator(); iter.hasNext(); cur++) {
if (cur >= size || cur - offset >= limit) {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Filter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.KeyFilter;
import cn.edu.tsinghua.iginx.engine.shared.operator.filter.Op;
import cn.edu.tsinghua.iginx.engine.shared.operator.tag.TagFilter;
import cn.edu.tsinghua.iginx.filesystem.exec.Executor;
import cn.edu.tsinghua.iginx.filesystem.exec.LocalExecutor;
import cn.edu.tsinghua.iginx.filesystem.exec.RemoteExecutor;
Expand All @@ -43,6 +44,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -149,8 +151,9 @@ public TaskExecuteResult executeDelete(Delete delete, DataArea dataArea) {
}

@Override
public List<Column> getColumns() throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD);
public List<Column> getColumns(Set<String> pattern, TagFilter tagFilter)
throws PhysicalException {
return executor.getColumnsOfStorageUnit(WILDCARD, pattern, tagFilter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.utils.Pair;
import java.util.List;
import java.util.Set;

public interface Executor {

Expand All @@ -26,7 +27,8 @@ TaskExecuteResult executeProjectTask(
TaskExecuteResult executeDeleteTask(
List<String> paths, List<KeyRange> keyRanges, TagFilter tagFilter, String storageUnit);

List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException;
List<Column> getColumnsOfStorageUnit(String storageUnit, Set<String> pattern, TagFilter tagFilter)
throws PhysicalException;

Pair<ColumnsInterval, KeyInterval> getBoundaryOfStorage(String dataPrefix)
throws PhysicalException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.stream.EmptyRowStream;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.utils.TagKVUtils;
import cn.edu.tsinghua.iginx.engine.physical.task.TaskExecuteResult;
import cn.edu.tsinghua.iginx.engine.shared.KeyRange;
import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream;
Expand All @@ -29,11 +30,14 @@
import cn.edu.tsinghua.iginx.metadata.entity.KeyInterval;
import cn.edu.tsinghua.iginx.thrift.DataType;
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -308,36 +312,54 @@ public TaskExecuteResult executeDeleteTask(
return new TaskExecuteResult(null, null);
}

boolean isPathMatchPattern(String path, Set<String> pattern) {
if (pattern.isEmpty()) {
return true;
}
for (String pathRegex : pattern) {
if (Pattern.matches(StringUtils.reformatPath(pathRegex), path)) {
return true;
}
}
return false;
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, TagFilter tagFilter) throws PhysicalException {
List<Column> columns = new ArrayList<>();
if (root != null) {
File directory = new File(FilePathUtils.toIginxPath(root, storageUnit, null));
for (File file : fileSystemManager.getAllFiles(directory, false)) {
FileMeta meta = fileSystemManager.getFileMeta(file);
String columnPath =
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit);
if (meta == null) {
throw new PhysicalException(
String.format(
"encounter error when getting columns of storage unit because file meta %s is null",
file.getAbsolutePath()));
}
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(root, file.getAbsolutePath(), storageUnit),
meta.getDataType(),
meta.getTags(),
false));
// get columns by pattern
if (!isPathMatchPattern(columnPath, pattern)) {
continue;
}
// get columns by tag filter
if (tagFilter != null && !TagKVUtils.match(meta.getTags(), tagFilter)) {
continue;
}
columns.add(new Column(columnPath, meta.getDataType(), meta.getTags(), false));
}
}
if (hasData && dummyRoot != null) {
// get columns from dummy storage unit
if (hasData && dummyRoot != null && tagFilter == null) {
for (File file : fileSystemManager.getAllFiles(new File(realDummyRoot), true)) {
columns.add(
new Column(
FilePathUtils.convertAbsolutePathToPath(
dummyRoot, file.getAbsolutePath(), storageUnit),
DataType.BINARY,
null,
true));
String dummyPath =
FilePathUtils.convertAbsolutePathToPath(dummyRoot, file.getAbsolutePath(), storageUnit);
if (!isPathMatchPattern(dummyPath, pattern)) {
continue;
}
columns.add(new Column(dummyPath, DataType.BINARY, null, true));
}
}
return columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@
import cn.edu.tsinghua.iginx.utils.Pair;
import cn.edu.tsinghua.iginx.utils.ThriftConnPool;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down Expand Up @@ -207,11 +204,13 @@ public TaskExecuteResult executeDeleteTask(
}

@Override
public List<Column> getColumnsOfStorageUnit(String storageUnit) throws PhysicalException {
public List<Column> getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, TagFilter tagFilter) throws PhysicalException {
try {
TTransport transport = thriftConnPool.borrowTransport();
Client client = new Client(new TBinaryProtocol(transport));
GetColumnsOfStorageUnitResp resp = client.getColumnsOfStorageUnit(storageUnit);
GetColumnsOfStorageUnitResp resp =
client.getColumnsOfStorageUnit(storageUnit, pattern, constructRawTagFilter(tagFilter));
thriftConnPool.returnTransport(transport);
List<Column> columns = new ArrayList<>();
resp.getPathList()
Expand Down Expand Up @@ -254,6 +253,9 @@ public void close() {

private RawTagFilter constructRawTagFilter(TagFilter tagFilter) {
RawTagFilter filter = null;
if (tagFilter == null) {
return null;
}
switch (tagFilter.getType()) {
case Base:
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,12 @@ public Status executeDelete(DeleteReq req) throws TException {
}

@Override
public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(String storageUnit) throws TException {
public GetColumnsOfStorageUnitResp getColumnsOfStorageUnit(
String storageUnit, Set<String> pattern, RawTagFilter tagFilter) throws TException {
List<FSColumn> ret = new ArrayList<>();
try {
List<Column> columns = executor.getColumnsOfStorageUnit(storageUnit);
List<Column> columns =
executor.getColumnsOfStorageUnit(storageUnit, pattern, resolveRawTagFilter(tagFilter));
columns.forEach(
column -> {
FSColumn fsColumn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private String findExtremeRecordPath(
}

@Override
public List<Column> getColumns() {
public List<Column> getColumns(Set<String> pattern, TagFilter tagFilter) {
List<Column> timeseries = new ArrayList<>();

for (Bucket bucket :
Expand Down
Loading