diff --git a/src/silo/query_engine/actions/action.cpp b/src/silo/query_engine/actions/action.cpp index 6db9be924..42e0ee44d 100644 --- a/src/silo/query_engine/actions/action.cpp +++ b/src/silo/query_engine/actions/action.cpp @@ -221,12 +221,13 @@ QueryPlan Action::toQueryPlan( std::string_view request_id ) { validateOrderByFields(table->schema); - auto query_plan = + auto query_plan_or_error = toQueryPlanImpl(std::move(table), std::move(partition_filters), query_options, request_id); - if (!query_plan.status().ok()) { - SILO_PANIC("Arrow error: {}", query_plan.status().ToString()); + if (!query_plan_or_error.status().ok()) { + SILO_PANIC("Arrow error: {}", query_plan_or_error.status().ToString()); }; - return query_plan.ValueUnsafe(); + auto query_plan = std::move(query_plan_or_error).ValueUnsafe(); + return query_plan; } arrow::Result Action::addSortNode( diff --git a/src/silo/query_engine/exec_node/table_scan.h b/src/silo/query_engine/exec_node/table_scan.h index e021bf2d5..ca03027c1 100644 --- a/src/silo/query_engine/exec_node/table_scan.h +++ b/src/silo/query_engine/exec_node/table_scan.h @@ -73,18 +73,11 @@ class TableScanGenerator { arrow::Future> operator()() { SPDLOG_TRACE("TableScanGenerator::operator()"); - auto future = arrow::Future>::Make(); - // We do this to guard against https://github.com/apache/arrow/issues/47641 - // and https://github.com/apache/arrow/issues/47642 - std::thread([future, this]() mutable { - try { - auto result = produceNextBatch(); - future.MarkFinished(std::move(result)); - } catch (const std::exception& exception) { - future.MarkFinished(arrow::Status::ExecutionError(exception.what())); - } - }).detach(); - return future; + try { + return produceNextBatch(); + } catch (const std::exception& exception) { + return arrow::Status::ExecutionError(exception.what()); + } }; private: diff --git a/src/silo/query_engine/query_plan.cpp b/src/silo/query_engine/query_plan.cpp index aef182653..5754f2d19 100644 --- a/src/silo/query_engine/query_plan.cpp +++ b/src/silo/query_engine/query_plan.cpp @@ -36,11 +36,6 @@ arrow::Status QueryPlan::executeAndWriteImpl( EVOBENCH_SCOPE("QueryPlan", "execute"); SPDLOG_TRACE("{}", arrow_plan->ToString()); SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Starting the plan.", request_id); - arrow_plan->StartProducing(); - SPDLOG_DEBUG( - "Request Id [{}] - QueryPlan - Plan started producing, will now read the resulting batches.", - request_id - ); // Ensure plan is stopped on any exit path (timeout/error/exception). struct PlanStopGuard { @@ -86,43 +81,68 @@ arrow::Status QueryPlan::executeAndWriteImpl( } } guard{.request_id = request_id, .plan = arrow_plan}; - while (true) { - arrow::Future> future_batch = results_generator(); - SPDLOG_DEBUG("Request Id [{}] - QueryPlan - await the next batch", request_id); - bool finished_batch_in_time = future_batch.Wait(static_cast(timeout_in_seconds)); - if (!finished_batch_in_time) { - SPDLOG_WARN( - "Request Id [{}] - QueryPlan - Batch wait timed out after {} s — stopping plan.", - request_id, - timeout_in_seconds - ); - return arrow::Status::ExecutionError( - fmt::format("Request timed out, no batch within {} seconds.", timeout_in_seconds) + auto plan_finished_future = + arrow::Loop([&]() -> arrow::Future>> { + return results_generator().Then( + [&](std::optional optional_batch + ) -> arrow::ControlFlow> { + SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Batch received", request_id); + SPDLOG_DEBUG( + "Request Id [{}] - QueryPlan - Current backpressure size: {} bytes, operation is " + "{}", + request_id, + backpressure_monitor->bytes_in_use(), + backpressure_monitor->is_paused() ? "paused" : "running" + ); + + if (optional_batch == std::nullopt) { + SPDLOG_DEBUG( + "Request Id [{}] - QueryPlan - Finished reading all batches.", request_id + ); + return arrow::Break(arrow::Result{std::monostate{}}); // end of input + } + SPDLOG_DEBUG( + "Request Id [{}] - QueryPlan - Batch contains data with {} values.", + request_id, + optional_batch.value().length + ); + + // TODO(#764) make output format configurable + // The returned error-status is implicitly converted to an arrow::Break + ARROW_RETURN_NOT_OK(output_sink.writeBatch(optional_batch.value())); + + SPDLOG_DEBUG("Request Id [{}] - QueryPlan - await the next batch", request_id); + return arrow::Continue(); + } ); - } - ARROW_ASSIGN_OR_RAISE(std::optional optional_batch, future_batch.result()); - SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Batch received", request_id); - SPDLOG_DEBUG( - "Request Id [{}] - QueryPlan - Current backpressure size: {} bytes, operation is {}", - request_id, - backpressure_monitor->bytes_in_use(), - backpressure_monitor->is_paused() ? "paused" : "running" - ); + }); - if (!optional_batch.has_value()) { - break; // end of input - } - SPDLOG_DEBUG( - "Request Id [{}] - QueryPlan - Batch contains data with {} values.", + arrow_plan->StartProducing(); + + SPDLOG_DEBUG( + "Request Id [{}] - QueryPlan - Plan started producing, will now read the resulting batches.", + request_id + ); + + bool finished_plan_in_time = plan_finished_future.Wait(static_cast(timeout_in_seconds)); + if (!finished_plan_in_time) { + // TODO.TAE check for progress + SPDLOG_WARN( + "Request Id [{}] - QueryPlan - Batch wait timed out after {} s — stopping plan.", request_id, - optional_batch.value().length + timeout_in_seconds + ); + return arrow::Status::ExecutionError( + fmt::format("Request timed out, no batch within {} seconds.", timeout_in_seconds) ); + } - // TODO(#764) make output format configurable - ARROW_RETURN_NOT_OK(output_sink.writeBatch(optional_batch.value())); - }; + // Be sure that the plan finished and had no errors. + ARROW_RETURN_NOT_OK(plan_finished_future.status()); + + SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Plan finished producing", request_id); ARROW_RETURN_NOT_OK(output_sink.finish()); - SPDLOG_DEBUG("Request Id [{}] - QueryPlan - Finished reading all batches.", request_id); + return arrow::Status::OK(); } @@ -142,8 +162,7 @@ void QueryPlan::executeAndWrite( throw std::runtime_error(fmt::format( "Request Id [{}] - Internal server error. Please notify developers. SILO likely " "constructed an invalid arrow plan and more user-input validation needs to be " - "added: " - "{}", + "added: {}", request_id, status.message() ));