Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions tree/ntuple/inc/ROOT/RMiniFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ class TVirtualStreamerInfo;

namespace ROOT {

class RNTupleWriteOptions;

namespace Internal {

class RRawFile;
}

class RNTupleWriteOptions;
TDirectory *GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer);

namespace Internal {
/// Holds status information of an open ROOT file during writing
struct RTFileControlBlock;

Expand Down Expand Up @@ -68,9 +69,6 @@ private:
/// Used when the file turns out to be a TFile container. The ntuplePath variable is either the ntuple name
/// or an ntuple name preceded by a directory (`myNtuple` or `foo/bar/myNtuple` or `/foo/bar/myNtuple`)
RResult<RNTuple> GetNTupleProper(std::string_view ntuplePath);
/// Loads an RNTuple anchor from a TFile at the given file offset (unzipping it if necessary).
RResult<RNTuple>
GetNTupleProperAtOffset(std::uint64_t payloadOffset, std::uint64_t compSize, std::uint64_t uncompLen);

/// Searches for a key with the given name and type in the key index of the directory starting at offsetDir.
/// The offset points to the start of the TDirectory DATA section, without the key and without the name and title
Expand All @@ -84,6 +82,9 @@ public:
explicit RMiniFileReader(ROOT::Internal::RRawFile *rawFile);
/// Extracts header and footer location for the RNTuple identified by ntupleName
RResult<RNTuple> GetNTuple(std::string_view ntupleName);
/// Loads an RNTuple anchor from a TFile at the given file offset (unzipping it if necessary).
RResult<RNTuple>
GetNTupleProperAtOffset(std::uint64_t payloadOffset, std::uint64_t compSize, std::uint64_t uncompLen);
/// Reads a given byte range from the file into the provided memory buffer.
/// If `nbytes > fMaxKeySize` it will perform chunked read from multiple blobs,
/// whose addresses are listed at the end of the first chunk.
Expand All @@ -109,6 +110,8 @@ A stand-alone version of RNTuple can remove the TFile based writer.
*/
// clang-format on
class RNTupleFileWriter {
friend TDirectory *ROOT::Internal::GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer);

public:
/// The key length of a blob. It is always a big key (version > 1000) with class name RBlob.
static constexpr std::size_t kBlobKeyLen = 42;
Expand Down Expand Up @@ -254,7 +257,7 @@ public:
void WriteIntoReservedBlob(const void *buffer, size_t nbytes, std::int64_t offset);
/// Ensures that the streamer info records passed as argument are written to the file
void UpdateStreamerInfos(const ROOT::Internal::RNTupleSerializer::StreamerInfoMap_t &streamerInfos);
/// Writes the RNTuple key to the file so that the header and footer keys can be found
/// Writes the RNTuple key to the file so that the header and footer keys can be found.
void Commit(int compression = RCompressionSetting::EDefaults::kUseGeneralPurpose);
};

Expand Down
1 change: 1 addition & 0 deletions tree/ntuple/inc/ROOT/RPageNullSink.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public:
void CommitStagedClusters(std::span<RStagedCluster>) final {}
void CommitClusterGroup() final {}
void CommitDatasetImpl() final {}
std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final { return nullptr; }
};

} // namespace Internal
Expand Down
5 changes: 5 additions & 0 deletions tree/ntuple/inc/ROOT/RPageSinkBuf.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public:
void CommitDatasetImpl() final;

RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;

std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view newName) const final
{
return std::make_unique<RPageSinkBuf>(fInnerSink->CreateNewWithNewRNTuple(newName));
}
}; // RPageSinkBuf

} // namespace Internal
Expand Down
12 changes: 9 additions & 3 deletions tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ namespace ROOT {
class RNTupleModel;

namespace Internal {

class RPageAllocator;
class RColumn;
class RMiniFileReader;
class RPageAllocator;
struct RNTupleModelChangeset;

enum class EPageStorageType {
Expand Down Expand Up @@ -313,6 +313,10 @@ public:

virtual ROOT::NTupleSize_t GetNEntries() const = 0;

/// Creates a new RPageSink linked to the same underlying storage as this, writing to a new RNTuple called `newName`.
/// The existing sink will stay valid. The existing sink and the new one must not write concurrently.
virtual std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view newName) const = 0;

/// Physically creates the storage container to hold the ntuple (e.g., a keys a TFile or an S3 bucket)
/// Init() associates column handles to the columns referenced by the model
void Init(RNTupleModel &model)
Expand Down Expand Up @@ -757,7 +761,7 @@ public:
/// The underlying `std::shared_mutex`, however, is neither read nor write recursive:
/// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
/// care in sections protected by `GetSharedDescriptorGuard()` and `GetExclDescriptorGuard()` especially to avoid
/// that the locks are acquired indirectly (e.g. by a call to `GetNEntries()`). As a general guideline, no other
/// that the locks are acquired indirectly. As a general guideline, no other
/// method of the page source should be called (directly or indirectly) in a guarded section.
const RSharedDescriptorGuard GetSharedDescriptorGuard() const
{
Expand Down Expand Up @@ -808,6 +812,8 @@ public:
virtual std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
LoadClusters(std::span<ROOT::Internal::RCluster::RKey> clusterKeys) = 0;

virtual RMiniFileReader *GetUnderlyingReader() { return nullptr; }

/// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
/// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
/// unzip thread. It is an optional optimization, the method can safely do nothing. In particular, the
Expand Down
5 changes: 4 additions & 1 deletion tree/ntuple/inc/ROOT/RPageStorageDaos.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ struct RDaosNTupleAnchor {
/// The object class for user data OIDs, e.g. `SX`
std::string fObjClass{};

bool operator ==(const RDaosNTupleAnchor &other) const {
bool operator==(const RDaosNTupleAnchor &other) const
{
return fVersionAnchor == other.fVersionAnchor && fVersionEpoch == other.fVersionEpoch &&
fVersionMajor == other.fVersionMajor && fVersionMinor == other.fVersionMinor &&
fVersionPatch == other.fVersionPatch && fNBytesHeader == other.fNBytesHeader &&
Expand Down Expand Up @@ -140,6 +141,8 @@ protected:
public:
RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const ROOT::RNTupleWriteOptions &options);
~RPageSinkDaos() override;

std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final;
}; // class RPageSinkDaos

// clang-format off
Expand Down
11 changes: 11 additions & 0 deletions tree/ntuple/inc/ROOT/RPageStorageFile.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ public:
RPageSinkFile(RPageSinkFile &&) = default;
RPageSinkFile &operator=(RPageSinkFile &&) = default;
~RPageSinkFile() override;

std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final;

ROOT::Internal::RNTupleFileWriter *GetUnderlyingWriter() const { return fWriter.get(); }
}; // class RPageSinkFile

// clang-format off
Expand Down Expand Up @@ -149,6 +153,8 @@ private:
std::unique_ptr<ROOT::Internal::RCluster>
PrepareSingleCluster(const ROOT::Internal::RCluster::RKey &clusterKey, std::vector<RRawFile::RIOVec> &readRequests);

RMiniFileReader *GetUnderlyingReader() final { return &fReader; }

protected:
void LoadStructureImpl() final;
ROOT::RNTupleDescriptor AttachImpl(RNTupleSerializer::EDescriptorDeserializeMode mode) final;
Expand All @@ -173,6 +179,11 @@ public:
RPageSourceFile &operator=(RPageSourceFile &&) = delete;
~RPageSourceFile() override;

/// Creates a new PageSourceFile using the same underlying file as this but referring to a different RNTuple,
/// represented by `anchor`.
std::unique_ptr<RPageSourceFile>
OpenWithDifferentAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options = ROOT::RNTupleReadOptions());

void
LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) final;

Expand Down
8 changes: 8 additions & 0 deletions tree/ntuple/src/RMiniFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1608,3 +1608,11 @@ void ROOT::Internal::RNTupleFileWriter::WriteTFileSkeleton(int defaultCompressio
fileSimple.Write(&padding, sizeof(padding));
fileSimple.fKeyOffset = fileSimple.fFilePos;
}

TDirectory *ROOT::Internal::GetUnderlyingDirectory(ROOT::Internal::RNTupleFileWriter &writer)
{
if (auto *proper = std::get_if<ROOT::Internal::RNTupleFileWriter::RFileProper>(&writer.fFile)) {
return proper->fDirectory;
}
return nullptr;
}
4 changes: 4 additions & 0 deletions tree/ntuple/src/RNTupleParallelWriter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ class RPageSynchronizingSink : public RPageSink {
{
throw ROOT::RException(R__FAIL("should never commit dataset via RPageSynchronizingSink"));
}
std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final
{
throw ROOT::RException(R__FAIL("CreateNewWithNewRNTuple unavailable for RPageSynchronizingSink"));
}

RSinkGuard GetSinkGuard() final { return RSinkGuard(fMutex); }
};
Expand Down
6 changes: 6 additions & 0 deletions tree/ntuple/src/RPageStorageDaos.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,12 @@ void ROOT::Experimental::Internal::RPageSinkDaos::WriteNTupleAnchor()
kDistributionKeyDefault, kAttributeKeyAnchor, kCidMetadata);
}

std::unique_ptr<ROOT::Internal::RPageSink>
ROOT::Experimental::Internal::RPageSinkDaos::CreateNewWithNewRNTuple(std::string_view) const
{
throw ROOT::RException(R__FAIL("this method is not available for the DAOS backend"));
}

////////////////////////////////////////////////////////////////////////////////

ROOT::Experimental::Internal::RPageSourceDaos::RPageSourceDaos(std::string_view ntupleName, std::string_view uri,
Expand Down
44 changes: 32 additions & 12 deletions tree/ntuple/src/RPageStorageFile.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,18 @@ void ROOT::Internal::RPageSinkFile::CommitDatasetImpl(unsigned char *serializedF
fWriter->Commit(GetWriteOptions().GetCompression());
}

std::unique_ptr<ROOT::Internal::RPageSink>
ROOT::Internal::RPageSinkFile::CreateNewWithNewRNTuple(std::string_view newName) const
{
if (auto *dir = Internal::GetUnderlyingDirectory(*fWriter)) {
auto opts = ROOT::RNTupleWriteOptions();
opts.SetCompression(GetWriteOptions().GetCompression());
return std::make_unique<ROOT::Internal::RPageSinkFile>(newName, *dir, opts);
}
// TODO: support this method also for non-TFile-based writers
throw ROOT::RException(R__FAIL("cannot CreateNewWithNewRNTuple a non-TFile-based Sink."));
}

////////////////////////////////////////////////////////////////////////////////

ROOT::Internal::RPageSourceFile::RPageSourceFile(std::string_view ntupleName, const ROOT::RNTupleReadOptions &opts)
Expand Down Expand Up @@ -305,6 +317,15 @@ ROOT::Internal::RPageSourceFile::CreateFromAnchor(const RNTuple &anchor, const R
return pageSource;
}

std::unique_ptr<ROOT::Internal::RPageSourceFile>
ROOT::Internal::RPageSourceFile::OpenWithDifferentAnchor(const RNTuple &anchor, const ROOT::RNTupleReadOptions &options)
{
auto pageSource = std::make_unique<RPageSourceFile>("", fFile->Clone(), options);
pageSource->fAnchor = anchor;
// NOTE: fNTupleName gets set only upon Attach().
return pageSource;
}

ROOT::Internal::RPageSourceFile::~RPageSourceFile() = default;

void ROOT::Internal::RPageSourceFile::LoadStructureImpl()
Expand Down Expand Up @@ -509,18 +530,17 @@ ROOT::Internal::RPageSourceFile::PrepareSingleCluster(const RCluster::RKey &clus
std::vector<ROnDiskPageLocator> onDiskPages;
auto activeSize = 0;
auto pageZeroMap = std::make_unique<ROnDiskPageMap>();
PrepareLoadCluster(clusterKey, *pageZeroMap,
[&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
const auto &pageLocator = pageInfo.GetLocator();
if (pageLocator.GetType() == RNTupleLocator::kTypeUnknown)
throw RException(R__FAIL("tried to read a page with an unknown locator"));
const auto nBytes =
pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
activeSize += nBytes;
onDiskPages.push_back(
{physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
});
PrepareLoadCluster(
clusterKey, *pageZeroMap,
[&](ROOT::DescriptorId_t physicalColumnId, ROOT::NTupleSize_t pageNo,
const ROOT::RClusterDescriptor::RPageInfo &pageInfo) {
const auto &pageLocator = pageInfo.GetLocator();
if (pageLocator.GetType() == RNTupleLocator::kTypeUnknown)
throw RException(R__FAIL("tried to read a page with an unknown locator"));
const auto nBytes = pageLocator.GetNBytesOnStorage() + pageInfo.HasChecksum() * kNBytesPageChecksum;
activeSize += nBytes;
onDiskPages.push_back({physicalColumnId, pageNo, pageLocator.GetPosition<std::uint64_t>(), nBytes, 0});
});

// Linearize the page requests by file offset
std::sort(onDiskPages.begin(), onDiskPages.end(),
Expand Down
5 changes: 5 additions & 0 deletions tree/ntuple/test/ntuple_endian.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ class RPageSinkMock : public RPageSink {
void CommitStagedClusters(std::span<RStagedCluster>) final {}
void CommitClusterGroup() final {}
void CommitDatasetImpl() final {}
std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final
{
R__ASSERT(false);
return nullptr;
}

public:
RPageSinkMock(const RColumnElementBase &elt) : RPageSink("test", ROOT::RNTupleWriteOptions()), fElement(elt)
Expand Down
64 changes: 64 additions & 0 deletions tree/ntuple/test/ntuple_storage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ class RPageSinkMock : public RPageSink {
void CommitStagedClusters(std::span<RStagedCluster>) final {}
void CommitClusterGroup() final {}
void CommitDatasetImpl() final {}
std::unique_ptr<RPageSink> CreateNewWithNewRNTuple(std::string_view) const final
{
R__ASSERT(false);
return nullptr;
}

public:
RPageSinkMock(const ROOT::RNTupleWriteOptions &options) : RPageSink("test", options) {}
Expand Down Expand Up @@ -1145,3 +1150,62 @@ TEST(RPageSourceFile, NameFromAnchor)
source->Attach();
EXPECT_EQ(source->GetNTupleName(), "ntpl");
}

TEST(RPageSourceFile, OpenDifferentAnchor)
{
FileRaii fileGuard("test_ntuple_open_diff_anchor.root");

auto model = RNTupleModel::Create();
auto pF = model->MakeField<float>("f");
auto file = std::unique_ptr<TFile>(TFile::Open(fileGuard.GetPath().c_str(), "RECREATE"));
{
auto writer = RNTupleWriter::Append(std::move(model), "ntpl1", *file);
for (auto i = 0; i < 100; ++i) {
*pF = i;
writer->Fill();
}
}
{
model = RNTupleModel::Create();
auto pI = model->MakeField<int>("i");
auto pC = model->MakeField<char>("c");

auto writer = RNTupleWriter::Append(std::move(model), "ntpl2", *file);
for (auto i = 0; i < 20; ++i) {
*pI = i;
*pC = i;
writer->Fill();
}
}

auto source = std::make_unique<RPageSourceFile>("ntpl1", fileGuard.GetPath(), RNTupleReadOptions());
source->Attach();
EXPECT_EQ(source->GetNEntries(), 100);
{
auto desc = source->GetSharedDescriptorGuard();
EXPECT_NE(desc->FindFieldId("f"), ROOT::kInvalidDescriptorId);
}

auto anchor2 = file->Get<ROOT::RNTuple>("ntpl2");
ASSERT_NE(anchor2, nullptr);
auto source2 = source->OpenWithDifferentAnchor(*anchor2);
source2->Attach();
EXPECT_EQ(source2->GetNTupleName(), "ntpl2");
EXPECT_EQ(source2->GetNEntries(), 20);
{
auto desc2 = source2->GetSharedDescriptorGuard();
EXPECT_EQ(desc2->FindFieldId("f"), ROOT::kInvalidDescriptorId);
EXPECT_NE(desc2->FindFieldId("i"), ROOT::kInvalidDescriptorId);
EXPECT_NE(desc2->FindFieldId("c"), ROOT::kInvalidDescriptorId);
}

source.reset();
// source2 should still be valid after dropping the first source.
EXPECT_EQ(source2->GetNEntries(), 20);
{
auto desc2 = source2->GetSharedDescriptorGuard();
EXPECT_EQ(desc2->FindFieldId("f"), ROOT::kInvalidDescriptorId);
EXPECT_NE(desc2->FindFieldId("i"), ROOT::kInvalidDescriptorId);
EXPECT_NE(desc2->FindFieldId("c"), ROOT::kInvalidDescriptorId);
}
}
Loading