diff --git a/flake.lock b/flake.lock index ec9a00e63..7d1bcbce2 100644 --- a/flake.lock +++ b/flake.lock @@ -5,6 +5,7 @@ "flake-compat": [], "flake-parts": [], "git-hooks-nix": [], + "nixfmt": [], "nixpkgs": [ "nixpkgs" ], @@ -12,16 +13,16 @@ "nixpkgs-regression": [] }, "locked": { - "lastModified": 1739393420, - "narHash": "sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0=", + "lastModified": 1739571938, + "narHash": "sha256-NlaLAed/xei6RWpU2HIIbDjILRC4l1NIfGeyrn7ALQs=", "owner": "NixOS", "repo": "nix", - "rev": "970942f45836172fda410a638853382952189eb9", + "rev": "ffc649d2eabdd3e678b5bcc211dd59fd06debf3e", "type": "github" }, "original": { "owner": "NixOS", - "ref": "2.26-maintenance", + "ref": "ssh-ng-extensions-for-hydra", "repo": "nix", "type": "github" } @@ -29,15 +30,16 @@ "nix-eval-jobs": { "flake": false, "locked": { - "lastModified": 1739500569, - "narHash": "sha256-3wIReAqdTALv39gkWXLMZQvHyBOc3yPkWT2ZsItxedY=", - "owner": "nix-community", + "lastModified": 1739499741, + "narHash": "sha256-dNMJY6+G3PwE8lIAhwetPJdA2DxCEKRXPY/EtHmdDh4=", + "owner": "Ericson2314", "repo": "nix-eval-jobs", - "rev": "4b392b284877d203ae262e16af269f702df036bc", + "rev": "de345eb4518d952c2d86261b270f2c31edecd3de", "type": "github" }, "original": { - "owner": "nix-community", + "owner": "Ericson2314", + "ref": "nix-2.27", "repo": "nix-eval-jobs", "type": "github" } diff --git a/flake.nix b/flake.nix index dc3aaf5cd..244d3353e 100644 --- a/flake.nix +++ b/flake.nix @@ -4,7 +4,7 @@ inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11-small"; inputs.nix = { - url = "github:NixOS/nix/2.26-maintenance"; + url = "github:NixOS/nix/ssh-ng-extensions-for-hydra"; inputs.nixpkgs.follows = "nixpkgs"; # hide nix dev tooling from our lock file @@ -13,10 +13,11 @@ inputs.nixpkgs-regression.follows = ""; inputs.nixpkgs-23-11.follows = ""; inputs.flake-compat.follows = ""; + inputs.nixfmt.follows = ""; }; inputs.nix-eval-jobs = { - url = "github:nix-community/nix-eval-jobs"; + url = "github:Ericson2314/nix-eval-jobs/nix-2.27"; # We want to control the deps precisely flake = false; }; diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 77bde2c45..b99bb5507 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -7,15 +7,12 @@ #include "build-result.hh" #include "path.hh" -#include "legacy-ssh-store.hh" +#include "ssh-store.hh" #include "serve-protocol.hh" -#include "serve-protocol-impl.hh" #include "state.hh" #include "current-process.hh" #include "processes.hh" #include "util.hh" -#include "serve-protocol.hh" -#include "serve-protocol-impl.hh" #include "ssh.hh" #include "finally.hh" #include "url.hh" @@ -39,108 +36,6 @@ bool ::Machine::isLocalhost() const namespace nix::build_remote { -static std::unique_ptr openConnection( - ::Machine::ptr machine, SSHMaster & master) -{ - Strings command = {"nix-store", "--serve", "--write"}; - if (machine->isLocalhost()) { - command.push_back("--builders"); - command.push_back(""); - } else { - auto remoteStore = machine->storeUri.params.find("remote-store"); - if (remoteStore != machine->storeUri.params.end()) { - command.push_back("--store"); - command.push_back(shellEscape(remoteStore->second)); - } - } - - auto ret = master.startCommand(std::move(command), { - "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" - }); - - // XXX: determine the actual max value we can use from /proc. - - // FIXME: Should this be upstreamed into `startCommand` in Nix? - - int pipesize = 1024 * 1024; - - fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize); - fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize); - - return ret; -} - - -static void copyClosureTo( - ::Machine::Connection & conn, - Store & destStore, - const StorePathSet & paths, - SubstituteFlag useSubstitutes = NoSubstitute) -{ - StorePathSet closure; - destStore.computeFSClosure(paths, closure); - - /* Send the "query valid paths" command with the "lock" option - enabled. This prevents a race where the remote host - garbage-collect paths that are already there. Optionally, ask - the remote host to substitute missing paths. */ - // FIXME: substitute output pollutes our build log - /* Get back the set of paths that are already valid on the remote - host. */ - auto present = conn.queryValidPaths( - destStore, true, closure, useSubstitutes); - - if (present.size() == closure.size()) return; - - auto sorted = destStore.topoSortPaths(closure); - - StorePathSet missing; - for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) - if (!present.count(*i)) missing.insert(*i); - - printMsg(lvlDebug, "sending %d missing paths", missing.size()); - - std::unique_lock sendLock(conn.machine->state->sendLock, - std::chrono::seconds(600)); - - conn.to << ServeProto::Command::ImportPaths; - destStore.exportPaths(missing, conn.to); - conn.to.flush(); - - if (readInt(conn.from) != 1) - throw Error("remote machine failed to import closure"); -} - - -// FIXME: use Store::topoSortPaths(). -static StorePaths reverseTopoSortPaths(const std::map & paths) -{ - StorePaths sorted; - StorePathSet visited; - - std::function dfsVisit; - - dfsVisit = [&](const StorePath & path) { - if (!visited.insert(path).second) return; - - auto info = paths.find(path); - auto references = info == paths.end() ? StorePathSet() : info->second.references; - - for (auto & i : references) - /* Don't traverse into paths that don't exist. That can - happen due to substitutes for non-existent paths. */ - if (i != path && paths.count(i)) - dfsVisit(i); - - sorted.push_back(path); - }; - - for (auto & i : paths) - dfsVisit(i.first); - - return sorted; -} - static std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) { std::string base(drvPath.to_string()); @@ -203,13 +98,13 @@ static BasicDerivation sendInputs( auto now1 = std::chrono::steady_clock::now(); /* Copy the input closure. */ - if (conn.machine->isLocalhost()) { - StorePathSet closure; - destStore.computeFSClosure(basicDrv.inputSrcs, closure); - copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } else { - copyClosureTo(conn, destStore, basicDrv.inputSrcs, Substitute); - } + copyClosure( + destStore, + conn.machine->isLocalhost() ? localStore : *conn.store, + basicDrv.inputSrcs, + NoRepair, + NoCheckSigs, + Substitute); auto now2 = std::chrono::steady_clock::now(); @@ -224,11 +119,10 @@ static BuildResult performBuild( Store & localStore, StorePath drvPath, const BasicDerivation & drv, - const ServeProto::BuildOptions & options, counter & nrStepsBuilding ) { - conn.putBuildDerivationRequest(localStore, drvPath, drv, options); + auto kont = conn.store->buildDerivationAsync(drvPath, drv, bmNormal); BuildResult result; @@ -237,7 +131,10 @@ static BuildResult performBuild( startTime = time(0); { MaintainCount mc(nrStepsBuilding); - result = ServeProto::Serialise::read(localStore, conn); + result = kont(); + // Without proper call-once functions, we need to manually + // delete after calling. + kont = {}; } stopTime = time(0); @@ -253,7 +150,7 @@ static BuildResult performBuild( // If the protocol was too old to give us `builtOutputs`, initialize // it manually by introspecting the derivation. - if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) + if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) { // If the remote is too old to handle CA derivations, we can’t get this // far anyways @@ -278,55 +175,6 @@ static BuildResult performBuild( return result; } -static void copyPathFromRemote( - ::Machine::Connection & conn, - NarMemberDatas & narMembers, - Store & localStore, - Store & destStore, - const ValidPathInfo & info -) -{ - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. */ - conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); - conn.to.flush(); - - TeeSource tee(conn.from, sink); - extractNarData(tee, localStore.printStorePath(info.path), narMembers); - }); - - destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); -} - -static void copyPathsFromRemote( - ::Machine::Connection & conn, - NarMemberDatas & narMembers, - Store & localStore, - Store & destStore, - const std::map & infos -) -{ - auto pathsSorted = reverseTopoSortPaths(infos); - - for (auto & path : pathsSorted) { - auto & info = infos.find(path)->second; - copyPathFromRemote( - conn, narMembers, localStore, destStore, - ValidPathInfo { path, info }); - } - -} - } /* using namespace nix::build_remote; */ @@ -389,7 +237,6 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) void State::buildRemote(ref destStore, ::Machine::ptr machine, Step::ptr step, - const ServeProto::BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers) @@ -404,35 +251,43 @@ void State::buildRemote(ref destStore, updateStep(ssConnecting); - auto storeRef = machine->completeStoreReference(); - - auto * pSpecified = std::get_if(&storeRef.variant); - if (!pSpecified || pSpecified->scheme != "ssh") { - throw Error("Currently, only (legacy-)ssh stores are supported!"); - } - - LegacySSHStoreConfig storeConfig { - pSpecified->scheme, - pSpecified->authority, - storeRef.params - }; - - auto master = storeConfig.createSSHMaster( - false, // no SSH master yet - logFD.get()); - // FIXME: rewrite to use Store. - auto child = build_remote::openConnection(machine, master); + ::Machine::Connection conn { + .machine = machine, + .store = [&]{ + auto * pSpecified = std::get_if(&machine->storeUri.variant); + if (!pSpecified || pSpecified->scheme != "ssh-ng") { + throw Error("Currently, only ssh-ng:// stores are supported!"); + } + + auto remoteStore = machine->openStore().dynamic_pointer_cast(); + auto remoteStoreConfig = std::dynamic_pointer_cast(remoteStore); + assert(remoteStore); + + if (machine->isLocalhost()) { + auto rp_new = remoteStoreConfig->remoteProgram.get(); + rp_new.push_back("--builders"); + rp_new.push_back(""); + const_cast &>(remoteStoreConfig->remoteProgram).assign(rp_new); + } + remoteStoreConfig->extraSshArgs = { + "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" + }; + + // TODO logging + //const_cast &>(remoteStore->logFD).assign(logFD.get()); + + return nix::ref{remoteStore}; + }(), + }; { auto activeStepState(activeStep->state_.lock()); if (activeStepState->cancelled) throw Error("step cancelled"); - activeStepState->pid = child->sshPid; } Finally clearPid([&]() { auto activeStepState(activeStep->state_.lock()); - activeStepState->pid = -1; /* FIXME: there is a slight race here with step cancellation in State::processQueueChange(), which @@ -442,35 +297,13 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - ::Machine::Connection conn { - { - .to = child->in.get(), - .from = child->out.get(), - /* Handshake. */ - .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize - }, - /*.machine =*/ machine, - }; - Finally updateStats([&]() { - bytesReceived += conn.from.read; - bytesSent += conn.to.written; + // TODO + //auto stats = conn.store->getConnectionStats(); + //bytesReceived += stats.bytesReceived; + //bytesSent += stats.bytesSent; }); - constexpr ServeProto::Version our_version = 0x206; - - try { - conn.remoteVersion = decltype(conn)::handshake( - conn.to, - conn.from, - our_version, - machine->storeUri.render()); - } catch (EndOfFile & e) { - child->sshPid.wait(); - std::string s = chomp(readFile(result.logFile)); - throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); - } - { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; @@ -508,7 +341,6 @@ void State::buildRemote(ref destStore, *localStore, step->drvPath, resolvedDrv, - buildOptions, nrStepsBuilding ); @@ -539,21 +371,12 @@ void State::buildRemote(ref destStore, auto now1 = std::chrono::steady_clock::now(); - auto infos = conn.queryPathInfos(*localStore, outputs); - - size_t totalNarSize = 0; - for (auto & [_, info] : infos) totalNarSize += info.narSize; - - if (totalNarSize > maxOutputSize) { - result.stepStatus = bsNarSizeLimitExceeded; - return; - } - /* Copy each path. */ - printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", - localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize); + printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’", + localStore->printStorePath(step->drvPath), machine->storeUri.render()); + + copyClosure(*conn.store, *destStore, outputs); - build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); auto now2 = std::chrono::steady_clock::now(); result.overhead += std::chrono::duration_cast(now2 - now1).count(); @@ -574,9 +397,11 @@ void State::buildRemote(ref destStore, } } - /* Shut down the connection. */ - child->in = -1; - child->sshPid.wait(); + /* Shut down the connection done by RAII. + + Only difference is kill() instead of wait() (i.e. send signal + then wait()) + */ } catch (Error & e) { /* Disable this machine until a certain period of time has diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 4bc00f0cf..22b91420c 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -98,13 +98,6 @@ State::StepResult State::doBuildStep(nix::ref destStore, it). */ BuildID buildId; std::optional buildDrvPath; - // Other fields set below - nix::ServeProto::BuildOptions buildOptions { - .maxLogSize = maxLogSize, - .nrRepeats = step->isDeterministic ? 1u : 0u, - .enforceDeterminism = step->isDeterministic, - .keepFailed = false, - }; auto conn(dbPool.get()); @@ -139,18 +132,19 @@ State::StepResult State::doBuildStep(nix::ref destStore, { auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); if (i != jobsetRepeats.end()) - buildOptions.nrRepeats = std::max(buildOptions.nrRepeats, i->second); + warn("jobset repeats is deprecated; nix stopped supporting this correctly a long time ago."); } } if (!build) build = *dependents.begin(); buildId = build->id; buildDrvPath = build->drvPath; - buildOptions.maxSilentTime = build->maxSilentTime; - buildOptions.buildTimeout = build->buildTimeout; + settings.maxLogSize = maxLogSize; + settings.maxSilentTime = build->maxSilentTime; + settings.buildTimeout = build->buildTimeout; printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)", - localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1)); + localStore->printStorePath(step->drvPath), 1, machine->storeUri.render(), buildId, (dependents.size() - 1)); } if (!buildOneDone) @@ -211,7 +205,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers); + buildRemote(destStore, machine, step, result, activeStep, updateStep, narMembers); } catch (Error & e) { if (activeStep->state_.lock()->cancelled) { printInfo("marking step %d of build %d as cancelled", stepNr, buildId); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 99411f9fb..82ff3b3ba 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -182,7 +182,7 @@ void State::monitorMachinesFile() getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":"); if (machinesFiles.empty()) { - parseMachines("localhost " + + parseMachines("ssh-ng://localhost " + (settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem.get()) + " - " + std::to_string(settings.maxBuildJobs) + " 1 " + concatStringsSep(",", StoreConfig::getDefaultSystemFeatures())); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 30e01c746..8c5958a71 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -20,9 +20,7 @@ #include "store-api.hh" #include "sync.hh" #include "nar-extractor.hh" -#include "serve-protocol.hh" -#include "serve-protocol-impl.hh" -#include "serve-protocol-connection.hh" +#include "ssh-store.hh" #include "machines.hh" @@ -292,14 +290,16 @@ struct Machine : nix::Machine bool isLocalhost() const; // A connection to a machine - struct Connection : nix::ServeProto::BasicClientConnection { + struct Connection { // Backpointer to the machine ptr machine; + // Opened store + nix::ref store; }; }; -class HydraConfig; +struct HydraConfig; class State @@ -542,7 +542,6 @@ private: void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, - const nix::ServeProto::BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers);