Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Also make joins between two index scans individually observable. #1666

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,25 @@
}

// _____________________________________________________________________________
Result::Generator IndexScan::chunkedIndexScan() const {
Result::Generator IndexScan::chunkedIndexScan() {
auto optBlockSpan = getBlockMetadata();
if (!optBlockSpan.has_value()) {
co_return;
}
const auto& blockSpan = optBlockSpan.value();
size_t numBlocksAll = blockSpan.size();
// Note: Given a `PrefilterIndexPair` is available, the corresponding
// prefiltering will be applied in `getLazyScan`.
for (IdTable& idTable : getLazyScan({blockSpan.begin(), blockSpan.end()})) {
auto innerGenerator = getLazyScan({blockSpan.begin(), blockSpan.end()});
auto setDetails = ad_utility::makeOnDestructionDontThrowDuringStackUnwinding(
[

this, numBlocksAll, &innerGenerator]() {
auto details = innerGenerator.details();
details.numBlocksAll_ = numBlocksAll;
updateRuntimeInfoForLazyScan(details);
});
for (IdTable& idTable : innerGenerator) {
co_yield {std::move(idTable), LocalVocab{}};
}
}
Expand Down Expand Up @@ -339,7 +349,14 @@
// _____________________________________________________________________________
std::optional<std::vector<CompressedBlockMetadata>>
IndexScan::getBlockMetadataOptionallyPrefiltered() const {
auto optBlockSpan = getBlockMetadata();
auto optBlockSpan =
[&]() -> std::optional<std::span<const CompressedBlockMetadata>> {
if (prefilteredBlocks_.has_value()) {
return prefilteredBlocks_.value();
} else {
return getBlockMetadata();
}
}();
std::optional<std::vector<CompressedBlockMetadata>> optBlocks = std::nullopt;
if (optBlockSpan.has_value()) {
const auto& blockSpan = optBlockSpan.value();
Expand Down Expand Up @@ -389,6 +406,7 @@
};

// _____________________________________________________________________________
// TODO<joka921> This can be removed now.
std::array<Permutation::IdTableGenerator, 2>
IndexScan::lazyScanForJoinOfTwoScans(const IndexScan& s1, const IndexScan& s2) {
AD_CONTRACT_CHECK(s1.numVariables_ <= 3 && s2.numVariables_ <= 3);
Expand Down Expand Up @@ -656,3 +674,57 @@
return {createPrefilteredJoinSide(state),
createPrefilteredIndexScanSide(state)};
}

// _____________________________________________________________________________
// Restrict the blocks of two `IndexScan`s to those that are relevant for a
// join between them. `left` and `right` must be non-null and must point to
// `IndexScan` objects (the reference `dynamic_cast`s below throw
// `std::bad_cast` for any other operation type). On success the member
// `prefilteredBlocks_` of both scans is overwritten with the respective result
// of `CompressedRelationReader::getBlocksForJoin`. If either scan has no
// metadata, both scans are left unchanged.
//
// NOTE(review): Codecov reports the paths for missing scan metadata (lines
// 687-688 and 698-699 of src/engine/IndexScan.cpp in this change) as not
// covered by tests.
void IndexScan::setBlocksForJoinOfIndexScans(Operation* left,
                                             Operation* right) {
  auto& lhs = dynamic_cast<IndexScan&>(*left);
  auto& rhs = dynamic_cast<IndexScan&>(*right);

  // Get the metadata of `scan`. If the scan already stores prefiltered blocks
  // (e.g. from a previous call), they replace the block list in the metadata,
  // so that repeated calls narrow the blocks further.
  auto metadataWithPrefilter = [](IndexScan& scan) {
    auto metadata = scan.getMetadataForScan();
    if (metadata.has_value() && scan.prefilteredBlocks_.has_value()) {
      metadata.value().blockMetadata_ = scan.prefilteredBlocks_.value();
    }
    return metadata;
  };

  auto lhsMetadata = metadataWithPrefilter(lhs);
  auto rhsMetadata = metadataWithPrefilter(rhs);
  const bool bothAvailable =
      lhsMetadata.has_value() && rhsMetadata.has_value();
  if (!bothAvailable) {
    return;
  }

  // The spans inside the metadata may refer to the old `prefilteredBlocks_`,
  // so the new blocks are only stored after `getBlocksForJoin` has returned.
  auto [lhsBlocks, rhsBlocks] = CompressedRelationReader::getBlocksForJoin(
      lhsMetadata.value(), rhsMetadata.value());
  lhs.prefilteredBlocks_ = std::move(lhsBlocks);
  rhs.prefilteredBlocks_ = std::move(rhsBlocks);
}

// _____________________________________________________________________________
// Return `{this}` if the columns that the result of this scan is sorted on
// start with the columns of `variables` (in the same order), else return an
// empty vector. A variable that is not among the externally visible variables
// of this scan never matches.
//
// NOTE(review): Codecov reports both early-return paths (lines 711-712 and
// 718-719 of src/engine/IndexScan.cpp in this change) as not covered by tests.
std::vector<Operation*> IndexScan::getIndexScansForSortVariables(
    std::span<const Variable> variables) {
  // Evaluate `resultSortedOn()` exactly once and reuse the result for the size
  // check as well as for every iteration of the loop below.
  const auto& sortedColumns = resultSortedOn();
  if (sortedColumns.size() < variables.size()) {
    return {};
  }
  const auto& varColMap = getExternallyVisibleVariableColumns();
  for (size_t i = 0; i < variables.size(); ++i) {
    auto it = varColMap.find(variables[i]);
    if (it == varColMap.end() ||
        it->second.columnIndex_ != sortedColumns.at(i)) {
      return {};
    }
  }
  return {this};
}

// _____________________________________________________________________________
// Store `prefilteredBlocks` in the member `prefilteredBlocks_`, replacing any
// blocks that were stored before. The argument is taken by value and moved
// into the member. Once set, `getBlockMetadataOptionallyPrefiltered` and
// `setBlocksForJoinOfIndexScans` use these blocks in place of the full block
// metadata of the scan.
void IndexScan::setPrefilteredBlocks(
std::vector<CompressedBlockMetadata> prefilteredBlocks) {
prefilteredBlocks_ = std::move(prefilteredBlocks);

Check warning on line 727 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L726-L727

Added lines #L726 - L727 were not covered by tests
// TODO<joka921> Once the other PR is merged, we have to assert that the
// result is never cached:
// AD_CORRECTNESS_CHECK(!canBeStoredInCache());
}

Check warning on line 730 in src/engine/IndexScan.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/IndexScan.cpp#L730

Added line #L730 was not covered by tests
15 changes: 14 additions & 1 deletion src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class IndexScan final : public Operation {
std::vector<ColumnIndex> additionalColumns_;
std::vector<Variable> additionalVariables_;

// TODO<joka921> Comment
std::optional<std::vector<CompressedBlockMetadata>> prefilteredBlocks_;

public:
IndexScan(QueryExecutionContext* qec, Permutation::Enum permutation,
const SparqlTriple& triple, Graphs graphsToFilter = std::nullopt,
Expand Down Expand Up @@ -108,6 +111,9 @@ class IndexScan final : public Operation {
std::pair<Result::Generator, Result::Generator> prefilterTables(
Result::Generator input, ColumnIndex joinColumn);

// TODO<joka921> Comment
static void setBlocksForJoinOfIndexScans(Operation* left, Operation* right);

private:
// Implementation detail that allows to consume a generator from two other
// cooperating generators. Needs to be forward declared as it is used by
Expand Down Expand Up @@ -202,7 +208,7 @@ class IndexScan final : public Operation {
PrefilterIndexPair prefilter) const;

// Return the (lazy) `IdTable` for this `IndexScan` in chunks.
Result::Generator chunkedIndexScan() const;
Result::Generator chunkedIndexScan();
// Get the `IdTable` for this `IndexScan` in one piece.
IdTable materializedIndexScan() const;

Expand Down Expand Up @@ -234,4 +240,11 @@ class IndexScan final : public Operation {
Permutation::IdTableGenerator getLazyScan(
std::vector<CompressedBlockMetadata> blocks) const;
std::optional<Permutation::MetadataAndBlocks> getMetadataForScan() const;

// TODO<joka921> Comment.
void setPrefilteredBlocks(
std::vector<CompressedBlockMetadata> prefilteredBlocks);

std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) override;
};
67 changes: 21 additions & 46 deletions src/engine/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@
auto rightResIfCached = getCachedOrSmallResult(*_right);
checkCancellation();

// TODO<joka921> Copy and move to separate function.
std::span joinVarSpan{&_joinVar, 1};
auto leftIndexScans = _left->getIndexScansForSortVariables(joinVarSpan);
auto rightIndexScans = _right->getIndexScansForSortVariables(joinVarSpan);
for (auto* left : leftIndexScans) {
for (auto* right : rightIndexScans) {
IndexScan::setBlocksForJoinOfIndexScans(left, right);
}
}

auto leftIndexScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
if (leftIndexScan &&
Expand All @@ -189,9 +199,6 @@
AD_CORRECTNESS_CHECK(rightResIfCached->isFullyMaterialized());
return computeResultForIndexScanAndIdTable<true>(
requestLaziness, std::move(rightResIfCached), leftIndexScan);

} else if (!leftResIfCached) {
return computeResultForTwoIndexScans(requestLaziness);
}
}

Expand All @@ -216,9 +223,10 @@
if (leftRes->isFullyMaterialized()) {
return computeResultForIndexScanAndIdTable<false>(
requestLaziness, std::move(leftRes), rightIndexScan);
} else if (!leftIndexScan) {
return computeResultForIndexScanAndLazyOperation(
requestLaziness, std::move(leftRes), rightIndexScan);
}
return computeResultForIndexScanAndLazyOperation(
requestLaziness, std::move(leftRes), rightIndexScan);
}

std::shared_ptr<const Result> rightRes =
Expand Down Expand Up @@ -647,47 +655,6 @@
}
}

// ______________________________________________________________________________________________________
// Compute the join result for the case that the root operations of both
// children are `IndexScan`s. The work is done inside a callback that is passed
// to `createResult` together with `requestLaziness`. The callback joins the
// two lazy, block-filtered scans and returns the resulting table together with
// its local vocab.
ProtoResult Join::computeResultForTwoIndexScans(bool requestLaziness) const {
return createResult(
requestLaziness,
[this](std::function<void(IdTable&, LocalVocab&)> yieldTable) {
// Both children must have an `IndexScan` as their root operation.
auto leftScan =
std::dynamic_pointer_cast<IndexScan>(_left->getRootOperation());
auto rightScan =
std::dynamic_pointer_cast<IndexScan>(_right->getRootOperation());
AD_CORRECTNESS_CHECK(leftScan && rightScan);
// The join column already is the first column in both inputs, so we
// don't have to permute the inputs and results for the
// `AddCombinedRowToIdTable` class to work correctly.
AD_CORRECTNESS_CHECK(_leftJoinCol == 0 && _rightJoinCol == 0);
auto rowAdder = makeRowAdder(std::move(yieldTable));

// Measure how long it takes to set up the two lazy scans and store that
// time as the runtime detail `time-for-filtering-blocks`.
ad_utility::Timer timer{
ad_utility::timer::Timer::InitialStatus::Started};
auto [leftBlocksInternal, rightBlocksInternal] =
IndexScan::lazyScanForJoinOfTwoScans(*leftScan, *rightScan);
runtimeInfo().addDetail("time-for-filtering-blocks", timer.msecs());

auto leftBlocks = convertGenerator(std::move(leftBlocksInternal));
auto rightBlocks = convertGenerator(std::move(rightBlocksInternal));

// Join the two block streams; the joined rows are handed to `rowAdder`.
ad_utility::zipperJoinForBlocksWithoutUndef(leftBlocks, rightBlocks,
std::less{}, rowAdder);

// Pass the details of the lazy scans on to the runtime information of
// the two `IndexScan`s. Note that this happens only after the join above
// has returned.
leftScan->updateRuntimeInfoForLazyScan(leftBlocks.details());
rightScan->updateRuntimeInfoForLazyScan(rightBlocks.details());

// Sanity checks: the number of blocks read on one side must not exceed
// the number of elements read on the other side.
// NOTE(review): the rationale is not visible here. Presumably every block
// that is read has to be justified by at least one element of the other
// input - confirm.
AD_CORRECTNESS_CHECK(leftBlocks.details().numBlocksRead_ <=
rightBlocks.details().numElementsRead_);
AD_CORRECTNESS_CHECK(rightBlocks.details().numBlocksRead_ <=
leftBlocks.details().numElementsRead_);
// Extract the local vocab first, because `rowAdder` is moved from in the
// `return` statement.
auto localVocab = std::move(rowAdder.localVocab());
return Result::IdTableVocabPair{std::move(rowAdder).resultTable(),
std::move(localVocab)};
});
}

// ______________________________________________________________________________________________________
template <bool idTableIsRightInput>
ProtoResult Join::computeResultForIndexScanAndIdTable(
Expand Down Expand Up @@ -834,3 +801,11 @@
1, IdTable{getResultWidth(), allocator()}, cancellationHandle_,
CHUNK_SIZE, std::move(callback)};
}
// _____________________________________________________________________________
// Collect the matching index scans of both children: first all scans reported
// by the left child, then all scans reported by the right child. The same
// `variables` are passed on to both children.
std::vector<Operation*> Join::getIndexScansForSortVariables(
    std::span<const Variable> variables) {
  auto scans = _left->getIndexScansForSortVariables(variables);
  for (Operation* scanOfRightChild :
       _right->getIndexScansForSortVariables(variables)) {
    scans.push_back(scanOfRightChild);
  }
  return scans;
}

Check warning on line 811 in src/engine/Join.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/Join.cpp#L806-L811

Added lines #L806 - L811 were not covered by tests
9 changes: 4 additions & 5 deletions src/engine/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ class Join : public Operation {
static void hashJoin(const IdTable& dynA, ColumnIndex jc1,
const IdTable& dynB, ColumnIndex jc2, IdTable* dynRes);

// TODO<joka921> Comment.
std::vector<Operation*> getIndexScansForSortVariables(
std::span<const Variable> variables) override;

protected:
virtual string getCacheKeyImpl() const override;

Expand All @@ -149,11 +153,6 @@ class Join : public Operation {

VariableToColumnMap computeVariableToColumnMap() const override;

// A special implementation that is called when both children are
// `IndexScan`s. Uses the lazy scans to only retrieve the subset of the
// `IndexScan`s that is actually needed without fully materializing them.
ProtoResult computeResultForTwoIndexScans(bool requestLaziness) const;

// A special implementation that is called when exactly one of the children is
// an `IndexScan` and the other one is a fully materialized result. The
// argument `idTableIsRightInput` determines whether the `IndexScan` is the
Expand Down
6 changes: 6 additions & 0 deletions src/engine/Operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,12 @@ class Operation {
RuntimeInformation::Status status =
RuntimeInformation::Status::optimizedOut);

// Return pointers to the index scans inside this operation whose result is
// sorted by `variables`. The default implementation reports no scans at all.
// Operations that can contribute such scans override it (`IndexScan` reports
// itself if its sort columns match, `Join` concatenates the results of its
// two children).
virtual std::vector<Operation*> getIndexScansForSortVariables(
    [[maybe_unused]] std::span<const Variable> variables) {
  return std::vector<Operation*>{};
}

private:
// Create the runtime information in case the evaluation of this operation has
// failed.
Expand Down
15 changes: 15 additions & 0 deletions src/engine/QueryExecutionTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,3 +225,18 @@ QueryExecutionTree::getVariableAndInfoByColumnIndex(ColumnIndex colIdx) const {
AD_CONTRACT_CHECK(it != varColMap.end());
return *it;
}

// _____________________________________________________________________________
// Forward the request to the root operation. If it reports at least one index
// scan, the cached result and the size estimate of this tree are reset before
// the scans are returned. If it reports none, the tree is left untouched.
std::vector<Operation*> QueryExecutionTree::getIndexScansForSortVariables(
    std::span<const Variable> variables) {
  auto scans = rootOperation_->getIndexScansForSortVariables(variables);
  if (!scans.empty()) {
    // TODO<joka921> We have to disable the caching as soon as the PR for that
    // is merged.
    // rootOperation_->disableCaching();
    cachedResult_.reset();
    sizeEstimate_.reset();
  }
  return scans;
}
4 changes: 4 additions & 0 deletions src/engine/QueryExecutionTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ class QueryExecutionTree {
return rootOperation_->collectWarnings();
}

// Return the index scans reported by the root operation for the given sort
// `variables`. If at least one scan is reported, the cached result and the
// size estimate of this tree are reset. This is deliberately a plain
// (non-virtual) member function: `QueryExecutionTree` has no base class, so a
// function that is both `virtual` and `final` could never be overridden.
std::vector<Operation*> getIndexScansForSortVariables(
    std::span<const Variable> variables);

template <typename F>
void forAllDescendants(F f) {
static_assert(
Expand Down
2 changes: 1 addition & 1 deletion src/index/CompressedRelation.h
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ class CompressedRelationReader {
// to be performed.
struct ScanSpecAndBlocks {
ScanSpecification scanSpec_;
const std::span<const CompressedBlockMetadata> blockMetadata_;
std::span<const CompressedBlockMetadata> blockMetadata_;
};

// This struct additionally contains the first and last triple of the scan
Expand Down
Loading