diff --git a/src/libstore/build-result.hh b/src/libstore/build-result.hh index cb6d19b8e90..ee185db6717 100644 --- a/src/libstore/build-result.hh +++ b/src/libstore/build-result.hh @@ -64,9 +64,6 @@ struct BuildResult non-determinism.) */ bool isNonDeterministic = false; - /* The derivation we built or the store path we substituted. */ - DerivedPath path; - /* For derivations, a mapping from the names of the wanted outputs to actual paths. */ DrvOutputs builtOutputs; @@ -86,4 +83,11 @@ struct BuildResult } }; +/* A build result together with it's "primary key" */ +struct KeyedBuildResult : BuildResult +{ + /* The derivation we built or the store path we substituted. */ + DerivedPath path; +}; + } diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index 3d1c4fbc135..a384f834fe4 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -9,7 +9,8 @@ #include "archive.hh" #include "json.hh" #include "compression.hh" -#include "worker-protocol.hh" +#include "common-protocol.hh" +#include "common-protocol-impl.hh" #include "topo-sort.hh" #include "callback.hh" #include "local-store.hh" // TODO remove, along with remaining downcasts @@ -309,11 +310,14 @@ void DerivationGoal::outputsSubstitutionTried() gaveUpOnSubstitution(); } - /* At least one of the output paths could not be produced using a substitute. So we have to build instead. */ void DerivationGoal::gaveUpOnSubstitution() { + /* Make sure checkPathValidity() from now on checks all + outputs. */ + wantedOutputs.clear(); + /* The inputs must be built before we can build this goal. */ if (useDerivation) for (auto & i : dynamic_cast(drv.get())->inputDrvs) @@ -523,8 +527,6 @@ void DerivationGoal::inputsRealised() build hook. */ state = &DerivationGoal::tryToBuild; worker.wakeUp(shared_from_this()); - - buildResult = BuildResult { .path = buildResult.path }; } void DerivationGoal::started() @@ -1068,9 +1070,11 @@ HookReply DerivationGoal::tryBuildHook() throw; } + common_proto::WriteConn conn { hook->sink }; + /* Tell the hook all the inputs that have to be copied to the remote system. */ - worker_proto::write(worker.store, hook->sink, inputPaths); + common_proto::write(worker.store, conn, inputPaths); /* Tell the hooks the missing outputs that have to be copied back from the remote system. */ @@ -1081,7 +1085,7 @@ HookReply DerivationGoal::tryBuildHook() if (buildMode != bmCheck && status.known && status.known->isValid()) continue; missingOutputs.insert(outputName); } - worker_proto::write(worker.store, hook->sink, missingOutputs); + common_proto::write(worker.store, conn, missingOutputs); } hook->sink = FdSink(); diff --git a/src/libstore/build/entry-points.cc b/src/libstore/build/entry-points.cc index bea7363dbfa..baf9a4664cf 100644 --- a/src/libstore/build/entry-points.cc +++ b/src/libstore/build/entry-points.cc @@ -47,7 +47,7 @@ void Store::buildPaths(const std::vector & reqs, BuildMode buildMod } } -std::vector Store::buildPathsWithResults( +std::vector Store::buildPathsWithResults( const std::vector & reqs, BuildMode buildMode, std::shared_ptr evalStore) @@ -55,23 +55,49 @@ std::vector Store::buildPathsWithResults( Worker worker(*this, evalStore ? *evalStore : *this); Goals goals; - for (const auto & br : reqs) { - std::visit(overloaded { - [&](const DerivedPath::Built & bfd) { - goals.insert(worker.makeDerivationGoal(bfd.drvPath, bfd.outputs, buildMode)); + std::vector> reqs2; + + for (const auto & req : reqs) { + auto g = std::visit(overloaded { + [&](const DerivedPath::Built & bfd) -> GoalPtr { + return worker.makeDerivationGoal(bfd.drvPath, bfd.outputs, buildMode); }, - [&](const DerivedPath::Opaque & bo) { - goals.insert(worker.makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair)); + [&](const DerivedPath::Opaque & bo) -> GoalPtr { + return worker.makePathSubstitutionGoal(bo.path, buildMode == bmRepair ? Repair : NoRepair); }, - }, br.raw()); + }, req.raw()); + goals.insert(g); + reqs2.push_back({g, req}); } worker.run(goals); - std::vector results; + std::vector results; - for (auto & i : goals) - results.push_back(i->buildResult); + for (auto & [gp, req] : reqs2) { + auto & br = results.emplace_back(KeyedBuildResult { + gp->buildResult, + .path = req + }); + + auto pbp = std::get_if(&req); + if (!pbp) continue; + auto & bp = *pbp; + + /* Because goals are in general shared between derived paths + that share the same derivation, we need to filter their + results to get back just the results we care about. + */ + + auto & bos = br.builtOutputs; + + for (auto it = bos.begin(); it != bos.end();) { + if (wantOutput(it->first.outputName, bp.outputs)) + ++it; + else + it = bos.erase(it); + } + } return results; } @@ -89,7 +115,6 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat return BuildResult { .status = BuildResult::MiscFailure, .errorMsg = e.msg(), - .path = DerivedPath::Built { .drvPath = drvPath }, }; }; } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 35121c5d9e7..dd40e623b0c 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -64,7 +64,6 @@ struct Goal : public std::enable_shared_from_this Goal(Worker & worker, DerivedPath path) : worker(worker) - , buildResult { .path = std::move(path) } { } virtual ~Goal() diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index b176f318b2c..4c1eabf0120 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -11,7 +11,8 @@ #include "json.hh" #include "compression.hh" #include "daemon.hh" -#include "worker-protocol.hh" +#include "common-protocol.hh" +#include "common-protocol-impl.hh" #include "topo-sort.hh" #include "callback.hh" @@ -1266,7 +1267,7 @@ struct RestrictedStore : public virtual RestrictedStoreConfig, public virtual Lo result.rethrow(); } - std::vector buildPathsWithResults( + std::vector buildPathsWithResults( const std::vector & paths, BuildMode buildMode = bmNormal, std::shared_ptr evalStore = nullptr) override diff --git a/src/libstore/common-protocol-impl.hh b/src/libstore/common-protocol-impl.hh new file mode 100644 index 00000000000..813ba80d320 --- /dev/null +++ b/src/libstore/common-protocol-impl.hh @@ -0,0 +1,22 @@ +#pragma once + +#include "meta-protocol-templates.hh" + +namespace nix { +namespace common_proto { + +/* protocol-agnostic templates */ + +WRAP_META_PROTO(template, std::vector) +WRAP_META_PROTO(template, std::set) + +#define X_ template +#define Y_ std::map +WRAP_META_PROTO(X_, Y_) +#undef X_ +#undef Y_ + +/* protocol-specific templates */ + +} +} diff --git a/src/libstore/common-protocol.cc b/src/libstore/common-protocol.cc new file mode 100644 index 00000000000..db801a28ddf --- /dev/null +++ b/src/libstore/common-protocol.cc @@ -0,0 +1,139 @@ +#include "serialise.hh" +#include "util.hh" +#include "path-with-outputs.hh" +#include "store-api.hh" +#include "build-result.hh" +#include "common-protocol.hh" +#include "common-protocol-impl.hh" +#include "archive.hh" +#include "derivations.hh" + +#include + +namespace nix { +namespace common_proto { + +/* protocol-agnostic definitions */ + +std::string read(const Store & store, ReadConn conn, Phantom _) +{ + return readString(conn.from); +} + +void write(const Store & store, WriteConn conn, const std::string & str) +{ + conn.to << str; +} + + +StorePath read(const Store & store, ReadConn conn, Phantom _) +{ + return store.parseStorePath(readString(conn.from)); +} + +void write(const Store & store, WriteConn conn, const StorePath & storePath) +{ + conn.to << store.printStorePath(storePath); +} + + +ContentAddress read(const Store & store, ReadConn conn, Phantom _) +{ + return parseContentAddress(readString(conn.from)); +} + +void write(const Store & store, WriteConn conn, const ContentAddress & ca) +{ + conn.to << renderContentAddress(ca); +} + +DerivedPath read(const Store & store, ReadConn conn, Phantom _) +{ + auto s = readString(conn.from); + return DerivedPath::parse(store, s); +} + +void write(const Store & store, WriteConn conn, const DerivedPath & req) +{ + conn.to << req.to_string(store); +} + + +Realisation read(const Store & store, ReadConn conn, Phantom _) +{ + std::string rawInput = readString(conn.from); + return Realisation::fromJSON( + nlohmann::json::parse(rawInput), + "remote-protocol" + ); +} + +void write(const Store & store, WriteConn conn, const Realisation & realisation) +{ + conn.to << realisation.toJSON().dump(); +} + + +DrvOutput read(const Store & store, ReadConn conn, Phantom _) +{ + return DrvOutput::parse(readString(conn.from)); +} + +void write(const Store & store, WriteConn conn, const DrvOutput & drvOutput) +{ + conn.to << drvOutput.to_string(); +} + + +std::optional read(const Store & store, ReadConn conn, Phantom> _) +{ + auto s = readString(conn.from); + return s == "" ? std::optional {} : store.parseStorePath(s); +} + +void write(const Store & store, WriteConn conn, const std::optional & storePathOpt) +{ + conn.to << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); +} + + +std::optional read(const Store & store, ReadConn conn, Phantom> _) +{ + return parseContentAddressOpt(readString(conn.from)); +} + +void write(const Store & store, WriteConn conn, const std::optional & caOpt) +{ + conn.to << (caOpt ? renderContentAddress(*caOpt) : ""); +} + +// Helpers for downstream + +BuildResult read0(const Store & store, ReadConn conn, Phantom _) +{ + BuildResult res; + res.status = (BuildResult::Status) readInt(conn.from); + conn.from + >> res.errorMsg + >> res.timesBuilt + >> res.isNonDeterministic + >> res.startTime + >> res.stopTime; + res.builtOutputs = read(store, conn, Phantom {}); + return res; +} + +void write0(const Store & store, WriteConn conn, const BuildResult & res) +{ + conn.to + << res.status + << res.errorMsg + << res.timesBuilt + << res.isNonDeterministic + << res.startTime + << res.stopTime; + write(store, conn, res.builtOutputs); +} + +} +} diff --git a/src/libstore/common-protocol.hh b/src/libstore/common-protocol.hh new file mode 100644 index 00000000000..d4d2f09f409 --- /dev/null +++ b/src/libstore/common-protocol.hh @@ -0,0 +1,73 @@ +#pragma once + +#include "serialise.hh" +#include "phantom.hh" + +namespace nix { + +class Store; +struct Source; + +// items being serialized +struct DerivedPath; +struct DrvOutput; +struct Realisation; +struct BuildResult; + + +namespace common_proto { +/* FIXME maybe move more stuff inside here */ + +struct ReadConn { + Source & from; +}; + +struct WriteConn { + Sink & to; +}; + +#define MAKE_PROTO(TEMPLATE, T) \ + TEMPLATE T read(const Store & store, ReadConn conn, Phantom< T > _); \ + TEMPLATE void write(const Store & store, WriteConn conn, const T & str) + +MAKE_PROTO(, std::string); +MAKE_PROTO(, StorePath); +MAKE_PROTO(, ContentAddress); +MAKE_PROTO(, DerivedPath); +MAKE_PROTO(, Realisation); +MAKE_PROTO(, DrvOutput); + +MAKE_PROTO(template, std::vector); +MAKE_PROTO(template, std::set); + +#define X_ template +#define Y_ std::map +MAKE_PROTO(X_, Y_); +#undef X_ +#undef Y_ + +/* These use the empty string for the null case, relying on the fact + that the underlying types never serialize to the empty string. + + We do this instead of a generic std::optional instance because + ordinal tags (0 or 1, here) are a bit of a compatability hazard. For + the same reason, we don't have a std::variant instances (ordinal + tags 0...n). + + We could the generic instances and then these as specializations for + compatability, but that's proven a bit finnicky, and also makes the + worker protocol harder to implement in other languages where such + specializations may not be allowed. + */ +MAKE_PROTO(, std::optional); +MAKE_PROTO(, std::optional); + +/* N suffix indicates this is the nth version of them in common. Wrapped downstream + by protocol-specific functions which handle the previous versions too. */ + +BuildResult read0(const Store & store, ReadConn conn, Phantom _); +void write0(const Store & store, WriteConn conn, const BuildResult & res); + +} + +} diff --git a/src/libstore/daemon.cc b/src/libstore/daemon.cc index de69b50eec7..a57e8a7510b 100644 --- a/src/libstore/daemon.cc +++ b/src/libstore/daemon.cc @@ -1,6 +1,7 @@ #include "daemon.hh" #include "monitor-fd.hh" #include "worker-protocol.hh" +#include "worker-protocol-impl.hh" #include "build-result.hh" #include "store-api.hh" #include "store-cast.hh" @@ -253,13 +254,13 @@ struct ClientSettings } }; -static std::vector readDerivedPaths(Store & store, unsigned int clientVersion, Source & from) +static std::vector readDerivedPaths(Store & store, unsigned int clientVersion, worker_proto::ReadConn conn) { std::vector reqs; if (GET_PROTOCOL_MINOR(clientVersion) >= 30) { - reqs = worker_proto::read(store, from, Phantom> {}); + reqs = worker_proto::read(store, conn, Phantom> {}); } else { - for (auto & s : readStrings(from)) + for (auto & s : readStrings(conn.from)) reqs.push_back(parsePathWithOutputs(store, s).toDerivedPath()); } return reqs; @@ -269,6 +270,15 @@ static void performOp(TunnelLogger * logger, ref store, TrustedFlag trusted, RecursiveFlag recursive, unsigned int clientVersion, Source & from, BufferedSink & to, unsigned int op) { + worker_proto::ReadConn rconn { + { .from = from }, + .version = clientVersion, + }; + worker_proto::WriteConn wconn { + { .to = to }, + .version = clientVersion, + }; + switch (op) { case wopIsValidPath: { @@ -281,7 +291,7 @@ static void performOp(TunnelLogger * logger, ref store, } case wopQueryValidPaths: { - auto paths = worker_proto::read(*store, from, Phantom {}); + auto paths = worker_proto::read(*store, rconn, Phantom {}); SubstituteFlag substitute = NoSubstitute; if (GET_PROTOCOL_MINOR(clientVersion) >= 27) { @@ -294,7 +304,7 @@ static void performOp(TunnelLogger * logger, ref store, } auto res = store->queryValidPaths(paths, substitute); logger->stopWork(); - worker_proto::write(*store, to, res); + worker_proto::write(*store, wconn, res); break; } @@ -310,11 +320,11 @@ static void performOp(TunnelLogger * logger, ref store, } case wopQuerySubstitutablePaths: { - auto paths = worker_proto::read(*store, from, Phantom {}); + auto paths = worker_proto::read(*store, rconn, Phantom {}); logger->startWork(); auto res = store->querySubstitutablePaths(paths); logger->stopWork(); - worker_proto::write(*store, to, res); + worker_proto::write(*store, wconn, res); break; } @@ -343,7 +353,7 @@ static void performOp(TunnelLogger * logger, ref store, paths = store->queryValidDerivers(path); else paths = store->queryDerivationOutputs(path); logger->stopWork(); - worker_proto::write(*store, to, paths); + worker_proto::write(*store, wconn, paths); break; } @@ -361,7 +371,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto outputs = store->queryPartialDerivationOutputMap(path); logger->stopWork(); - worker_proto::write(*store, to, outputs); + worker_proto::write(*store, wconn, outputs); break; } @@ -387,7 +397,7 @@ static void performOp(TunnelLogger * logger, ref store, if (GET_PROTOCOL_MINOR(clientVersion) >= 25) { auto name = readString(from); auto camStr = readString(from); - auto refs = worker_proto::read(*store, from, Phantom {}); + auto refs = worker_proto::read(*store, rconn, Phantom {}); bool repairBool; from >> repairBool; auto repair = RepairFlag{repairBool}; @@ -413,7 +423,7 @@ static void performOp(TunnelLogger * logger, ref store, }(); logger->stopWork(); - pathInfo->write(to, *store, GET_PROTOCOL_MINOR(clientVersion)); + worker_proto::write(*store, wconn, *pathInfo); } else { HashType hashAlgo; std::string baseName; @@ -485,7 +495,7 @@ static void performOp(TunnelLogger * logger, ref store, case wopAddTextToStore: { std::string suffix = readString(from); std::string s = readString(from); - auto refs = worker_proto::read(*store, from, Phantom {}); + auto refs = worker_proto::read(*store, rconn, Phantom {}); logger->startWork(); auto path = store->addTextToStore(suffix, s, refs, NoRepair); logger->stopWork(); @@ -517,7 +527,7 @@ static void performOp(TunnelLogger * logger, ref store, } case wopBuildPaths: { - auto drvs = readDerivedPaths(*store, clientVersion, from); + auto drvs = readDerivedPaths(*store, clientVersion, rconn); BuildMode mode = bmNormal; if (GET_PROTOCOL_MINOR(clientVersion) >= 15) { mode = (BuildMode) readInt(from); @@ -535,7 +545,7 @@ static void performOp(TunnelLogger * logger, ref store, } case wopBuildPathsWithResults: { - auto drvs = readDerivedPaths(*store, clientVersion, from); + auto drvs = readDerivedPaths(*store, clientVersion, rconn); BuildMode mode = bmNormal; mode = (BuildMode) readInt(from); @@ -548,7 +558,7 @@ static void performOp(TunnelLogger * logger, ref store, auto results = store->buildPathsWithResults(drvs, mode); logger->stopWork(); - worker_proto::write(*store, to, results); + worker_proto::write(*store, wconn, results); break; } @@ -617,13 +627,7 @@ static void performOp(TunnelLogger * logger, ref store, auto res = store->buildDerivation(drvPath, drv, buildMode); logger->stopWork(); - to << res.status << res.errorMsg; - if (GET_PROTOCOL_MINOR(clientVersion) >= 29) { - to << res.timesBuilt << res.isNonDeterministic << res.startTime << res.stopTime; - } - if (GET_PROTOCOL_MINOR(clientVersion) >= 28) { - worker_proto::write(*store, to, res.builtOutputs); - } + worker_proto::write(*store, wconn, res); break; } @@ -687,7 +691,7 @@ static void performOp(TunnelLogger * logger, ref store, case wopCollectGarbage: { GCOptions options; options.action = (GCOptions::GCAction) readInt(from); - options.pathsToDelete = worker_proto::read(*store, from, Phantom {}); + options.pathsToDelete = worker_proto::read(*store, rconn, Phantom {}); from >> options.ignoreLiveness >> options.maxFreed; // obsolete fields readInt(from); @@ -757,7 +761,7 @@ static void performOp(TunnelLogger * logger, ref store, else { to << 1 << (i->second.deriver ? store->printStorePath(*i->second.deriver) : ""); - worker_proto::write(*store, to, i->second.references); + worker_proto::write(*store, wconn, i->second.references); to << i->second.downloadSize << i->second.narSize; } @@ -768,11 +772,11 @@ static void performOp(TunnelLogger * logger, ref store, SubstitutablePathInfos infos; StorePathCAMap pathsMap = {}; if (GET_PROTOCOL_MINOR(clientVersion) < 22) { - auto paths = worker_proto::read(*store, from, Phantom {}); + auto paths = worker_proto::read(*store, rconn, Phantom {}); for (auto & path : paths) pathsMap.emplace(path, std::nullopt); } else - pathsMap = worker_proto::read(*store, from, Phantom {}); + pathsMap = worker_proto::read(*store, rconn, Phantom {}); logger->startWork(); store->querySubstitutablePathInfos(pathsMap, infos); logger->stopWork(); @@ -780,7 +784,7 @@ static void performOp(TunnelLogger * logger, ref store, for (auto & i : infos) { to << store->printStorePath(i.first) << (i.second.deriver ? store->printStorePath(*i.second.deriver) : ""); - worker_proto::write(*store, to, i.second.references); + worker_proto::write(*store, wconn, i.second.references); to << i.second.downloadSize << i.second.narSize; } break; @@ -790,7 +794,7 @@ static void performOp(TunnelLogger * logger, ref store, logger->startWork(); auto paths = store->queryAllValidPaths(); logger->stopWork(); - worker_proto::write(*store, to, paths); + worker_proto::write(*store, wconn, paths); break; } @@ -807,7 +811,7 @@ static void performOp(TunnelLogger * logger, ref store, if (info) { if (GET_PROTOCOL_MINOR(clientVersion) >= 17) to << 1; - info->write(to, *store, GET_PROTOCOL_MINOR(clientVersion), false); + worker_proto::write(*store, wconn, *info, false); } else { assert(GET_PROTOCOL_MINOR(clientVersion) >= 17); to << 0; @@ -862,7 +866,7 @@ static void performOp(TunnelLogger * logger, ref store, ValidPathInfo info { path, narHash }; if (deriver != "") info.deriver = store->parseStorePath(deriver); - info.references = worker_proto::read(*store, from, Phantom {}); + info.references = worker_proto::read(*store, rconn, Phantom {}); from >> info.registrationTime >> info.narSize >> info.ultimate; info.sigs = readStrings(from); info.ca = parseContentAddressOpt(readString(from)); @@ -907,15 +911,15 @@ static void performOp(TunnelLogger * logger, ref store, } case wopQueryMissing: { - auto targets = readDerivedPaths(*store, clientVersion, from); + auto targets = readDerivedPaths(*store, clientVersion, rconn); logger->startWork(); StorePathSet willBuild, willSubstitute, unknown; uint64_t downloadSize, narSize; store->queryMissing(targets, willBuild, willSubstitute, unknown, downloadSize, narSize); logger->stopWork(); - worker_proto::write(*store, to, willBuild); - worker_proto::write(*store, to, willSubstitute); - worker_proto::write(*store, to, unknown); + worker_proto::write(*store, wconn, willBuild); + worker_proto::write(*store, wconn, willSubstitute); + worker_proto::write(*store, wconn, unknown); to << downloadSize << narSize; break; } @@ -928,7 +932,7 @@ static void performOp(TunnelLogger * logger, ref store, store->registerDrvOutput(Realisation{ .id = outputId, .outPath = outputPath}); } else { - auto realisation = worker_proto::read(*store, from, Phantom()); + auto realisation = worker_proto::read(*store, rconn, Phantom()); store->registerDrvOutput(realisation); } logger->stopWork(); @@ -943,11 +947,11 @@ static void performOp(TunnelLogger * logger, ref store, if (GET_PROTOCOL_MINOR(clientVersion) < 31) { std::set outPaths; if (info) outPaths.insert(info->outPath); - worker_proto::write(*store, to, outPaths); + worker_proto::write(*store, wconn, outPaths); } else { std::set realisations; if (info) realisations.insert(*info); - worker_proto::write(*store, to, realisations); + worker_proto::write(*store, wconn, realisations); } break; } diff --git a/src/libstore/derivations.cc b/src/libstore/derivations.cc index 7fed803876a..f8304f86dd5 100644 --- a/src/libstore/derivations.cc +++ b/src/libstore/derivations.cc @@ -2,7 +2,8 @@ #include "store-api.hh" #include "globals.hh" #include "util.hh" -#include "worker-protocol.hh" +#include "common-protocol.hh" +#include "common-protocol-impl.hh" #include "fs-accessor.hh" #include @@ -657,7 +658,9 @@ Source & readDerivation(Source & in, const Store & store, BasicDerivation & drv, drv.outputs.emplace(std::move(name), std::move(output)); } - drv.inputSrcs = worker_proto::read(store, in, Phantom {}); + drv.inputSrcs = common_proto::read(store, + common_proto::ReadConn { .from = in }, + Phantom {}); in >> drv.platform >> drv.builder; drv.args = readStrings(in); @@ -700,7 +703,9 @@ void writeDerivation(Sink & out, const Store & store, const BasicDerivation & dr }, }, i.second.raw()); } - worker_proto::write(store, out, drv.inputSrcs); + common_proto::write(store, + common_proto::WriteConn { .to = out }, + drv.inputSrcs); out << drv.platform << drv.builder << drv.args; out << drv.env.size(); for (auto & i : drv.env) diff --git a/src/libstore/export-import.cc b/src/libstore/export-import.cc index 9875da90906..9984c839715 100644 --- a/src/libstore/export-import.cc +++ b/src/libstore/export-import.cc @@ -1,7 +1,8 @@ #include "serialise.hh" #include "store-api.hh" #include "archive.hh" -#include "worker-protocol.hh" +#include "common-protocol.hh" +#include "common-protocol-impl.hh" #include @@ -45,7 +46,9 @@ void Store::exportPath(const StorePath & path, Sink & sink) teeSink << exportMagic << printStorePath(path); - worker_proto::write(*this, teeSink, info->references); + common_proto::write(*this, + common_proto::WriteConn { .to = teeSink }, + info->references); teeSink << (info->deriver ? printStorePath(*info->deriver) : "") << 0; @@ -73,7 +76,9 @@ StorePaths Store::importPaths(Source & source, CheckSigsFlag checkSigs) //Activity act(*logger, lvlInfo, format("importing path '%s'") % info.path); - auto references = worker_proto::read(*this, source, Phantom {}); + auto references = common_proto::read(*this, + common_proto::ReadConn { .from = source }, + Phantom {}); auto deriver = readString(source); auto narHash = hashString(htSHA256, saved.s); diff --git a/src/libstore/legacy-ssh-store.cc b/src/libstore/legacy-ssh-store.cc index dd34b19c683..d0a894cd59b 100644 --- a/src/libstore/legacy-ssh-store.cc +++ b/src/libstore/legacy-ssh-store.cc @@ -2,10 +2,10 @@ #include "pool.hh" #include "remote-store.hh" #include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "build-result.hh" #include "store-api.hh" #include "path-with-outputs.hh" -#include "worker-protocol.hh" #include "ssh.hh" #include "derivations.hh" #include "callback.hh" @@ -39,6 +39,24 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor FdSource from; int remoteVersion; bool good = true; + + operator serve_proto::ReadConn () + { + return serve_proto::ReadConn { + { + .from = from, + } + }; + } + + operator serve_proto::WriteConn () + { + return serve_proto::WriteConn { + { + .to = to, + } + }; + } }; std::string host; @@ -138,7 +156,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor auto deriver = readString(conn->from); if (deriver != "") info->deriver = parseStorePath(deriver); - info->references = worker_proto::read(*this, conn->from, Phantom {}); + info->references = serve_proto::read(*this, *conn, Phantom {}); readLongLong(conn->from); // download size info->narSize = readLongLong(conn->from); @@ -172,7 +190,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - worker_proto::write(*this, conn->to, info.references); + serve_proto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize @@ -201,7 +219,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to << exportMagic << printStorePath(info.path); - worker_proto::write(*this, conn->to, info.references); + serve_proto::write(*this, *conn, info.references); conn->to << (info.deriver ? printStorePath(*info.deriver) : "") << 0 @@ -279,16 +297,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to.flush(); - BuildResult status { .path = DerivedPath::Built { .drvPath = drvPath } }; - status.status = (BuildResult::Status) readInt(conn->from); - conn->from >> status.errorMsg; - - if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 3) - conn->from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; - if (GET_PROTOCOL_MINOR(conn->remoteVersion) >= 6) { - status.builtOutputs = worker_proto::read(*this, conn->from, Phantom {}); - } - return status; + return serve_proto::read(*this, *conn, Phantom {}); } void buildPaths(const std::vector & drvPaths, BuildMode buildMode, std::shared_ptr evalStore) override @@ -317,7 +326,7 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to.flush(); - BuildResult result { .path = DerivedPath::Opaque { StorePath::dummy } }; + BuildResult result; result.status = (BuildResult::Status) readInt(conn->from); if (!result.success()) { @@ -343,10 +352,10 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor conn->to << cmdQueryClosure << includeOutputs; - worker_proto::write(*this, conn->to, paths); + serve_proto::write(*this, *conn, paths); conn->to.flush(); - for (auto & i : worker_proto::read(*this, conn->from, Phantom {})) + for (auto & i : serve_proto::read(*this, *conn, Phantom {})) out.insert(i); } @@ -359,10 +368,10 @@ struct LegacySSHStore : public virtual LegacySSHStoreConfig, public virtual Stor << cmdQueryValidPaths << false // lock << maybeSubstitute; - worker_proto::write(*this, conn->to, paths); + serve_proto::write(*this, *conn, paths); conn->to.flush(); - return worker_proto::read(*this, conn->from, Phantom {}); + return serve_proto::read(*this, *conn, Phantom {}); } void connect() override diff --git a/src/libstore/meta-protocol-templates.hh b/src/libstore/meta-protocol-templates.hh new file mode 100644 index 00000000000..58753d474b9 --- /dev/null +++ b/src/libstore/meta-protocol-templates.hh @@ -0,0 +1,89 @@ +#pragma once + +#include "types.hh" + +/* Shared code between worker and serv protocols, injected into proper header. + */ +namespace nix { + +struct Store; + +namespace meta_protocol { + +#define WRAP_META_PROTO(TEMPLATE, T) \ + TEMPLATE \ + T read(const Store & store, ReadConn conn, Phantom< T > p) { \ + return meta_protocol::read(store, conn, p); \ + } \ + \ + TEMPLATE \ + void write(const Store & store, WriteConn conn, const T & v) { \ + meta_protocol::write(store, conn, v); \ + } + +template +std::vector read(const Store & store, ReadConn conn, Phantom> _) +{ + std::vector resSet; + auto size = readNum(conn.from); + while (size--) { + resSet.push_back(read(store, conn, Phantom {})); + } + return resSet; +} + +template +void write(const Store & store, WriteConn conn, const std::vector & resSet) +{ + conn.to << resSet.size(); + for (auto & key : resSet) { + write(store, conn, key); + } +} + +template +std::set read(const Store & store, ReadConn conn, Phantom> _) +{ + std::set resSet; + auto size = readNum(conn.from); + while (size--) { + resSet.insert(read(store, conn, Phantom {})); + } + return resSet; +} + +template +void write(const Store & store, WriteConn conn, const std::set & resSet) +{ + conn.to << resSet.size(); + for (auto & key : resSet) { + write(store, conn, key); + } +} + +template +std::map read(const Store & store, ReadConn conn, Phantom> _) +{ + std::map resMap; + auto size = readNum(conn.from); + while (size--) { + auto k = read(store, conn, Phantom {}); + auto v = read(store, conn, Phantom {}); + resMap.insert_or_assign(std::move(k), std::move(v)); + } + return resMap; +} + +template +void write(const Store & store, WriteConn conn, const std::map & resMap) +{ + conn.to << resMap.size(); + for (auto & i : resMap) { + write(store, conn, i.first); + write(store, conn, i.second); + } +} + +} + +} diff --git a/src/libstore/path-info.cc b/src/libstore/path-info.cc deleted file mode 100644 index fda55b2b618..00000000000 --- a/src/libstore/path-info.cc +++ /dev/null @@ -1,46 +0,0 @@ -#include "path-info.hh" -#include "worker-protocol.hh" - -namespace nix { - -ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format) -{ - return read(source, store, format, store.parseStorePath(readString(source))); -} - -ValidPathInfo ValidPathInfo::read(Source & source, const Store & store, unsigned int format, StorePath && path) -{ - auto deriver = readString(source); - auto narHash = Hash::parseAny(readString(source), htSHA256); - ValidPathInfo info(path, narHash); - if (deriver != "") info.deriver = store.parseStorePath(deriver); - info.references = worker_proto::read(store, source, Phantom {}); - source >> info.registrationTime >> info.narSize; - if (format >= 16) { - source >> info.ultimate; - info.sigs = readStrings(source); - info.ca = parseContentAddressOpt(readString(source)); - } - return info; -} - -void ValidPathInfo::write( - Sink & sink, - const Store & store, - unsigned int format, - bool includePath) const -{ - if (includePath) - sink << store.printStorePath(path); - sink << (deriver ? store.printStorePath(*deriver) : "") - << narHash.to_string(Base16, false); - worker_proto::write(store, sink, references); - sink << registrationTime << narSize; - if (format >= 16) { - sink << ultimate - << sigs - << renderContentAddress(ca); - } -} - -} diff --git a/src/libstore/path-info.hh b/src/libstore/path-info.hh index b4b54e593c1..de87f8b33b0 100644 --- a/src/libstore/path-info.hh +++ b/src/libstore/path-info.hh @@ -105,11 +105,6 @@ struct ValidPathInfo ValidPathInfo(const StorePath & path, Hash narHash) : path(path), narHash(narHash) { }; virtual ~ValidPathInfo() { } - - static ValidPathInfo read(Source & source, const Store & store, unsigned int format); - static ValidPathInfo read(Source & source, const Store & store, unsigned int format, StorePath && path); - - void write(Sink & sink, const Store & store, unsigned int format, bool includePath = true) const; }; typedef std::map ValidPathInfos; diff --git a/src/libstore/phantom.hh b/src/libstore/phantom.hh new file mode 100644 index 00000000000..f1909d2faba --- /dev/null +++ b/src/libstore/phantom.hh @@ -0,0 +1,10 @@ +#pragma once + +namespace nix { + +/* To guide overloading for serializers. + * Shared between the old and new protocols. */ +template +struct Phantom {}; + +} diff --git a/src/libstore/remote-store-connection.hh b/src/libstore/remote-store-connection.hh new file mode 100644 index 00000000000..9c5e2a8c653 --- /dev/null +++ b/src/libstore/remote-store-connection.hh @@ -0,0 +1,41 @@ +#include "remote-store.hh" +#include "worker-protocol.hh" + +namespace nix { + +struct RemoteStore::Connection +{ + FdSink to; + FdSource from; + unsigned int daemonVersion; + std::optional daemonNixVersion; + std::chrono::time_point startTime; + + operator worker_proto::ReadConn () + { + return worker_proto::ReadConn { + { + .from = from, + }, + .version = daemonVersion, + }; + } + + operator worker_proto::WriteConn () + { + return worker_proto::WriteConn { + { + .to = to, + }, + .version = daemonVersion, + }; + } + + virtual ~Connection(); + + virtual void closeWrite() = 0; + + std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); +}; + +} diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 347e3209426..6abab775381 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -5,7 +5,9 @@ #include "remote-fs-accessor.hh" #include "build-result.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "worker-protocol.hh" +#include "worker-protocol-impl.hh" #include "archive.hh" #include "globals.hh" #include "derivations.hh" @@ -18,133 +20,6 @@ namespace nix { -namespace worker_proto { - -std::string read(const Store & store, Source & from, Phantom _) -{ - return readString(from); -} - -void write(const Store & store, Sink & out, const std::string & str) -{ - out << str; -} - - -StorePath read(const Store & store, Source & from, Phantom _) -{ - return store.parseStorePath(readString(from)); -} - -void write(const Store & store, Sink & out, const StorePath & storePath) -{ - out << store.printStorePath(storePath); -} - - -ContentAddress read(const Store & store, Source & from, Phantom _) -{ - return parseContentAddress(readString(from)); -} - -void write(const Store & store, Sink & out, const ContentAddress & ca) -{ - out << renderContentAddress(ca); -} - - -DerivedPath read(const Store & store, Source & from, Phantom _) -{ - auto s = readString(from); - return DerivedPath::parse(store, s); -} - -void write(const Store & store, Sink & out, const DerivedPath & req) -{ - out << req.to_string(store); -} - - -Realisation read(const Store & store, Source & from, Phantom _) -{ - std::string rawInput = readString(from); - return Realisation::fromJSON( - nlohmann::json::parse(rawInput), - "remote-protocol" - ); -} - -void write(const Store & store, Sink & out, const Realisation & realisation) -{ - out << realisation.toJSON().dump(); -} - - -DrvOutput read(const Store & store, Source & from, Phantom _) -{ - return DrvOutput::parse(readString(from)); -} - -void write(const Store & store, Sink & out, const DrvOutput & drvOutput) -{ - out << drvOutput.to_string(); -} - - -BuildResult read(const Store & store, Source & from, Phantom _) -{ - auto path = worker_proto::read(store, from, Phantom {}); - BuildResult res { .path = path }; - res.status = (BuildResult::Status) readInt(from); - from - >> res.errorMsg - >> res.timesBuilt - >> res.isNonDeterministic - >> res.startTime - >> res.stopTime; - res.builtOutputs = worker_proto::read(store, from, Phantom {}); - return res; -} - -void write(const Store & store, Sink & to, const BuildResult & res) -{ - worker_proto::write(store, to, res.path); - to - << res.status - << res.errorMsg - << res.timesBuilt - << res.isNonDeterministic - << res.startTime - << res.stopTime; - worker_proto::write(store, to, res.builtOutputs); -} - - -std::optional read(const Store & store, Source & from, Phantom> _) -{ - auto s = readString(from); - return s == "" ? std::optional {} : store.parseStorePath(s); -} - -void write(const Store & store, Sink & out, const std::optional & storePathOpt) -{ - out << (storePathOpt ? store.printStorePath(*storePathOpt) : ""); -} - - -std::optional read(const Store & store, Source & from, Phantom> _) -{ - return parseContentAddressOpt(readString(from)); -} - -void write(const Store & store, Sink & out, const std::optional & caOpt) -{ - out << (caOpt ? renderContentAddress(*caOpt) : ""); -} - -} - - /* TODO: Separate these store impls into different files, give them better names */ RemoteStore::RemoteStore(const Params & params) : RemoteStoreConfig(params) @@ -303,6 +178,7 @@ struct ConnectionHandle } RemoteStore::Connection * operator -> () { return &*handle; } + RemoteStore::Connection & operator * () { return *handle; } void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true) { @@ -346,12 +222,12 @@ StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, Substitute return res; } else { conn->to << wopQueryValidPaths; - worker_proto::write(*this, conn->to, paths); + worker_proto::write(*this, *conn, paths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 27) { conn->to << (settings.buildersUseSubstitutes ? 1 : 0); } conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); + return worker_proto::read(*this, *conn, Phantom {}); } } @@ -361,7 +237,7 @@ StorePathSet RemoteStore::queryAllValidPaths() auto conn(getConnection()); conn->to << wopQueryAllValidPaths; conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); + return worker_proto::read(*this, *conn, Phantom {}); } @@ -378,9 +254,9 @@ StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths) return res; } else { conn->to << wopQuerySubstitutablePaths; - worker_proto::write(*this, conn->to, paths); + worker_proto::write(*this, *conn, paths); conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); + return worker_proto::read(*this, *conn, Phantom {}); } } @@ -402,7 +278,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = worker_proto::read(*this, conn->from, Phantom {}); + info.references = worker_proto::read(*this, *conn, Phantom {}); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); infos.insert_or_assign(i.first, std::move(info)); @@ -415,9 +291,9 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S StorePathSet paths; for (auto & path : pathsMap) paths.insert(path.first); - worker_proto::write(*this, conn->to, paths); + worker_proto::write(*this, *conn, paths); } else - worker_proto::write(*this, conn->to, pathsMap); + worker_proto::write(*this, *conn, pathsMap); conn.processStderr(); size_t count = readNum(conn->from); for (size_t n = 0; n < count; n++) { @@ -425,7 +301,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S auto deriver = readString(conn->from); if (deriver != "") info.deriver = parseStorePath(deriver); - info.references = worker_proto::read(*this, conn->from, Phantom {}); + info.references = worker_proto::read(*this, *conn, Phantom {}); info.downloadSize = readLongLong(conn->from); info.narSize = readLongLong(conn->from); } @@ -455,7 +331,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path, if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path)); } info = std::make_shared( - ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion), StorePath{path})); + worker_proto::readValidPathInfo(*this, *conn, StorePath{path})); } callback(std::move(info)); } catch (...) { callback.rethrow(); } @@ -468,7 +344,7 @@ void RemoteStore::queryReferrers(const StorePath & path, auto conn(getConnection()); conn->to << wopQueryReferrers << printStorePath(path); conn.processStderr(); - for (auto & i : worker_proto::read(*this, conn->from, Phantom {})) + for (auto & i : worker_proto::read(*this, *conn, Phantom {})) referrers.insert(i); } @@ -478,7 +354,7 @@ StorePathSet RemoteStore::queryValidDerivers(const StorePath & path) auto conn(getConnection()); conn->to << wopQueryValidDerivers << printStorePath(path); conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); + return worker_proto::read(*this, *conn, Phantom {}); } @@ -490,7 +366,7 @@ StorePathSet RemoteStore::queryDerivationOutputs(const StorePath & path) auto conn(getConnection()); conn->to << wopQueryDerivationOutputs << printStorePath(path); conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom {}); + return worker_proto::read(*this, *conn, Phantom {}); } @@ -500,7 +376,7 @@ std::map> RemoteStore::queryPartialDerivat auto conn(getConnection()); conn->to << wopQueryDerivationOutputMap << printStorePath(path); conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom>> {}); + return worker_proto::read(*this, *conn, Phantom>> {}); } else { // Fallback for old daemon versions. // For floating-CA derivations (and their co-dependencies) this is an @@ -545,7 +421,7 @@ ref RemoteStore::addCAToStore( << wopAddToStore << name << renderContentAddressMethod(caMethod); - worker_proto::write(*this, conn->to, references); + worker_proto::write(*this, *conn, references); conn->to << repair; // The dump source may invoke the store, so we need to make some room. @@ -558,7 +434,7 @@ ref RemoteStore::addCAToStore( } return make_ref( - ValidPathInfo::read(conn->from, *this, GET_PROTOCOL_MINOR(conn->daemonVersion))); + worker_proto::readValidPathInfo(*this, *conn)); } else { if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25"); @@ -567,7 +443,7 @@ ref RemoteStore::addCAToStore( [&](const TextHashMethod & thm) -> void { std::string s = dump.drain(); conn->to << wopAddTextToStore << name << s; - worker_proto::write(*this, conn->to, references); + worker_proto::write(*this, *conn, references); conn.processStderr(); }, [&](const FixedOutputHashMethod & fohm) -> void { @@ -635,7 +511,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, sink << exportMagic << printStorePath(info.path); - worker_proto::write(*this, sink, info.references); + worker_proto::write(*this, *conn, info.references); sink << (info.deriver ? printStorePath(*info.deriver) : "") << 0 // == no legacy signature @@ -645,7 +521,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, conn.processStderr(0, source2.get()); - auto importedPaths = worker_proto::read(*this, conn->from, Phantom {}); + auto importedPaths = worker_proto::read(*this, *conn, Phantom {}); assert(importedPaths.size() <= 1); } @@ -654,7 +530,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source, << printStorePath(info.path) << (info.deriver ? printStorePath(*info.deriver) : "") << info.narHash.to_string(Base16, false); - worker_proto::write(*this, conn->to, info.references); + worker_proto::write(*this, *conn, info.references); conn->to << info.registrationTime << info.narSize << info.ultimate << info.sigs << renderContentAddress(info.ca) << repair << !checkSigs; @@ -710,7 +586,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info) conn->to << info.id.to_string(); conn->to << std::string(info.outPath.to_string()); } else { - worker_proto::write(*this, conn->to, info); + worker_proto::write(*this, *conn, info); } conn.processStderr(); } @@ -734,13 +610,13 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, auto real = [&]() -> std::shared_ptr { if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) { auto outPaths = worker_proto::read( - *this, conn->from, Phantom> {}); + *this, *conn, Phantom> {}); if (outPaths.empty()) return nullptr; return std::make_shared(Realisation { .id = id, .outPath = *outPaths.begin() }); } else { auto realisations = worker_proto::read( - *this, conn->from, Phantom> {}); + *this, *conn, Phantom> {}); if (realisations.empty()) return nullptr; return std::make_shared(*realisations.begin()); @@ -752,10 +628,10 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id, } catch (...) { return callback.rethrow(); } } -static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector & reqs) +static void writeDerivedPaths(RemoteStore & store, RemoteStore::Connection & conn, const std::vector & reqs) { - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 30) { - worker_proto::write(store, conn->to, reqs); + if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 30) { + worker_proto::write(store, conn, reqs); } else { Strings ss; for (auto & p : reqs) { @@ -767,12 +643,12 @@ static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, cons [&](const StorePath & drvPath) { throw Error("trying to request '%s', but daemon protocol %d.%d is too old (< 1.29) to request a derivation file", store.printStorePath(drvPath), - GET_PROTOCOL_MAJOR(conn->daemonVersion), - GET_PROTOCOL_MINOR(conn->daemonVersion)); + GET_PROTOCOL_MAJOR(conn.daemonVersion), + GET_PROTOCOL_MINOR(conn.daemonVersion)); }, }, sOrDrvPath); } - conn->to << ss; + conn.to << ss; } } @@ -798,7 +674,7 @@ void RemoteStore::buildPaths(const std::vector & drvPaths, BuildMod auto conn(getConnection()); conn->to << wopBuildPaths; assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13); - writeDerivedPaths(*this, conn, drvPaths); + writeDerivedPaths(*this, *conn, drvPaths); if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15) conn->to << buildMode; else @@ -810,7 +686,7 @@ void RemoteStore::buildPaths(const std::vector & drvPaths, BuildMod readInt(conn->from); } -std::vector RemoteStore::buildPathsWithResults( +std::vector RemoteStore::buildPathsWithResults( const std::vector & paths, BuildMode buildMode, std::shared_ptr evalStore) @@ -822,10 +698,10 @@ std::vector RemoteStore::buildPathsWithResults( if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) { conn->to << wopBuildPathsWithResults; - writeDerivedPaths(*this, conn, paths); + writeDerivedPaths(*this, *conn, paths); conn->to << buildMode; conn.processStderr(); - return worker_proto::read(*this, conn->from, Phantom> {}); + return worker_proto::read(*this, *conn, Phantom> {}); } else { // Avoid deadlock. conn_.reset(); @@ -834,20 +710,24 @@ std::vector RemoteStore::buildPathsWithResults( // fails, but meh. buildPaths(paths, buildMode, evalStore); - std::vector results; + std::vector results; for (auto & path : paths) { std::visit( overloaded { [&](const DerivedPath::Opaque & bo) { - results.push_back(BuildResult { - .status = BuildResult::Substituted, + results.push_back(KeyedBuildResult { + { + .status = BuildResult::Substituted, + }, .path = bo, }); }, [&](const DerivedPath::Built & bfd) { - BuildResult res { - .status = BuildResult::Built, + KeyedBuildResult res { + { + .status = BuildResult::Built + }, .path = bfd, }; @@ -904,17 +784,7 @@ BuildResult RemoteStore::buildDerivation(const StorePath & drvPath, const BasicD writeDerivation(conn->to, *this, drv); conn->to << buildMode; conn.processStderr(); - BuildResult res { .path = DerivedPath::Built { .drvPath = drvPath } }; - res.status = (BuildResult::Status) readInt(conn->from); - conn->from >> res.errorMsg; - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 29) { - conn->from >> res.timesBuilt >> res.isNonDeterministic >> res.startTime >> res.stopTime; - } - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 28) { - auto builtOutputs = worker_proto::read(*this, conn->from, Phantom {}); - res.builtOutputs = builtOutputs; - } - return res; + return worker_proto::read(*this, *conn, Phantom{}); } @@ -967,7 +837,7 @@ void RemoteStore::collectGarbage(const GCOptions & options, GCResults & results) conn->to << wopCollectGarbage << options.action; - worker_proto::write(*this, conn->to, options.pathsToDelete); + worker_proto::write(*this, *conn, options.pathsToDelete); conn->to << options.ignoreLiveness << options.maxFreed /* removed options */ @@ -1024,11 +894,11 @@ void RemoteStore::queryMissing(const std::vector & targets, // to prevent a deadlock. goto fallback; conn->to << wopQueryMissing; - writeDerivedPaths(*this, conn, targets); + writeDerivedPaths(*this, *conn, targets); conn.processStderr(); - willBuild = worker_proto::read(*this, conn->from, Phantom {}); - willSubstitute = worker_proto::read(*this, conn->from, Phantom {}); - unknown = worker_proto::read(*this, conn->from, Phantom {}); + willBuild = worker_proto::read(*this, *conn, Phantom {}); + willSubstitute = worker_proto::read(*this, *conn, Phantom {}); + unknown = worker_proto::read(*this, *conn, Phantom {}); conn->from >> downloadSize >> narSize; return; } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index 8493be6fc54..e1171ebf92e 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -101,7 +101,7 @@ public: void buildPaths(const std::vector & paths, BuildMode buildMode, std::shared_ptr evalStore) override; - std::vector buildPathsWithResults( + std::vector buildPathsWithResults( const std::vector & paths, BuildMode buildMode, std::shared_ptr evalStore) override; @@ -139,20 +139,7 @@ public: void flushBadConnections(); - struct Connection - { - FdSink to; - FdSource from; - unsigned int daemonVersion; - std::optional daemonNixVersion; - std::chrono::time_point startTime; - - virtual ~Connection(); - - virtual void closeWrite() = 0; - - std::exception_ptr processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); - }; + struct Connection; ref openConnectionWrapper(); diff --git a/src/libstore/serve-protocol-impl.hh b/src/libstore/serve-protocol-impl.hh new file mode 100644 index 00000000000..1dc7906fe80 --- /dev/null +++ b/src/libstore/serve-protocol-impl.hh @@ -0,0 +1,24 @@ +#pragma once + +#include "meta-protocol-templates.hh" + +namespace nix { +namespace serve_proto { + +using namespace common_proto; + +/* protocol-agnostic templates */ + +WRAP_META_PROTO(template, std::vector) +WRAP_META_PROTO(template, std::set) + +#define X_ template +#define Y_ std::map +WRAP_META_PROTO(X_, Y_) +#undef X_ +#undef Y_ + +/* protocol-specific templates */ + +} +} diff --git a/src/libstore/serve-protocol.cc b/src/libstore/serve-protocol.cc new file mode 100644 index 00000000000..93c266450c1 --- /dev/null +++ b/src/libstore/serve-protocol.cc @@ -0,0 +1,45 @@ +#include "serialise.hh" +#include "util.hh" +#include "path-with-outputs.hh" +#include "store-api.hh" +#include "build-result.hh" +#include "serve-protocol.hh" +#include "serve-protocol-impl.hh" +#include "archive.hh" +#include "path-info.hh" + +#include + +namespace nix { +namespace serve_proto { + +/* protocol-specific definitions */ + +BuildResult read(const Store & store, ReadConn conn, Phantom _) +{ + if (GET_PROTOCOL_MINOR(conn.version) < 6) { + BuildResult status; + status.status = (BuildResult::Status) readInt(conn.from); + conn.from >> status.errorMsg; + + if (GET_PROTOCOL_MINOR(conn.version) >= 3) + conn.from >> status.timesBuilt >> status.isNonDeterministic >> status.startTime >> status.stopTime; + return status; + } else + return serve_proto::read0(store, conn, Phantom {}); +} + +void write(const Store & store, WriteConn conn, const BuildResult & status) +{ + if (GET_PROTOCOL_MINOR(conn.version < 6)) { + conn.to << status.status << status.errorMsg; + + if (GET_PROTOCOL_MINOR(conn.version) >= 3) + conn.to << status.timesBuilt << status.isNonDeterministic << status.startTime << status.stopTime; + } else { + serve_proto::write0(store, conn, status); + } +} + +} +} diff --git a/src/libstore/serve-protocol.hh b/src/libstore/serve-protocol.hh index 3f76baa82ea..88989917f20 100644 --- a/src/libstore/serve-protocol.hh +++ b/src/libstore/serve-protocol.hh @@ -1,5 +1,7 @@ #pragma once +#include "common-protocol.hh" + namespace nix { #define SERVE_MAGIC_1 0x390c9deb @@ -9,6 +11,7 @@ namespace nix { #define GET_PROTOCOL_MAJOR(x) ((x) & 0xff00) #define GET_PROTOCOL_MINOR(x) ((x) & 0x00ff) + typedef enum { cmdQueryValidPaths = 1, cmdQueryPathInfos = 2, @@ -21,4 +24,40 @@ typedef enum { cmdAddToStoreNar = 9, } ServeCommand; + +class Store; +struct Source; + +// items being serialized +struct BuildResult; + + +namespace serve_proto { + +using common_proto::read; +using common_proto::write; + +/* FIXME maybe move more stuff inside here */ + +struct ReadConn : common_proto::ReadConn { + unsigned int version; +}; + +struct WriteConn : common_proto::WriteConn { + unsigned int version; +}; + +MAKE_PROTO(, BuildResult); + +MAKE_PROTO(template, std::vector); +MAKE_PROTO(template, std::set); + +#define X_ template +#define Y_ std::map +MAKE_PROTO(X_, Y_); +#undef X_ +#undef Y_ + +} + } diff --git a/src/libstore/ssh-store.cc b/src/libstore/ssh-store.cc index 62daa838ce6..4bbdb1f722b 100644 --- a/src/libstore/ssh-store.cc +++ b/src/libstore/ssh-store.cc @@ -1,5 +1,6 @@ #include "store-api.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "remote-fs-accessor.hh" #include "archive.hh" #include "worker-protocol.hh" diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 59937be4d2d..48fb0ba2e69 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -11,6 +11,8 @@ #include "archive.hh" #include "callback.hh" #include "remote-store.hh" +// FIXME this should not be here +#include "worker-protocol.hh" #include @@ -266,7 +268,11 @@ void Store::addMultipleToStore( { auto expected = readNum(source); for (uint64_t i = 0; i < expected; ++i) { - auto info = ValidPathInfo::read(source, *this, 16); + auto info = worker_proto::readValidPathInfo(*this, + worker_proto::ReadConn { + { .from = source }, + .version = 16, + }); info.ultimate = false; addToStore(info, source, repair, checkSigs); } @@ -978,6 +984,7 @@ std::map copyPaths( return pathsMap; } + std::map copyPaths( Store & srcStore, Store & dstStore, @@ -1013,7 +1020,12 @@ std::map copyPaths( PushActivity pact(act.id); auto info = srcStore.queryPathInfo(storePath); - info->write(sink, srcStore, 16); + worker_proto::write(srcStore, + worker_proto::WriteConn { + { .to = sink }, + .version = 16, + }, + *info); srcStore.narFromPath(storePath, sink); } }); diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index 0c8a4db5667..d497e433788 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -81,6 +81,7 @@ const uint32_t exportMagic = 0x4558494e; enum BuildMode { bmNormal, bmRepair, bmCheck }; struct BuildResult; +struct KeyedBuildResult; struct StoreConfig : public Config @@ -439,7 +440,7 @@ public: a build/substitution error, this function won't throw an exception, but return a `BuildResult` containing an error message. */ - virtual std::vector buildPathsWithResults( + virtual std::vector buildPathsWithResults( const std::vector & paths, BuildMode buildMode = bmNormal, std::shared_ptr evalStore = nullptr); diff --git a/src/libstore/uds-remote-store.hh b/src/libstore/uds-remote-store.hh index f8dfcca704a..32c3e917e1f 100644 --- a/src/libstore/uds-remote-store.hh +++ b/src/libstore/uds-remote-store.hh @@ -1,6 +1,7 @@ #pragma once #include "remote-store.hh" +#include "remote-store-connection.hh" #include "local-fs-store.hh" namespace nix { diff --git a/src/libstore/worker-protocol-impl.hh b/src/libstore/worker-protocol-impl.hh new file mode 100644 index 00000000000..25b336ffbe3 --- /dev/null +++ b/src/libstore/worker-protocol-impl.hh @@ -0,0 +1,22 @@ +#pragma once + +#include "meta-protocol-templates.hh" + +namespace nix { +namespace worker_proto { + +/* protocol-agnostic templates */ + +WRAP_META_PROTO(template, std::vector) +WRAP_META_PROTO(template, std::set) + +#define X_ template +#define Y_ std::map +WRAP_META_PROTO(X_, Y_) +#undef X_ +#undef Y_ + +/* protocol-specific templates */ + +} +} diff --git a/src/libstore/worker-protocol.cc b/src/libstore/worker-protocol.cc new file mode 100644 index 00000000000..8f08713f735 --- /dev/null +++ b/src/libstore/worker-protocol.cc @@ -0,0 +1,105 @@ +#include "serialise.hh" +#include "util.hh" +#include "path-with-outputs.hh" +#include "store-api.hh" +#include "build-result.hh" +#include "worker-protocol.hh" +#include "worker-protocol-impl.hh" +#include "archive.hh" +#include "path-info.hh" + +#include + +namespace nix { +namespace worker_proto { + +/* protocol-specific definitions */ + +KeyedBuildResult read(const Store & store, ReadConn conn, Phantom _) +{ + auto path = worker_proto::read(store, conn, Phantom {}); + auto br = worker_proto::read(store, conn, Phantom {}); + return KeyedBuildResult { + std::move(br), + .path = std::move(path), + }; +} + +void write(const Store & store, WriteConn conn, const KeyedBuildResult & res) +{ + worker_proto::write(store, conn, res.path); + worker_proto::write(store, conn, static_cast(res)); +} + +BuildResult read(const Store & store, ReadConn conn, Phantom _) +{ + if (GET_PROTOCOL_MINOR(conn.version) < 29) { + BuildResult res; + res.status = (BuildResult::Status) readInt(conn.from); + conn.from >> res.errorMsg; + if (GET_PROTOCOL_MINOR(conn.version) >= 28) { + auto builtOutputs = read(store, conn, Phantom {}); + res.builtOutputs = builtOutputs; + } + return res; + } else + return read0(store, conn, Phantom{}); +} + +void write(const Store & store, WriteConn conn, const BuildResult & res) +{ + if (GET_PROTOCOL_MINOR(conn.version) < 29) { + conn.to << res.status << res.errorMsg; + if (GET_PROTOCOL_MINOR(conn.version) >= 28) { + write(store, conn, res.builtOutputs); + } + } else + write0(store, conn, res); +} + + +ValidPathInfo readValidPathInfo(const Store & store, ReadConn conn) +{ + auto path = read(store, conn, Phantom{}); + return readValidPathInfo(store, conn, std::move(path)); +} + +ValidPathInfo readValidPathInfo(const Store & store, ReadConn conn, StorePath && path) +{ + auto deriver = readString(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), htSHA256); + ValidPathInfo info(path, narHash); + if (deriver != "") info.deriver = store.parseStorePath(deriver); + info.references = read(store, conn, Phantom {}); + conn.from >> info.registrationTime >> info.narSize; + if (GET_PROTOCOL_MINOR(conn.version) >= 16) { + conn.from >> info.ultimate; + info.sigs = readStrings(conn.from); + info.ca = parseContentAddressOpt(readString(conn.from)); + } + return info; +} + +void write( + const Store & store, + WriteConn conn, + const ValidPathInfo & pathInfo, + bool includePath) +{ + if (includePath) + conn.to << store.printStorePath(pathInfo.path); + conn.to + << (pathInfo.deriver ? store.printStorePath(*pathInfo.deriver) : "") + << pathInfo.narHash.to_string(Base16, false); + write(store, conn, pathInfo.references); + conn.to << pathInfo.registrationTime << pathInfo.narSize; + if (GET_PROTOCOL_MINOR(conn.version) >= 16) { + conn.to + << pathInfo.ultimate + << pathInfo.sigs + << renderContentAddress(pathInfo.ca); + } +} + +} +} diff --git a/src/libstore/worker-protocol.hh b/src/libstore/worker-protocol.hh index 87088a3ac10..6007643c7a2 100644 --- a/src/libstore/worker-protocol.hh +++ b/src/libstore/worker-protocol.hh @@ -1,7 +1,6 @@ #pragma once -#include "store-api.hh" -#include "serialise.hh" +#include "common-protocol.hh" namespace nix { @@ -74,113 +73,45 @@ typedef enum { class Store; struct Source; -/* To guide overloading */ -template -struct Phantom {}; +// items being serialized +struct BuildResult; +struct KeyedBuildResult; +struct PathInfo; namespace worker_proto { + +using common_proto::read; +using common_proto::write; + /* FIXME maybe move more stuff inside here */ -#define MAKE_WORKER_PROTO(TEMPLATE, T) \ - TEMPLATE T read(const Store & store, Source & from, Phantom< T > _); \ - TEMPLATE void write(const Store & store, Sink & out, const T & str) +struct ReadConn : common_proto::ReadConn { + unsigned int version; +}; + +struct WriteConn : common_proto::WriteConn { + unsigned int version; +}; -MAKE_WORKER_PROTO(, std::string); -MAKE_WORKER_PROTO(, StorePath); -MAKE_WORKER_PROTO(, ContentAddress); -MAKE_WORKER_PROTO(, DerivedPath); -MAKE_WORKER_PROTO(, Realisation); -MAKE_WORKER_PROTO(, DrvOutput); -MAKE_WORKER_PROTO(, BuildResult); +MAKE_PROTO(, BuildResult); +MAKE_PROTO(, KeyedBuildResult); -MAKE_WORKER_PROTO(template, std::vector); -MAKE_WORKER_PROTO(template, std::set); +MAKE_PROTO(template, std::vector); +MAKE_PROTO(template, std::set); #define X_ template #define Y_ std::map -MAKE_WORKER_PROTO(X_, Y_); +MAKE_PROTO(X_, Y_); #undef X_ #undef Y_ -/* These use the empty string for the null case, relying on the fact - that the underlying types never serialize to the empty string. - - We do this instead of a generic std::optional instance because - ordinal tags (0 or 1, here) are a bit of a compatability hazard. For - the same reason, we don't have a std::variant instances (ordinal - tags 0...n). - - We could the generic instances and then these as specializations for - compatability, but that's proven a bit finnicky, and also makes the - worker protocol harder to implement in other languages where such - specializations may not be allowed. - */ -MAKE_WORKER_PROTO(, std::optional); -MAKE_WORKER_PROTO(, std::optional); - -template -std::vector read(const Store & store, Source & from, Phantom> _) -{ - std::vector resSet; - auto size = readNum(from); - while (size--) { - resSet.push_back(read(store, from, Phantom {})); - } - return resSet; -} - -template -void write(const Store & store, Sink & out, const std::vector & resSet) -{ - out << resSet.size(); - for (auto & key : resSet) { - write(store, out, key); - } -} - -template -std::set read(const Store & store, Source & from, Phantom> _) -{ - std::set resSet; - auto size = readNum(from); - while (size--) { - resSet.insert(read(store, from, Phantom {})); - } - return resSet; -} - -template -void write(const Store & store, Sink & out, const std::set & resSet) -{ - out << resSet.size(); - for (auto & key : resSet) { - write(store, out, key); - } -} +/* These are a non-standard form for historical reasons. */ -template -std::map read(const Store & store, Source & from, Phantom> _) -{ - std::map resMap; - auto size = readNum(from); - while (size--) { - auto k = read(store, from, Phantom {}); - auto v = read(store, from, Phantom {}); - resMap.insert_or_assign(std::move(k), std::move(v)); - } - return resMap; -} +ValidPathInfo readValidPathInfo(const Store & store, ReadConn conn); +ValidPathInfo readValidPathInfo(const Store & store, ReadConn conn, StorePath && path); -template -void write(const Store & store, Sink & out, const std::map & resMap) -{ - out << resMap.size(); - for (auto & i : resMap) { - write(store, out, i.first); - write(store, out, i.second); - } -} +void write(const Store & store, WriteConn conn, const ValidPathInfo & pathInfo, bool includePath = true); } diff --git a/src/nix-store/nix-store.cc b/src/nix-store/nix-store.cc index 153b8413765..f42345a8945 100644 --- a/src/nix-store/nix-store.cc +++ b/src/nix-store/nix-store.cc @@ -9,9 +9,9 @@ #include "local-store.hh" #include "monitor-fd.hh" #include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "shared.hh" #include "util.hh" -#include "worker-protocol.hh" #include "graphml.hh" #include "legacy.hh" #include "path-with-outputs.hh" @@ -797,6 +797,9 @@ static void opServe(Strings opFlags, Strings opArgs) out.flush(); unsigned int clientVersion = readInt(in); + serve_proto::ReadConn rconn { { .from = in } }; + serve_proto::WriteConn wconn { { .to = out } }; + auto getBuildSettings = [&]() { // FIXME: changing options here doesn't work if we're // building through the daemon. @@ -831,7 +834,7 @@ static void opServe(Strings opFlags, Strings opArgs) case cmdQueryValidPaths: { bool lock = readInt(in); bool substitute = readInt(in); - auto paths = worker_proto::read(*store, in, Phantom {}); + auto paths = serve_proto::read(*store, rconn, Phantom {}); if (lock && writeAllowed) for (auto & path : paths) store->addTempRoot(path); @@ -840,19 +843,19 @@ static void opServe(Strings opFlags, Strings opArgs) store->substitutePaths(paths); } - worker_proto::write(*store, out, store->queryValidPaths(paths)); + serve_proto::write(*store, wconn, store->queryValidPaths(paths)); break; } case cmdQueryPathInfos: { - auto paths = worker_proto::read(*store, in, Phantom {}); + auto paths = serve_proto::read(*store, rconn, Phantom {}); // !!! Maybe we want a queryPathInfos? for (auto & i : paths) { try { auto info = store->queryPathInfo(i); out << store->printStorePath(info->path) << (info->deriver ? store->printStorePath(*info->deriver) : ""); - worker_proto::write(*store, out, info->references); + serve_proto::write(*store, wconn, info->references); // !!! Maybe we want compression? out << info->narSize // downloadSize << info->narSize; @@ -880,7 +883,7 @@ static void opServe(Strings opFlags, Strings opArgs) case cmdExportPaths: { readInt(in); // obsolete - store->exportPaths(worker_proto::read(*store, in, Phantom {}), out); + store->exportPaths(serve_proto::read(*store, rconn, Phantom {}), out); break; } @@ -918,14 +921,7 @@ static void opServe(Strings opFlags, Strings opArgs) MonitorFdHup monitor(in.fd); auto status = store->buildDerivation(drvPath, drv); - out << status.status << status.errorMsg; - - if (GET_PROTOCOL_MINOR(clientVersion) >= 3) - out << status.timesBuilt << status.isNonDeterministic << status.startTime << status.stopTime; - if (GET_PROTOCOL_MINOR(clientVersion >= 6)) { - worker_proto::write(*store, out, status.builtOutputs); - } - + serve_proto::write(*store, wconn, status); break; } @@ -933,9 +929,9 @@ static void opServe(Strings opFlags, Strings opArgs) case cmdQueryClosure: { bool includeOutputs = readInt(in); StorePathSet closure; - store->computeFSClosure(worker_proto::read(*store, in, Phantom {}), + store->computeFSClosure(serve_proto::read(*store, rconn, Phantom {}), closure, false, includeOutputs); - worker_proto::write(*store, out, closure); + serve_proto::write(*store, wconn, closure); break; } @@ -950,7 +946,7 @@ static void opServe(Strings opFlags, Strings opArgs) }; if (deriver != "") info.deriver = store->parseStorePath(deriver); - info.references = worker_proto::read(*store, in, Phantom {}); + info.references = serve_proto::read(*store, rconn, Phantom {}); in >> info.registrationTime >> info.narSize >> info.ultimate; info.sigs = readStrings(in); info.ca = parseContentAddressOpt(readString(in)); diff --git a/src/nix/daemon.cc b/src/nix/daemon.cc index 940923d3b56..ab6e7b79344 100644 --- a/src/nix/daemon.cc +++ b/src/nix/daemon.cc @@ -2,6 +2,7 @@ #include "shared.hh" #include "local-store.hh" #include "remote-store.hh" +#include "remote-store-connection.hh" #include "util.hh" #include "serialise.hh" #include "archive.hh"