Skip to content

Commit

Permalink
Merge branch 'main' into mmap2
Browse files Browse the repository at this point in the history
  • Loading branch information
small-turtle-1 authored Dec 30, 2024
2 parents 02f98ad + 1094cff commit 977dca5
Show file tree
Hide file tree
Showing 48 changed files with 19,515 additions and 17,620 deletions.
2 changes: 2 additions & 0 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ mem_index_capacity = 65536
# secret_key = "minioadmin"
# enable_https = false

snapshot_dir = "/var/infinity/snapshots"

[buffer]
buffer_manager_size = "4GB"
lru_num = 7
Expand Down
43 changes: 43 additions & 0 deletions src/admin/admin_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3399,6 +3399,28 @@ QueryResult AdminExecutor::ListConfigs(QueryContext *query_context, const AdminS
}
}

{
{
// option name
Value value = Value::MakeVarchar(SNAPSHOT_DIR_OPTION_NAME);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}
{
// option name type
Value value = Value::MakeVarchar(std::to_string(global_config->CleanupInterval()));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}
{
// option name type
Value value = Value::MakeVarchar("Snapshots store directory");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
}


{
{
// option name
Expand Down Expand Up @@ -3483,6 +3505,27 @@ QueryResult AdminExecutor::ListConfigs(QueryContext *query_context, const AdminS
}
}

{
{
// option name
Value value = Value::MakeVarchar(SNAPSHOT_DIR_OPTION_NAME);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}
{
// option name type
Value value = Value::MakeVarchar(global_config->SnapshotDir());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}
{
// option name type
Value value = Value::MakeVarchar("Snapshot storage directory");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
}

{
{
// option name
Expand Down
5 changes: 4 additions & 1 deletion src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,10 @@ export {
constexpr std::string_view DEFAULT_RESULT_CACHE = "off";
constexpr SizeT DEFAULT_CACHE_RESULT_CAPACITY = 10000;

constexpr std::string_view DEFAULT_SNAPSHOT_DIR = "/var/infinity/snapshot";

// default persistence parameter
constexpr std::string_view DEFAULT_PERSISTENCE_DIR = "/var/infinity/persistence"; // Empty means disabled
constexpr std::string_view DEFAULT_PERSISTENCE_DIR = "/var/infinity/persistence";
constexpr std::string_view DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR = "128MB"; // 128MB
constexpr SizeT DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT = 128 * 1024lu * 1024lu; // 128MB

Expand Down Expand Up @@ -293,6 +295,7 @@ export {

constexpr std::string_view RECORD_RUNNING_QUERY_OPTION_NAME = "record_running_query";

constexpr std::string_view SNAPSHOT_DIR_OPTION_NAME = "snapshot_dir";
// Variable name
constexpr std::string_view QUERY_COUNT_VAR_NAME = "query_count"; // global and session
constexpr std::string_view SESSION_COUNT_VAR_NAME = "session_count"; // global
Expand Down
36 changes: 36 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1824,6 +1824,42 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
result->emplace_back(MakeShared<String>(output_columns_str));
break;
}
case ShowStmtType::kListSnapshots: {
String show_str;
if (intent_size != 0) {
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW SNAPSHOTS ";
} else {
show_str = "SHOW SNAPSHOTS ";
}
show_str += "(";
show_str += std::to_string(show_node->node_id());
show_str += ")";
result->emplace_back(MakeShared<String>(show_str));

String output_columns_str = String(intent_size, ' ');
output_columns_str += " - output columns: [name, count, total_size]";
result->emplace_back(MakeShared<String>(output_columns_str));
break;
}
case ShowStmtType::kShowSnapshot: {
String show_str;
if (intent_size != 0) {
show_str = String(intent_size - 2, ' ');
show_str += "-> SHOW MEMORY ALLOCATION ";
} else {
show_str = "SHOW MEMORY ALLOCATION ";
}
show_str += "(";
show_str += std::to_string(show_node->node_id());
show_str += ")";
result->emplace_back(MakeShared<String>(show_str));

String output_columns_str = String(intent_size, ' ');
output_columns_str += " - output columns: [name, count, total_size]";
result->emplace_back(MakeShared<String>(output_columns_str));
break;
}
case ShowStmtType::kFunction: {
String show_str;
if (intent_size != 0) {
Expand Down
53 changes: 53 additions & 0 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,59 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
auto *compact_processor = query_context->storage()->compaction_processor();
compact_processor->AddTestCommand(BGTaskType::kTestCommand, "stuck for 3 seconds");
} else if (test_command->command_content() == "delta checkpoint") {
LOG_INFO(fmt::format("test command: delta checkpoint"));
} else {
LOG_INFO(fmt::format("test command: other"));
}
break;
}
case CommandType::kSnapshot: {
SnapshotCmd *snapshot_cmd = static_cast<SnapshotCmd *>(command_info_.get());
LOG_INFO(fmt::format("Execute snapshot command"));
SnapshotOp snapshot_operation = snapshot_cmd->operation();
switch(snapshot_operation) {
case SnapshotOp::kCreate: {
LOG_INFO(fmt::format("Execute snapshot create"));
break;
}
case SnapshotOp::kDrop: {
LOG_INFO(fmt::format("Execute snapshot drop"));
break;
}
case SnapshotOp::kRestore: {
LOG_INFO(fmt::format("Execute snapshot restore"));
break;
}
default: {
String error_message = "Invalid snapshot operation type";
UnrecoverableError(error_message);
break;
}
}

SnapshotScope snapshot_scope = snapshot_cmd->scope();
switch(snapshot_scope) {
case SnapshotScope::kSystem: {
LOG_INFO(fmt::format("Execute snapshot system"));
break;
}
case SnapshotScope::kDatabase: {
LOG_INFO(fmt::format("Execute snapshot database"));
break;
}
case SnapshotScope::kTable: {
LOG_INFO(fmt::format("Execute snapshot table"));
break;
}
case SnapshotScope::kIgnore: {
LOG_INFO(fmt::format("Execute snapshot ignore"));
break;
}
default: {
String error_message = "Invalid snapshot scope";
UnrecoverableError(error_message);
break;
}
}
break;
}
Expand Down
118 changes: 118 additions & 0 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,30 @@ void PhysicalShow::Init() {
output_types_->emplace_back(varchar_type);
break;
}
case ShowStmtType::kListSnapshots: {
output_names_->reserve(5);
output_types_->reserve(5);
output_names_->emplace_back("name");
output_names_->emplace_back("scope");
output_names_->emplace_back("time");
output_names_->emplace_back("commit");
output_names_->emplace_back("size");
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(bigint_type);
output_types_->emplace_back(bigint_type);
break;
}
case ShowStmtType::kShowSnapshot: {
output_names_->reserve(2);
output_types_->reserve(2);
output_names_->emplace_back("name");
output_names_->emplace_back("value");
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
break;
}
default: {
Status status = Status::NotSupport("Not implemented show type");
RecoverableError(status);
Expand Down Expand Up @@ -716,6 +740,14 @@ bool PhysicalShow::Execute(QueryContext *query_context, OperatorState *operator_
ExecuteShowFunction(query_context, show_operator_state);
break;
}
case ShowStmtType::kListSnapshots: {
ExecuteListSnapshots(query_context, show_operator_state);
break;
}
case ShowStmtType::kShowSnapshot: {
ExecuteShowSnapshot(query_context, show_operator_state);
break;
}
default: {
String error_message = "Invalid chunk scan type";
UnrecoverableError(error_message);
Expand Down Expand Up @@ -3184,6 +3216,27 @@ void PhysicalShow::ExecuteShowConfigs(QueryContext *query_context, ShowOperatorS
}
}

{
{
// option name
Value value = Value::MakeVarchar(SNAPSHOT_DIR_OPTION_NAME);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[0]);
}
{
// option name type
Value value = Value::MakeVarchar(global_config->SnapshotDir());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[1]);
}
{
// option name type
Value value = Value::MakeVarchar("Snapshot storage directory");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[2]);
}
}

{
{
// option name
Expand Down Expand Up @@ -6407,4 +6460,69 @@ void PhysicalShow::ExecuteShowFunction(QueryContext *query_context, ShowOperator
return;
}

void PhysicalShow::ExecuteListSnapshots(QueryContext *query_context, ShowOperatorState *operator_state) {
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);
auto bigint_type = MakeShared<DataType>(LogicalType::kBigInt);
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{
varchar_type,
varchar_type,
varchar_type,
bigint_type,
bigint_type
};

output_block_ptr->Init(column_types);
output_block_ptr->Finalize();
operator_state->output_.emplace_back(std::move(output_block_ptr));
return;
}

void PhysicalShow::ExecuteShowSnapshot(QueryContext *query_context, ShowOperatorState *operator_state) {
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{
varchar_type,
varchar_type,
};

output_block_ptr->Init(column_types);

// {
// SizeT column_id = 0;
// {
// Value value = Value::MakeVarchar("memory_objects");
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
//
// ++column_id;
// {
// Value value = Value::MakeVarchar(GlobalResourceUsage::GetObjectCountInfo());
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
// }
//
// {
// SizeT column_id = 0;
// {
// Value value = Value::MakeVarchar("memory_allocation");
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
//
// ++column_id;
// {
// Value value = Value::MakeVarchar(GlobalResourceUsage::GetRawMemoryInfo());
// ValueExpression value_expr(value);
// value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
// }
// }

output_block_ptr->Finalize();
operator_state->output_.emplace_back(std::move(output_block_ptr));
return;
}

} // namespace infinity
4 changes: 4 additions & 0 deletions src/executor/operator/physical_show.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ private:

void ExecuteShowFunction(QueryContext *query_context, ShowOperatorState *operator_state);

void ExecuteListSnapshots(QueryContext *query_context, ShowOperatorState *operator_state);

void ExecuteShowSnapshot(QueryContext *query_context, ShowOperatorState *operator_state);

private:
ShowStmtType show_type_{ShowStmtType::kInvalid};
String db_name_{};
Expand Down
33 changes: 33 additions & 0 deletions src/executor/operator/snapshot/snapshot.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

module;

export module snapshot;

import stl;
import status;
import snapshot_info;
import query_context;

namespace infinity {

class Snapshot {
public:
static Status CreateTableSnapshot(QueryContext *query_context, const String &snapshot_name, const String& table_name);
static Status RestoreTableSnapshot();
static Status DropSnapshot(QueryContext *query_context, const String &snapshot_name);
};

} // namespace infinity
Loading

0 comments on commit 977dca5

Please sign in to comment.