Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/silo/query_engine/actions/action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,13 @@ QueryPlan Action::toQueryPlan(
std::string_view request_id
) {
validateOrderByFields(table->schema);
auto query_plan =
auto query_plan_or_error =
toQueryPlanImpl(std::move(table), std::move(partition_filters), query_options, request_id);
if (!query_plan.status().ok()) {
SILO_PANIC("Arrow error: {}", query_plan.status().ToString());
if (!query_plan_or_error.status().ok()) {
SILO_PANIC("Arrow error: {}", query_plan_or_error.status().ToString());
};
return query_plan.ValueUnsafe();
auto query_plan = std::move(query_plan_or_error).ValueUnsafe();
return query_plan;
}

arrow::Result<arrow::acero::ExecNode*> Action::addSortNode(
Expand Down
17 changes: 5 additions & 12 deletions src/silo/query_engine/exec_node/table_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,11 @@ class TableScanGenerator {

arrow::Future<std::optional<arrow::ExecBatch>> operator()() {
SPDLOG_TRACE("TableScanGenerator::operator()");
auto future = arrow::Future<std::optional<arrow::ExecBatch>>::Make();
// We do this to guard against https://github.com/apache/arrow/issues/47641
// and https://github.com/apache/arrow/issues/47642
std::thread([future, this]() mutable {
try {
auto result = produceNextBatch();
future.MarkFinished(std::move(result));
} catch (const std::exception& exception) {
future.MarkFinished(arrow::Status::ExecutionError(exception.what()));
}
}).detach();
return future;
try {
return produceNextBatch();
} catch (const std::exception& exception) {
return arrow::Status::ExecutionError(exception.what());
}
};

private:
Expand Down
95 changes: 57 additions & 38 deletions src/silo/query_engine/query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ arrow::Status QueryPlan::executeAndWriteImpl(
EVOBENCH_SCOPE("QueryPlan", "execute");
SPDLOG_TRACE("{}", arrow_plan->ToString());
SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Starting the plan.", request_id);
arrow_plan->StartProducing();
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Plan started producing, will now read the resulting batches.",
request_id
);

// Ensure plan is stopped on any exit path (timeout/error/exception).
struct PlanStopGuard {
Expand Down Expand Up @@ -86,43 +81,68 @@ arrow::Status QueryPlan::executeAndWriteImpl(
}
} guard{.request_id = request_id, .plan = arrow_plan};

while (true) {
arrow::Future<std::optional<arrow::ExecBatch>> future_batch = results_generator();
SPDLOG_DEBUG("Request Id [{}] - QueryPlan - await the next batch", request_id);
bool finished_batch_in_time = future_batch.Wait(static_cast<double>(timeout_in_seconds));
if (!finished_batch_in_time) {
SPDLOG_WARN(
"Request Id [{}] - QueryPlan - Batch wait timed out after {} s — stopping plan.",
request_id,
timeout_in_seconds
);
return arrow::Status::ExecutionError(
fmt::format("Request timed out, no batch within {} seconds.", timeout_in_seconds)
auto plan_finished_future =
arrow::Loop([&]() -> arrow::Future<arrow::ControlFlow<arrow::Result<std::monostate>>> {
return results_generator().Then(
[&](std::optional<arrow::ExecBatch> optional_batch
) -> arrow::ControlFlow<arrow::Result<std::monostate>> {
SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Batch received", request_id);
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Current backpressure size: {} bytes, operation is "
"{}",
request_id,
backpressure_monitor->bytes_in_use(),
backpressure_monitor->is_paused() ? "paused" : "running"
);

if (optional_batch == std::nullopt) {
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Finished reading all batches.", request_id
);
return arrow::Break(arrow::Result{std::monostate{}}); // end of input
}
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Batch contains data with {} values.",
request_id,
optional_batch.value().length
);

// TODO(#764) make output format configurable
// The returned error-status is implicitly converted to an arrow::Break
ARROW_RETURN_NOT_OK(output_sink.writeBatch(optional_batch.value()));

SPDLOG_DEBUG("Request Id [{}] - QueryPlan - await the next batch", request_id);
return arrow::Continue();
}
);
}
ARROW_ASSIGN_OR_RAISE(std::optional<arrow::ExecBatch> optional_batch, future_batch.result());
SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Batch received", request_id);
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Current backpressure size: {} bytes, operation is {}",
request_id,
backpressure_monitor->bytes_in_use(),
backpressure_monitor->is_paused() ? "paused" : "running"
);
});

if (!optional_batch.has_value()) {
break; // end of input
}
SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Batch contains data with {} values.",
arrow_plan->StartProducing();

SPDLOG_DEBUG(
"Request Id [{}] - QueryPlan - Plan started producing, will now read the resulting batches.",
request_id
);

bool finished_plan_in_time = plan_finished_future.Wait(static_cast<double>(timeout_in_seconds));
if (!finished_plan_in_time) {
// TODO.TAE check for progress
SPDLOG_WARN(
"Request Id [{}] - QueryPlan - Batch wait timed out after {} s — stopping plan.",
request_id,
optional_batch.value().length
timeout_in_seconds
);
return arrow::Status::ExecutionError(
fmt::format("Request timed out, no batch within {} seconds.", timeout_in_seconds)
);
}

// TODO(#764) make output format configurable
ARROW_RETURN_NOT_OK(output_sink.writeBatch(optional_batch.value()));
};
// Be sure that the plan finished and had no errors.
ARROW_RETURN_NOT_OK(plan_finished_future.status());

SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Plan finished producing", request_id);
ARROW_RETURN_NOT_OK(output_sink.finish());
SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Finished reading all batches.", request_id);

return arrow::Status::OK();
}

Expand All @@ -142,8 +162,7 @@ void QueryPlan::executeAndWrite(
throw std::runtime_error(fmt::format(
"Request Id [{}] - Internal server error. Please notify developers. SILO likely "
"constructed an invalid arrow plan and more user-input validation needs to be "
"added: "
"{}",
"added: {}",
request_id,
status.message()
));
Expand Down
Loading