diff --git a/.gitignore b/.gitignore index a58d15de63..811161d7d3 100644 --- a/.gitignore +++ b/.gitignore @@ -79,3 +79,6 @@ docs/src/deployment/observability/metrics_reference.rst /.vagrant/ /vagrant-nix-cache/ lnav*work + +# SV ACS onboarding snapshots written at runtime (default acsSnapshotDir) +/acs-snapshots/ diff --git a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala index 926b03bbc9..2675813e11 100644 --- a/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala +++ b/apps/app/src/main/scala/org/lfdecentralizedtrust/splice/console/SvAppReference.scala @@ -115,14 +115,14 @@ abstract class SvAppReference( httpCommand(HttpSvPublicAppClient.CometBftJsonRpcRequest(id, method, params)) } - def onboardSvPartyMigrationAuthorize( + def onboardSvPartyMigrationInitiate( participantId: ParticipantId, candidateParty: PartyId, - ): HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorizeResponse = + ): Unit = consoleEnvironment .run { httpCommand( - HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorize( + HttpSvPublicAppClient.OnboardSvPartyMigrationInitiate( participantId, candidateParty, ) diff --git a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvDsoPartyManagementIntegrationTest.scala b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvDsoPartyManagementIntegrationTest.scala index 55c1f98767..eef14d21cd 100644 --- a/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvDsoPartyManagementIntegrationTest.scala +++ b/apps/app/src/test/scala/org/lfdecentralizedtrust/splice/integration/tests/SvDsoPartyManagementIntegrationTest.scala @@ -65,7 +65,7 @@ class SvDsoPartyManagementIntegrationTest extends SvIntegrationTestBase with Wal ) { val randomParty = allocateRandomSvParty("random") assertThrowsAndLogsCommandFailures( - 
sv1Backend.onboardSvPartyMigrationAuthorize( + sv1Backend.onboardSvPartyMigrationInitiate( sv3Backend.participantClient.id, randomParty, ), diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ChunkWritingObserver.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ChunkWritingObserver.scala new file mode 100644 index 0000000000..050c6e1b01 --- /dev/null +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ChunkWritingObserver.scala @@ -0,0 +1,33 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.environment + +import com.digitalasset.canton.discard.Implicits.DiscardOps +import com.google.protobuf.ByteString + +import java.io.OutputStream +import scala.concurrent.{Future, Promise} + +/** A [[io.grpc.stub.StreamObserver]] that writes each streamed chunk to the given + * [[java.io.OutputStream]] instead of accumulating the whole stream in memory. This keeps the + * memory footprint bounded regardless of how large the streamed payload (e.g. an ACS snapshot) + * is. + * + * The caller is responsible for opening the [[OutputStream]] and for closing it once + * [[resultFuture]] has completed (in both the success and failure cases). 
+ */ +class ChunkWritingObserver[T](out: OutputStream, chunkOf: T => ByteString) + extends io.grpc.stub.StreamObserver[T] { + private val promise: Promise[Unit] = Promise[Unit]() + + def resultFuture: Future[Unit] = promise.future + + override def onNext(value: T): Unit = + try chunkOf(value).writeTo(out) + catch { case t: Throwable => promise.tryFailure(t).discard } + + override def onError(t: Throwable): Unit = promise.tryFailure(t).discard + + override def onCompleted(): Unit = promise.trySuccess(()).discard +} diff --git a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala index b6f95f5193..3fe1383dce 100644 --- a/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala +++ b/apps/common/src/main/scala/org/lfdecentralizedtrust/splice/environment/ParticipantAdminConnection.scala @@ -66,6 +66,8 @@ import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.{ TopologySnapshot, } +import java.io.BufferedOutputStream +import java.nio.file.{Files, Path} import java.time.Instant import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} import scala.jdk.CollectionConverters.* @@ -204,6 +206,51 @@ class ParticipantAdminConnection( } yield ByteString.copyFrom(chunks.map(_.chunk).asJava) } + /** Like [[exportPartyAcs]] but streams the snapshot chunks straight to `outputFile` instead of + * accumulating them in memory, so the full ACS is never held on the heap. The caller is + * responsible for the lifecycle of `outputFile` (e.g. writing to a temporary file and + * atomically renaming it once this future completes successfully). 
+ */ + def exportPartyAcsToFile( + party: PartyId, + synchronizerId: SynchronizerId, + targetParticipantId: ParticipantId, + activationTime: Instant, + outputFile: Path, + )(implicit + traceContext: TraceContext + ): Future[Unit] = { + val out = new BufferedOutputStream(Files.newOutputStream(outputFile)) + val observer = new ChunkWritingObserver[ExportPartyAcsResponse](out, _.chunk) + + val result = for { + // See exportPartyAcs for why we resolve the offset and subtract one. + activationOffset <- resolveOffset(Left(activationTime), synchronizerId, force = true) + beforeActivationOffset = activationOffset - 1L + _ = logger.info( + s"Exporting ACS snapshot for party $party from domain $synchronizerId at offset $beforeActivationOffset to file $outputFile" + ) + _ <- runCmd( + ParticipantAdminCommands.PartyManagement.ExportPartyAcs( + party, + synchronizerId, + targetParticipantId, + beforeActivationOffset, + waitForActivationTimeout = None, // i.e., default + observer, + ) + ) + // The export streams asynchronously; this future completes once the last chunk has been + // written to `out`. + _ <- observer.resultFuture + } yield () + // Close the stream (flushing the buffer) in both the success and failure cases. + result.transform { res => + out.close() + res + } + } + def downloadAcsSnapshotNonChunked( parties: Set[PartyId], filterSynchronizerId: SynchronizerId, @@ -267,6 +314,32 @@ class ParticipantAdminConnection( ) } + /** Like [[importPartyAcs]] but reads the snapshot from a file on disk, streaming it into the + * participant so the full ACS is never held on the heap. A fresh [[java.io.InputStream]] is + * opened on every (retried) attempt. 
+ */ + def importPartyAcsFromFile(acsFile: Path, synchronizerId: SynchronizerId, partyId: PartyId)( + implicit tc: TraceContext + ): Future[Unit] = { + retryProvider.retryForClientCalls( + "import_party_acs", + "Imports the acs in the participant", + runCmd( + ParticipantAdminCommands.PartyManagement + .ImportPartyAcs( + Files.newInputStream(acsFile), + synchronizerId, + IMPORT_ACS_WORKFLOW_ID_PREFIX, + contractImportMode = ContractImportMode.Validation, + representativePackageIdOverride = RepresentativePackageIdOverride.NoOverride, + party = Some(partyId), + ), + timeoutOverride = Some(GrpcAdminCommand.DefaultUnboundedTimeout), + ).map(_ => ()), + logger, + ) + } + def getParticipantId()(implicit traceContext: TraceContext): Future[ParticipantId] = getId().map(ParticipantId(_)) diff --git a/apps/sv/src/main/openapi/sv-internal.yaml b/apps/sv/src/main/openapi/sv-internal.yaml index 4884c71255..e8823d283e 100644 --- a/apps/sv/src/main/openapi/sv-internal.yaml +++ b/apps/sv/src/main/openapi/sv-internal.yaml @@ -429,24 +429,29 @@ paths: $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/400" "500": $ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500" - /v0/onboard/sv/party-migration/authorize: + # Step 1 of the resumable SV party migration. Authorizes hosting the DSO party on the + # candidate participant (synchronous, surfaces proposal/accepted-state errors) and then + # asynchronously exports the DSO party ACS snapshot to a file on the sponsor. Returns 202 + # immediately; the candidate then downloads the snapshot via the resumable, range-enabled + # GET /v0/onboard/sv/party-migration/snapshot/{candidate_party_id} endpoint, which is defined + # in sv-stream-server.yaml (server-only generation, since guardrail cannot generate a client + # for a binary stream response). 
+ /v0/onboard/sv/party-migration/initiate: post: tags: [sv] x-jvm-package: sv_public - operationId: "onboardSvPartyMigrationAuthorize" + operationId: "onboardSvPartyMigrationInitiate" requestBody: required: true content: application/json: schema: - "$ref": "#/components/schemas/OnboardSvPartyMigrationAuthorizeRequest" + "$ref": "#/components/schemas/OnboardSvPartyMigrationInitiateRequest" responses: - "200": - description: ok - content: - application/json: - schema: - "$ref": "#/components/schemas/OnboardSvPartyMigrationAuthorizeResponse" + "202": + description: | + Snapshot generation has been accepted and is running (or already complete). + Poll the snapshot download endpoint until it returns the snapshot. "400": description: system state not yet valid content: @@ -987,7 +992,7 @@ components: completed: "#/components/schemas/SvOnboardingStateCompleted" unknown: "#/components/schemas/SvOnboardingStateUnknown" - OnboardSvPartyMigrationAuthorizeRequest: + OnboardSvPartyMigrationInitiateRequest: type: object required: - candidate_party_id @@ -995,14 +1000,6 @@ components: candidate_party_id: type: string - OnboardSvPartyMigrationAuthorizeResponse: - type: object - required: - - acs_snapshot - properties: - acs_snapshot: - type: string - OnboardSvPartyMigrationAuthorizeErrorResponse: oneOf: - "$ref": "#/components/schemas/AcceptedStateNotFoundErrorResponse" diff --git a/apps/sv/src/main/openapi/sv-stream-server.yaml b/apps/sv/src/main/openapi/sv-stream-server.yaml new file mode 100644 index 0000000000..b373df2dd0 --- /dev/null +++ b/apps/sv/src/main/openapi/sv-stream-server.yaml @@ -0,0 +1,48 @@ +openapi: 3.0.0 +info: + title: SV Streaming API + version: 0.0.1 +tags: + - name: sv + description: | + SV operator endpoints that stream binary payloads. 
+servers: + - url: https://example.com/api/sv +paths: + + # Note for developers working on this openAPI spec: we generate only server-side code from this + # file, not client-side, because guardrail cannot decode a binary stream response as JSON. + # The client lives in SvConnection.downloadDsoPartyAcsSnapshot and must be updated manually to + # reflect any changes here. + + /v0/onboard/sv/party-migration/snapshot/{candidate_party_id}: + get: + tags: [sv] + x-jvm-package: svStream + operationId: "onboardSvPartyMigrationSnapshot" + description: | + Step 2 of the resumable SV party migration: download the DSO party ACS snapshot prepared by + the POST /v0/onboard/sv/party-migration/initiate endpoint. The response is served from a + file with HTTP range support (withRangeSupport in SvApp), so a Range request yields a 206 + Partial Content response and a failed download can resume from a byte offset. + parameters: + - name: "candidate_party_id" + in: "path" + required: true + schema: + type: string + responses: + "200": + description: the ACS snapshot + content: + application/octet-stream: + schema: + type: string + format: binary + x-scala-type: org.apache.pekko.http.scaladsl.model.ResponseEntity + "404": + description: no snapshot for this candidate; call the initiate endpoint first + "409": + description: the snapshot is still being generated; retry later + "500": + description: the snapshot generation failed; re-initiate diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala index 10c66d71d3..6167a18cfa 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/SvApp.scala @@ -51,6 +51,7 @@ import org.lfdecentralizedtrust.splice.http.{HttpClient, HttpRateLimiter} import org.lfdecentralizedtrust.splice.http.v0.sv_admin.SvAdminResource import 
org.lfdecentralizedtrust.splice.http.v0.sv_operator.SvOperatorResource import org.lfdecentralizedtrust.splice.http.v0.sv_public.SvPublicResource +import org.lfdecentralizedtrust.splice.http.v0.svStream.SvStreamResource import org.lfdecentralizedtrust.splice.setup.ParticipantInitializer import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority @@ -59,6 +60,7 @@ import org.lfdecentralizedtrust.splice.sv.admin.http.{ HttpSvAdminHandler, HttpSvOperatorHandler, HttpSvPublicHandler, + HttpSvStreamHandler, } import org.lfdecentralizedtrust.splice.sv.automation.{ DsoDelegateBasedAutomationService, @@ -463,6 +465,17 @@ class SvApp( throw new IllegalStateException("No initial round specified in user's metadata") } + dsoPartyMigration = new DsoPartyMigration( + svAutomation, + dsoAutomation, + participantAdminConnection, + ledgerClient, + retryProvider, + dsoPartyHosting, + config.acsSnapshotDir, + loggerFactory, + ) + publicHandler = new HttpSvPublicHandler( config.ledgerApiUser, svAutomation, @@ -473,19 +486,13 @@ class SvApp( participantAdminConnection, synchronizerNodeService, retryProvider, - new DsoPartyMigration( - svAutomation, - dsoAutomation, - participantAdminConnection, - ledgerClient, - retryProvider, - dsoPartyHosting, - loggerFactory, - ), + dsoPartyMigration, loggerFactory, initialRound, ) + streamHandler = new HttpSvStreamHandler(dsoPartyMigration) + operatorHandler = new HttpSvOperatorHandler( svAutomation, dsoAutomation, @@ -547,6 +554,17 @@ class SvApp( errorHandler.directive(traceContext) { concat( + // The binary ACS-snapshot download endpoint. withRangeSupport turns the known-length + // file entity returned by the handler into resumable 206 Partial Content responses + // when a Range header is present. 
+ SvStreamResource.routes( + streamHandler, + operation => + withRangeSupport.tflatMap(_ => + buildOperation("svStream", operation) + .tflatMap(_ => provide(traceContext)) + ), + ), SvPublicResource.routes( publicHandler, operation => diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/SvConnection.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/SvConnection.scala index 1b676055c2..d144de2df9 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/SvConnection.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/SvConnection.scala @@ -3,17 +3,27 @@ package org.lfdecentralizedtrust.splice.sv.admin.api.client +import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.logging.NamedLoggerFactory import com.digitalasset.canton.topology.{ParticipantId, PartyId, SequencerId} import com.digitalasset.canton.tracing.TraceContext import com.google.protobuf.ByteString +import org.apache.pekko.http.scaladsl.model.headers.{ + ByteRange, + Range as RangeHeader, + `Content-Range`, +} +import org.apache.pekko.http.scaladsl.model.{ContentRange, HttpMethods, HttpRequest, StatusCodes} +import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.FileIO import org.lfdecentralizedtrust.splice.config.{NetworkAppClientConfig, UpgradesConfig} import org.lfdecentralizedtrust.splice.environment.{HttpAppConnection, RetryProvider} import org.lfdecentralizedtrust.splice.http.HttpClient import org.lfdecentralizedtrust.splice.sv.admin.api.client.commands.HttpSvPublicAppClient import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder +import java.nio.file.{Files, OpenOption, Path, StandardOpenOption} import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} final class SvConnection private ( @@ -39,9 +49,11 @@ final class 
SvConnection private ( ): Future[Unit] = runHttpCmd(config.url, HttpSvPublicAppClient.StartSvOnboarding(token)) - /** Ask the sponsoring SV to authorize hosting the DSO party at the candidate participant and to prepare the ACS snapshot. + /** Step 1 of the resumable party migration: ask the sponsoring SV to authorize hosting the DSO + * party at the candidate participant and to start exporting the ACS snapshot to a file. The + * snapshot is fetched separately via [[downloadDsoPartyAcsSnapshot]]. */ - def authorizeDsoPartyHosting( + def initiateDsoPartyMigration( candidateParticipantId: ParticipantId, candidateParty: PartyId, )(implicit @@ -50,17 +62,97 @@ final class SvConnection private ( ec: ExecutionContext, mat: Materializer, ): Future[Either[ - HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorizeProposalNotFound, - HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorizeResponse, + HttpSvPublicAppClient.OnboardSvPartyMigrationProposalNotFound, + Unit, ]] = runHttpCmd( config.url, - HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorize( + HttpSvPublicAppClient.OnboardSvPartyMigrationInitiate( candidateParticipantId, candidateParty, ), ) + /** Step 2 of the resumable party migration: perform a single, range-resumable download attempt + * of the DSO party ACS snapshot prepared by [[initiateDsoPartyMigration]], streaming the bytes + * straight into `outputFile` so the snapshot is never held in memory. If `outputFile` already + * holds a partial download, the request resumes from that byte offset. The caller is expected + * to retry until [[SvConnection.SnapshotDownloadAttempt.Complete]] is returned. 
+ */ + def downloadDsoPartyAcsSnapshot( + candidateParty: PartyId, + outputFile: Path, + )(implicit + httpClient: HttpClient, + ec: ExecutionContext, + mat: Materializer, + ): Future[SvConnection.SnapshotDownloadAttempt] = { + import SvConnection.SnapshotDownloadAttempt + val offset = if (Files.exists(outputFile)) Files.size(outputFile) else 0L + if (offset > 0) + logger.info( + s"Resuming download of DSO party ACS snapshot for $candidateParty into $outputFile from offset $offset" + ) + else + logger.info( + s"Downloading fresh DSO party ACS snapshot for $candidateParty into $outputFile" + ) + val uri = config.url.withPath( + config.url.path / "api" / "sv" / "v0" / "onboard" / "sv" / "party-migration" / "snapshot" / candidateParty.toProtoPrimitive + ) + val rangeHeaders = + if (offset > 0) List(RangeHeader(ByteRange.fromOffset(offset))) else Nil + val request = HttpRequest(method = HttpMethods.GET, uri = uri, headers = rangeHeaders) + httpClient + .executeRequest("sv", "onboardSvPartyMigrationSnapshotDownload")(request) + .flatMap { response => + response.status match { + case StatusCodes.OK | StatusCodes.PartialContent => + val isPartial = response.status == StatusCodes.PartialContent + val total = + if (isPartial) + response + .header[`Content-Range`] + .flatMap(_.contentRange match { + case ContentRange.Default(_, _, instanceLength) => instanceLength + case _ => None + }) + else response.entity.contentLengthOption + // Append when resuming a partial download, otherwise (re)write from the start. 
+ val openOptions: Set[OpenOption] = + if (isPartial && offset > 0) + Set(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.APPEND) + else + Set( + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING, + ) + response.entity.dataBytes + .runWith(FileIO.toPath(outputFile, openOptions)) + .map { _ => + val written = Files.size(outputFile) + if (total.contains(written)) SnapshotDownloadAttempt.Complete + else SnapshotDownloadAttempt.Incomplete(written, total) + } + case StatusCodes.RangeNotSatisfiable => + // We have already downloaded the whole snapshot. + response.discardEntityBytes().discard + Future.successful(SnapshotDownloadAttempt.Complete) + case StatusCodes.Conflict => + response.discardEntityBytes().discard + Future.successful(SnapshotDownloadAttempt.NotReady) + case StatusCodes.NotFound => + response.discardEntityBytes().discard + Future.successful(SnapshotDownloadAttempt.Absent) + case other => + Unmarshal(response.entity) + .to[String] + .map(body => SnapshotDownloadAttempt.Failed(s"Unexpected status $other: $body")) + } + } + } + def onboardSvSequencer( sequencerId: SequencerId )(implicit @@ -101,6 +193,29 @@ final class SvConnection private ( } object SvConnection { + + /** Outcome of a single [[SvConnection.downloadDsoPartyAcsSnapshot]] attempt. */ + sealed trait SnapshotDownloadAttempt + object SnapshotDownloadAttempt { + + /** The snapshot file has been fully downloaded. */ + case object Complete extends SnapshotDownloadAttempt + + /** Some bytes were downloaded but the snapshot is not complete yet; retry to resume. */ + final case class Incomplete(written: Long, total: Option[Long]) extends SnapshotDownloadAttempt + + /** The sponsor is still generating the snapshot; retry later. */ + case object NotReady extends SnapshotDownloadAttempt + + /** The sponsor has no snapshot for this candidate (e.g. it was lost on a restart); the + * candidate needs to re-initiate. 
+ */ + case object Absent extends SnapshotDownloadAttempt + + /** The sponsor reported a failure preparing the snapshot. */ + final case class Failed(message: String) extends SnapshotDownloadAttempt + } + def apply( config: NetworkAppClientConfig, upgradesConfig: UpgradesConfig, diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvPublicAppClient.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvPublicAppClient.scala index 5111c3b7f0..354423dd78 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvPublicAppClient.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/api/client/commands/HttpSvPublicAppClient.scala @@ -255,23 +255,25 @@ object HttpSvPublicAppClient { sequencerSnapshot: CantonSequencerSnapshot, ) - case class OnboardSvPartyMigrationAuthorizeProposalNotFound( + case class OnboardSvPartyMigrationProposalNotFound( partyToParticipantMappingSerial: PositiveInt ) extends QuietNonRetryableException( s"Party migration failed as required proposals were not found. Found base mappings: PartyToParticipant($partyToParticipantMappingSerial)" ) - case class OnboardSvPartyMigrationAuthorizeResponse( - acsSnapshot: ByteString - ) - case class OnboardSvPartyMigrationAuthorize( + /** Step 1 of the resumable party migration: ask the sponsor to authorize hosting the DSO party + * on the candidate participant and to start exporting the ACS snapshot to a file. The snapshot + * itself is fetched separately via the resumable download endpoint (see + * [[org.lfdecentralizedtrust.splice.sv.admin.api.client.SvConnection.downloadDsoPartyAcsSnapshot]]). 
+ */ + case class OnboardSvPartyMigrationInitiate( participantId: ParticipantId, candidate: PartyId, ) extends BaseCommandPublic[ - http.OnboardSvPartyMigrationAuthorizeResponse, + http.OnboardSvPartyMigrationInitiateResponse, Either[ - OnboardSvPartyMigrationAuthorizeProposalNotFound, - OnboardSvPartyMigrationAuthorizeResponse, + OnboardSvPartyMigrationProposalNotFound, + Unit, ], ] { override val nonErrorStatusCodes = Set(StatusCodes.BadRequest) @@ -282,9 +284,9 @@ object HttpSvPublicAppClient { ): EitherT[Future, Either[ Throwable, HttpResponse, - ], http.OnboardSvPartyMigrationAuthorizeResponse] = - client.onboardSvPartyMigrationAuthorize( - body = definitions.OnboardSvPartyMigrationAuthorizeRequest( + ], http.OnboardSvPartyMigrationInitiateResponse] = + client.onboardSvPartyMigrationInitiate( + body = definitions.OnboardSvPartyMigrationInitiateRequest( candidate.toProtoPrimitive ), headers = headers, @@ -293,14 +295,14 @@ object HttpSvPublicAppClient { override def handleOk()(implicit decoder: TemplateJsonDecoder ) = { - case http.OnboardSvPartyMigrationAuthorizeResponse.BadRequest( + case http.OnboardSvPartyMigrationInitiateResponse.BadRequest( definitions.OnboardSvPartyMigrationAuthorizeErrorResponse.members .AcceptedStateNotFoundErrorResponse( response ) ) => Left(response.acceptedStateNotFound.error) - case http.OnboardSvPartyMigrationAuthorizeResponse.BadRequest( + case http.OnboardSvPartyMigrationInitiateResponse.BadRequest( definitions.OnboardSvPartyMigrationAuthorizeErrorResponse.members .ProposalNotFoundErrorResponse( response @@ -308,23 +310,13 @@ object HttpSvPublicAppClient { ) => Right( Left( - OnboardSvPartyMigrationAuthorizeProposalNotFound( + OnboardSvPartyMigrationProposalNotFound( PositiveInt.tryCreate(response.proposalNotFound.partyToParticipantBaseSerial.intValue) ) ) ) - case http.OnboardSvPartyMigrationAuthorizeResponse.OK( - definitions.OnboardSvPartyMigrationAuthorizeResponse( - encodedAcsSnapshot - ) - ) => - Right( - Right( - 
OnboardSvPartyMigrationAuthorizeResponse( - ByteString.copyFrom(Base64.getDecoder.decode(encodedAcsSnapshot)) - ) - ) - ) + case http.OnboardSvPartyMigrationInitiateResponse.Accepted => + Right(Right(())) } } diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvPublicHandler.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvPublicHandler.scala index 074c879acd..c393511748 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvPublicHandler.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvPublicHandler.scala @@ -458,15 +458,15 @@ class HttpSvPublicHandler( * * Protection: Endpoint is protected by IP allowlisting */ - override def onboardSvPartyMigrationAuthorize( - respond: r0.OnboardSvPartyMigrationAuthorizeResponse.type + override def onboardSvPartyMigrationInitiate( + respond: r0.OnboardSvPartyMigrationInitiateResponse.type )( - body: definitions.OnboardSvPartyMigrationAuthorizeRequest + body: definitions.OnboardSvPartyMigrationInitiateRequest )( extracted: TraceContext - ): Future[r0.OnboardSvPartyMigrationAuthorizeResponse] = { + ): Future[r0.OnboardSvPartyMigrationInitiateResponse] = { implicit val traceContext: TraceContext = extracted - withSpan(s"$workflowId.onboardSvPartyMigrationAuthorize") { _ => _ => + withSpan(s"$workflowId.onboardSvPartyMigrationInitiate") { _ => _ => (for { candidateParty <- Codec.decode(Codec.Party)(body.candidatePartyId) } yield { @@ -491,26 +491,28 @@ class HttpSvPublicHandler( ) ) else - authorizeParticipantForHostingDsoParty( - ParticipantId.tryFromProtoPrimitive(candidateParticipantId.payload.svParticipantId) + initiatePartyMigration( + candidateParty, + ParticipantId.tryFromProtoPrimitive(candidateParticipantId.payload.svParticipantId), ) } yield res }).fold(errMsg => Future.failed(HttpErrorHandler.badRequest(errMsg)), identity) } } - private def authorizeParticipantForHostingDsoParty( - 
participantId: ParticipantId - )(implicit tc: TraceContext): Future[r0.OnboardSvPartyMigrationAuthorizeResponse] = { + private def initiatePartyMigration( + candidateParty: PartyId, + participantId: ParticipantId, + )(implicit tc: TraceContext): Future[r0.OnboardSvPartyMigrationInitiateResponse] = { dsoPartyMigration - .authorizeParticipantForHostingDsoParty(participantId) + .initiateSnapshot(candidateParty, participantId) .fold( { case DsoPartyHosting .RequiredProposalNotFound( partyToParticipantSerial ) => - r0.OnboardSvPartyMigrationAuthorizeResponseBadRequest( + r0.OnboardSvPartyMigrationInitiateResponseBadRequest( definitions.ProposalNotFoundErrorResponse( proposalNotFound = definitions.ProposalNotFoundErrorResponse.ProposalNotFound( BigInt(partyToParticipantSerial.value) @@ -518,15 +520,9 @@ class HttpSvPublicHandler( ) ) }, - { acsBytes => - // TODO(M3-57) consider if a more space-efficient encoding is necessary - val encoded = Base64.getEncoder.encodeToString(acsBytes.toByteArray) - r0.OnboardSvPartyMigrationAuthorizeResponseOK( - definitions.OnboardSvPartyMigrationAuthorizeResponse( - encoded - ) - ) - }, + // The snapshot is exported asynchronously; the candidate downloads it from the resumable + // GET /v0/onboard/sv/party-migration/snapshot/{candidate_party_id} endpoint. + _ => r0.OnboardSvPartyMigrationInitiateResponseAccepted, ) } diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvStreamHandler.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvStreamHandler.scala new file mode 100644 index 0000000000..92d2f376f8 --- /dev/null +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/HttpSvStreamHandler.scala @@ -0,0 +1,65 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.sv.admin.http + +import com.digitalasset.canton.tracing.{Spanning, TraceContext} +import io.opentelemetry.api.trace.Tracer +import org.apache.pekko.http.scaladsl.model.{ContentTypes, HttpEntity} +import org.apache.pekko.stream.scaladsl.FileIO +import org.lfdecentralizedtrust.splice.admin.http.HttpErrorHandler +import org.lfdecentralizedtrust.splice.http.v0.svStream as v0 +import org.lfdecentralizedtrust.splice.http.v0.svStream.SvStreamResource +import org.lfdecentralizedtrust.splice.sv.onboarding.sponsor.DsoPartyMigration +import org.lfdecentralizedtrust.splice.util.Codec + +import java.nio.file.Files +import scala.concurrent.Future + +/** Server-side handler for the SV streaming endpoints (see sv-stream-server.yaml). Serves the DSO + * party ACS snapshot prepared by the initiate endpoint directly from the file written by the + * sponsor. Returning a known-length [[HttpEntity.Default]] lets the `withRangeSupport` directive + * applied in [[org.lfdecentralizedtrust.splice.sv.SvApp]] turn Range requests into resumable 206 + * Partial Content responses. 
+ */ +class HttpSvStreamHandler( + dsoPartyMigration: DsoPartyMigration +)(implicit protected val tracer: Tracer) + extends v0.SvStreamHandler[TraceContext] + with Spanning { + + protected val workflowId: String = this.getClass.getSimpleName + + override def onboardSvPartyMigrationSnapshot( + respond: SvStreamResource.OnboardSvPartyMigrationSnapshotResponse.type + )( + candidatePartyId: String + )(extracted: TraceContext): Future[SvStreamResource.OnboardSvPartyMigrationSnapshotResponse] = { + implicit val tc: TraceContext = extracted + withSpan(s"$workflowId.onboardSvPartyMigrationSnapshot") { _ => _ => + Codec.decode(Codec.Party)(candidatePartyId) match { + case Left(err) => + Future.failed(HttpErrorHandler.badRequest(s"Invalid candidate party id: $err")) + case Right(candidateParty) => + dsoPartyMigration.snapshotState(candidateParty) match { + case DsoPartyMigration.SnapshotState.Ready(file) => + Future.successful( + respond.OK( + HttpEntity.Default( + ContentTypes.`application/octet-stream`, + Files.size(file), + FileIO.fromPath(file), + ) + ) + ) + case DsoPartyMigration.SnapshotState.InProgress => + Future.successful(respond.Conflict) + case DsoPartyMigration.SnapshotState.NotFound => + Future.successful(respond.NotFound) + case DsoPartyMigration.SnapshotState.Failed(_) => + Future.successful(respond.InternalServerError) + } + } + } + } +} diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/ResponseEntityGuardrailSupport.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/ResponseEntityGuardrailSupport.scala new file mode 100644 index 0000000000..bac4c8c7df --- /dev/null +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/admin/http/ResponseEntityGuardrailSupport.scala @@ -0,0 +1,13 @@ +// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package org.lfdecentralizedtrust.splice.sv.admin.http + +import org.apache.pekko.http.scaladsl.marshalling.Marshaller +import org.apache.pekko.http.scaladsl.model.ResponseEntity + +/* An implicit marshaller to prevent guardrail from panicking over a non-json ResponseEntity */ +object ResponseEntityGuardrailSupport { + implicit val responseEntityMarshaller: Marshaller[ResponseEntity, ResponseEntity] = + Marshaller.opaque(identity) +} diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/onboarding/SvOnboardingPartyToParticipantProposalTrigger.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/onboarding/SvOnboardingPartyToParticipantProposalTrigger.scala index eb4d03adbb..5d78018a69 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/onboarding/SvOnboardingPartyToParticipantProposalTrigger.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/automation/singlesv/onboarding/SvOnboardingPartyToParticipantProposalTrigger.scala @@ -86,7 +86,7 @@ class SvOnboardingPartyToParticipantProposalTrigger( } } yield { // It is crucial to wait for a proposal from both the candidate and the sponsor. - // The proposal by the sponsor is only created through the onboard/sv/party-migration/authorize + // The proposal by the sponsor is only created through the onboard/sv/party-migration/initiate // which the candidate calls after having disconnected from the domain. // Without this check, the transaction can become valid while the candidate is still connected // which then results in all kinds of errors because it does not have an ACS import. 
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala index 10934a5d28..58946d7f29 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/config/SvAppConfig.scala @@ -366,6 +366,10 @@ case class SvAppBackendConfig( participantBootstrappingDump: Option[ParticipantBootstrapDumpConfig] = None, identitiesDump: Option[BackupDumpConfig] = None, domainMigrationDumpPath: Option[Path] = None, + // Directory in which a sponsoring SV writes DSO party ACS snapshots while onboarding a new SV, + // so they can be served via the resumable party-migration download endpoint without being held + // in memory. + acsSnapshotDir: Path = java.nio.file.Paths.get("acs-snapshots"), onLedgerStatusReportInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofMinutes(2), lsuSequencingTestInterval: NonNegativeFiniteDuration = NonNegativeFiniteDuration.ofSeconds(30), diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeDsoPartyHosting.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeDsoPartyHosting.scala index 600e40bedc..d07ed39e4e 100644 --- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeDsoPartyHosting.scala +++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/joining/JoiningNodeDsoPartyHosting.scala @@ -12,19 +12,22 @@ import org.lfdecentralizedtrust.splice.environment.{ import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologySnapshot import org.lfdecentralizedtrust.splice.http.HttpClient import org.lfdecentralizedtrust.splice.sv.admin.api.client.SvConnection -import 
org.lfdecentralizedtrust.splice.sv.admin.api.client.commands.HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorizeProposalNotFound +import org.lfdecentralizedtrust.splice.sv.admin.api.client.commands.HttpSvPublicAppClient.OnboardSvPartyMigrationProposalNotFound import org.lfdecentralizedtrust.splice.sv.config.SvOnboardingConfig import org.lfdecentralizedtrust.splice.sv.onboarding.DsoPartyHosting import org.lfdecentralizedtrust.splice.sv.SvAppClientConfig import org.lfdecentralizedtrust.splice.util.TemplateJsonDecoder import com.digitalasset.canton.SynchronizerAlias +import com.digitalasset.canton.discard.Implicits.DiscardOps import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging} import com.digitalasset.canton.topology.{SynchronizerId, ParticipantId, PartyId} import com.digitalasset.canton.tracing.TraceContext import io.grpc.Status import org.apache.pekko.stream.Materializer +import java.nio.file.{Files, Path} import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.Try class JoiningNodeDsoPartyHosting( participantAdminConnection: ParticipantAdminConnection, @@ -51,8 +54,13 @@ class JoiningNodeDsoPartyHosting( ): Future[Either[String, Unit]] = { getSponsorSvConfig(onboardingConfig) match { case Some(sponsorSvConfig) => - for { - response <- retryProvider.retry( + // The snapshot is downloaded to this file and streamed into the participant from there, + // so it is never held in memory and a failed download can resume from the byte offset. + val snapshotFile = Files.createTempFile("dso-party-acs-", ".snapshot") + val result = for { + // Step 1: propose hosting the DSO party on our participant, disconnect, and ask the + // sponsor to authorize the hosting and start preparing the ACS snapshot. 
+ _ <- retryProvider.retry( RetryFor.WaitingOnInitDependency, "onboard_dso_party", "Onboard to DSO party hosting and decentralized namespace membership", @@ -73,13 +81,13 @@ class JoiningNodeDsoPartyHosting( _ = logger.info("Disconnecting from all domains") _ <- participantAdminConnection.disconnectFromAllSynchronizers() _ = logger.info("candidate SV participant disconnected from global domain") - response <- retryProvider + _ <- retryProvider .retry( RetryFor.WaitingOnInitDependency, - "authorize_dso_party", - "authorize DSO party hosting on sponsor", + "initiate_dso_party_migration", + "initiate DSO party migration on sponsor", svConnection - .authorizeDsoPartyHosting( + .initiateDsoPartyMigration( participantId, svParty, ) @@ -100,58 +108,58 @@ class JoiningNodeDsoPartyHosting( } else { Future.failed(proposalNotFound) } - case Right(acsSnapshot) => Future.successful(acsSnapshot) + case Right(()) => Future.unit }, logger, ) - .recoverWith { - case proposalNotFound: OnboardSvPartyMigrationAuthorizeProposalNotFound => - // Reconnect so that the participant gets its state in sync before the next retry - logger.info( - "Reconnecting to global domain so that the proposal can be recreated from the latest base." + .recoverWith { case proposalNotFound: OnboardSvPartyMigrationProposalNotFound => + // Reconnect so that the participant gets its state in sync before the next retry + logger.info( + "Reconnecting to global domain so that the proposal can be recreated from the latest base." 
+ ) + for { + _ <- participantAdminConnection.connectSynchronizer(synchronizerAlias) + _ <- retryProvider.waitUntil( + RetryFor.WaitingOnInitDependency, + "party_hosting_serial_observed", + s"Serial ${proposalNotFound.partyToParticipantMappingSerial} expected by sponsor is observed", + participantAdminConnection + .getPartyToParticipant( + synchronizerId, + dsoParty, + topologySnapshot = TopologySnapshot.Sequenced, + ) + .map(result => + if ( + result.base.serial < proposalNotFound.partyToParticipantMappingSerial + ) { + throw Status.FAILED_PRECONDITION + .withDescription( + s"Current serial is ${result.base.serial}, waiting for ${proposalNotFound.partyToParticipantMappingSerial}" + ) + .asRuntimeException() + } + ), + logger, ) - for { - _ <- participantAdminConnection.connectSynchronizer(synchronizerAlias) - _ <- retryProvider.waitUntil( - RetryFor.WaitingOnInitDependency, - "party_hosting_serial_observed", - s"Serial ${proposalNotFound.partyToParticipantMappingSerial} expected by sponsor is observed", - participantAdminConnection - .getPartyToParticipant( - synchronizerId, - dsoParty, - topologySnapshot = TopologySnapshot.Sequenced, - ) - .map(result => - if ( - result.base.serial < proposalNotFound.partyToParticipantMappingSerial - ) { - throw Status.FAILED_PRECONDITION - .withDescription( - s"Current serial is ${result.base.serial}, waiting for ${proposalNotFound.partyToParticipantMappingSerial}" - ) - .asRuntimeException() - } - ), - logger, - ) - } yield throw Status.FAILED_PRECONDITION - .withDescription( - s"Failed because serial advanced and invalidated our proposal (serial reported by sponsor: ${proposalNotFound.partyToParticipantMappingSerial})" - ) - .asRuntimeException() + } yield throw Status.FAILED_PRECONDITION + .withDescription( + s"Failed because serial advanced and invalidated our proposal (serial reported by sponsor: ${proposalNotFound.partyToParticipantMappingSerial})" + ) + .asRuntimeException() } - } yield { - response - }).andThen(_ => 
svConnection.close()) + } yield ()).andThen(_ => svConnection.close()) }, logger, ) + // Step 2: resumably download the snapshot to a local file and import it. The participant + // stays disconnected throughout the import. + _ <- downloadSnapshotToFile(sponsorSvConfig, participantId, svParty, snapshotFile) _ = logger.info( "Received Acs snapshot from sponsor, importing into candidate participant" ) - _ <- participantAdminConnection.importPartyAcs( - response.acsSnapshot, + _ <- participantAdminConnection.importPartyAcsFromFile( + snapshotFile, synchronizerId, dsoParty, ) @@ -171,11 +179,73 @@ class JoiningNodeDsoPartyHosting( s"DSO party is now hosted in the candidate SV participant $participantId" ) } yield Right(()) + result.andThen { _ => + Try(Files.deleteIfExists(snapshotFile)).discard + } case None => Future.successful(Left("unexpected onboarding config")) } } + /** Resumably download the DSO party ACS snapshot from the sponsor into `snapshotFile`, retrying + * (and resuming by byte offset) until the full snapshot has been received. 
+    */
+  private def downloadSnapshotToFile(
+      sponsorSvConfig: SvAppClientConfig,
+      participantId: ParticipantId,
+      svParty: PartyId,
+      snapshotFile: Path,
+  )(implicit traceContext: TraceContext): Future[Unit] =
+    SvConnection(
+      sponsorSvConfig.adminApi,
+      upgradesConfig,
+      retryProvider,
+      loggerFactory,
+    ).flatMap { svConnection =>
+      // (Re)start the export on the sponsor, then fail this attempt so that the retry polls again.
+      def reinitiate(description: String): Future[Unit] =
+        svConnection.initiateDsoPartyMigration(participantId, svParty).transformWith { _ =>
+          Future.failed(
+            Status.FAILED_PRECONDITION.withDescription(description).asRuntimeException()
+          )
+        }
+
+      retryProvider
+        .retry(
+          RetryFor.WaitingOnInitDependency,
+          "download_dso_party_acs_snapshot",
+          "download DSO party ACS snapshot from sponsor",
+          svConnection.downloadDsoPartyAcsSnapshot(svParty, snapshotFile).flatMap {
+            case SvConnection.SnapshotDownloadAttempt.Complete =>
+              Future.unit
+            case SvConnection.SnapshotDownloadAttempt.Incomplete(written, total) =>
+              Future.failed(
+                Status.FAILED_PRECONDITION
+                  .withDescription(
+                    s"ACS snapshot download incomplete ($written/${total.fold("?")(_.toString)} bytes), resuming"
+                  )
+                  .asRuntimeException()
+              )
+            case SvConnection.SnapshotDownloadAttempt.NotReady =>
+              Future.failed(
+                Status.FAILED_PRECONDITION
+                  .withDescription("Sponsor is still generating the ACS snapshot")
+                  .asRuntimeException()
+              )
+            case SvConnection.SnapshotDownloadAttempt.Absent =>
+              // The sponsor has no snapshot (e.g. it restarted); re-initiate and retry.
+              logger.info("Sponsor reported no ACS snapshot, re-initiating the party migration.")
+              reinitiate("Re-initiated ACS snapshot generation on sponsor")
+            case SvConnection.SnapshotDownloadAttempt.Failed(message) =>
+              // The sponsor keeps reporting a failed export until the migration is re-initiated.
+              logger.info(s"Sponsor failed to prepare ACS snapshot ($message), re-initiating.")
+              reinitiate(s"Sponsor failed to prepare ACS snapshot: $message")
+          },
+          logger,
+        )
+        .andThen(_ => svConnection.close())
+    }
+
   private def getSponsorSvConfig(
       onboardingConfig: Option[SvOnboardingConfig]
   ): Option[SvAppClientConfig] =
diff --git a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sponsor/DsoPartyMigration.scala b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sponsor/DsoPartyMigration.scala
index da794281b1..767c9811f9 100644
--- a/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sponsor/DsoPartyMigration.scala
+++ b/apps/sv/src/main/scala/org/lfdecentralizedtrust/splice/sv/onboarding/sponsor/DsoPartyMigration.scala
@@ -5,6 +5,7 @@ package org.lfdecentralizedtrust.splice.sv.onboarding.sponsor
 
 import cats.data.EitherT
 import com.digitalasset.base.error.utils.ErrorDetails
+import com.digitalasset.canton.discard.Implicits.DiscardOps
 import org.lfdecentralizedtrust.splice.codegen.java.splice.externalpartyamuletrules.ExternalPartyAmuletRules
 import org.lfdecentralizedtrust.splice.environment.{
   ParticipantAdminConnection,
@@ -19,17 +20,19 @@ import org.lfdecentralizedtrust.splice.sv.onboarding.DsoPartyHosting.DsoPartyMig
 import org.lfdecentralizedtrust.splice.sv.store.{SvDsoStore, SvSvStore}
 import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
 import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
-import com.digitalasset.canton.topology.{ParticipantId, SynchronizerId}
+import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId} import com.digitalasset.canton.tracing.TraceContext import com.digitalasset.canton.util.MonadUtil import com.digitalasset.canton.util.ShowUtil.* -import com.google.protobuf.ByteString import io.grpc.Status import io.grpc.StatusRuntimeException +import java.nio.file.{Files, Path, StandardCopyOption} import java.time.Instant import scala.annotation.unused +import scala.collection.concurrent.TrieMap import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.{Failure, Success, Try} class DsoPartyMigration( svStoreWithIngestion: AppStoreWithIngestion[SvSvStore], @@ -38,11 +41,14 @@ class DsoPartyMigration( @unused ledgerClient: SpliceLedgerClient, retryProvider: RetryProvider, dsoPartyHosting: DsoPartyHosting, + acsSnapshotDir: Path, protected val loggerFactory: NamedLoggerFactory, )(implicit ec: ExecutionContextExecutor ) extends NamedLogging { + import DsoPartyMigration.* + private val dsoStore = dsoStoreWithIngestion.store private val dsoParty = dsoStore.key.dsoParty private val svParty = dsoStore.key.svParty @@ -53,57 +59,157 @@ class DsoPartyMigration( loggerFactory, ) - def authorizeParticipantForHostingDsoParty( - participantId: ParticipantId - )(implicit tc: TraceContext): EitherT[Future, DsoPartyMigrationFailure, ByteString] = { - logger.info(s"Sponsor SV authorizing DSO party to participant $participantId") - for { - dsoRules <- EitherT.liftF(dsoStore.getDsoRules()) - // this will wait until the PartyToParticipant state change completed - _ <- partyHosting - .authorizeDsoPartyToParticipant( - dsoRules.domain, - participantId, - ) - activationTx <- EitherT.liftF( - participantAdminConnection - .getDsoPartyToParticipantTransaction( + Files.createDirectories(acsSnapshotDir).discard + + /** Tracks in-flight and failed exports keyed by the candidate party. 
A completed export is not + * tracked here; its presence is signalled by the existence of the final snapshot file on disk. + */ + private val exports = new TrieMap[PartyId, ExportProgress]() + + /** The file the snapshot for `candidate` is (or will be) written to. Party ids cannot contain a + * path separator, so they are safe to use directly in the file name. The sponsor party + * (`svParty`) is included so that several SVs sharing an `acsSnapshotDir` (e.g. local + * multi-SV test runs) don't collide on the same file. + */ + def snapshotFilePath(candidate: PartyId): Path = + acsSnapshotDir.resolve( + s"dso-party-acs-${svParty.toProtoPrimitive}-${candidate.toProtoPrimitive}.acs" + ) + + private def tmpFilePath(finalFile: Path): Path = + finalFile.resolveSibling(finalFile.getFileName.toString + ".tmp") + + /** State of the snapshot for `candidate`, used by the download endpoint to decide what to + * respond. + */ + def snapshotState(candidate: PartyId): SnapshotState = { + val file = snapshotFilePath(candidate) + if (Files.exists(file)) SnapshotState.Ready(file) + else + exports.get(candidate) match { + case Some(ExportProgress.Running) => SnapshotState.InProgress + case Some(ExportProgress.Failed(message)) => SnapshotState.Failed(message) + case None => SnapshotState.NotFound + } + } + + /** Step 1 of the resumable party migration: synchronously authorize hosting the DSO party on + * the candidate participant (this is where proposal-not-found surfaces, preserving the + * existing serial-based retry behaviour) and then kick off the ACS snapshot export to a file in + * the background. Returns as soon as the export has been started (or is already running / + * complete). The candidate downloads the resulting file via the resumable download endpoint. 
+ */ + def initiateSnapshot( + candidate: PartyId, + participantId: ParticipantId, + )(implicit tc: TraceContext): EitherT[Future, DsoPartyMigrationFailure, Unit] = { + logger.info(s"Sponsor SV initiating DSO party snapshot for $candidate on $participantId") + val file = snapshotFilePath(candidate) + if (Files.exists(file)) { + logger.info(s"ACS snapshot for $candidate already available at $file, nothing to do") + EitherT.rightT[Future, DsoPartyMigrationFailure](()) + } else if (exports.get(candidate).contains(ExportProgress.Running)) { + logger.info(s"ACS snapshot export for $candidate already in progress, nothing to do") + EitherT.rightT[Future, DsoPartyMigrationFailure](()) + } else { + for { + dsoRules <- EitherT.liftF(dsoStore.getDsoRules()) + // this will wait until the PartyToParticipant state change completed + _ <- partyHosting + .authorizeDsoPartyToParticipant( dsoRules.domain, participantId, - dsoParty, ) - .getOrElseF( - Future.failed( - Status.NOT_FOUND - .withDescription( - s"Transaction where the participant $participantId was activated not found." - ) - .asRuntimeException() + activationTx <- EitherT.liftF( + participantAdminConnection + .getDsoPartyToParticipantTransaction( + dsoRules.domain, + participantId, + dsoParty, + ) + .getOrElseF( + Future.failed( + Status.NOT_FOUND + .withDescription( + s"Transaction where the participant $participantId was activated not found." + ) + .asRuntimeException() + ) ) - ) - ) - activationTime = activationTx.base.validFrom - _ = logger.info( - s"DSO party was authorized on $participantId, downloading snapshot at time $activationTime." - ) - acsBytes <- EitherT.liftF( - downloadSnapshotFromTime( - participantId, - activationTime, - dsoRules.domain, ) + activationTime = activationTx.base.validFrom + } yield { + logger.info( + s"DSO party was authorized on $participantId, starting background ACS snapshot export at time $activationTime." 
+ ) + startBackgroundExportIfNeeded(candidate, participantId, activationTime, dsoRules.domain) + } + } + } + + private def startBackgroundExportIfNeeded( + candidate: PartyId, + participantId: ParticipantId, + activationTime: Instant, + decentralizedSynchronizer: SynchronizerId, + )(implicit tc: TraceContext): Unit = { + val file = snapshotFilePath(candidate) + // Atomically claim the export so that two concurrent initiate calls don't both spawn one. A + // previously failed entry is replaced (via CAS) here so that a retry re-runs the export. + val shouldStart = + if (Files.exists(file)) false + else + exports.get(candidate) match { + case Some(ExportProgress.Running) => false + case Some(failed: ExportProgress.Failed) => + exports.replace(candidate, failed, ExportProgress.Running) + case None => + exports.putIfAbsent(candidate, ExportProgress.Running).isEmpty + } + if (shouldStart) + runBackgroundExport(candidate, participantId, activationTime, decentralizedSynchronizer) + else + logger.info( + s"ACS snapshot export for $candidate already in progress or complete, not starting another" ) - } yield { - acsBytes + } + + private def runBackgroundExport( + candidate: PartyId, + participantId: ParticipantId, + activationTime: Instant, + decentralizedSynchronizer: SynchronizerId, + )(implicit tc: TraceContext): Unit = { + val finalFile = snapshotFilePath(candidate) + val tmpFile = tmpFilePath(finalFile) + val exportF = downloadSnapshotToFile( + participantId, + activationTime, + decentralizedSynchronizer, + tmpFile, + ).map { _ => + // Atomically publish the completed snapshot so the download endpoint never serves a + // partially-written file. 
+ Files.move(tmpFile, finalFile, StandardCopyOption.ATOMIC_MOVE).discard + } + exportF.onComplete { + case Success(_) => + exports.remove(candidate).discard + logger.info(s"Completed ACS snapshot export for $candidate to $finalFile") + case Failure(e) => + Try(Files.deleteIfExists(tmpFile)).discard + exports.update(candidate, ExportProgress.Failed(Option(e.getMessage).getOrElse(e.toString))) + logger.warn(s"Failed to export ACS snapshot for $candidate", e) } } @SuppressWarnings(Array("org.wartremover.warts.Var")) - private def downloadSnapshotFromTime( + private def downloadSnapshotToFile( targetParticipantId: ParticipantId, activationTime: Instant, decentralizedSynchronizer: SynchronizerId, - )(implicit tc: TraceContext): Future[ByteString] = { + outputFile: Path, + )(implicit tc: TraceContext): Future[Unit] = { def submitDummyTransaction(): Future[Unit] = svStoreWithIngestion @@ -120,43 +226,67 @@ class DsoPartyMigration( .noDedup .yieldUnit() - for { - snapshot <- { - retryProvider.retry( - RetryFor.ClientCalls, - "download_acs_snapshot", - show"Download ACS snapshot for DSO at $activationTime", - participantAdminConnection - .exportPartyAcs( - dsoParty, - synchronizerId = decentralizedSynchronizer, - targetParticipantId = targetParticipantId, - activationTime = activationTime, - ) - .recoverWith { case ex: StatusRuntimeException => - val errorDetails = ErrorDetails.from(ex: StatusRuntimeException) - for { - _ <- MonadUtil.sequentialTraverse_(errorDetails) { - case ErrorDetails.ErrorInfoDetail( - PartyManagementServiceError.UnprocessedRequestedTimestamp.id, - metadata, - ) => - logger.info( - s"Requested record time $activationTime is not yet clean: $metadata, submitting dummy transaction" - ) - submitDummyTransaction() - case _ => Future.unit - } - } yield { - // rethrow to trigger the retry - throw ex - } - }, - logger, + retryProvider.retry( + RetryFor.ClientCalls, + "download_acs_snapshot", + show"Download ACS snapshot for DSO at $activationTime", + 
participantAdminConnection + .exportPartyAcsToFile( + dsoParty, + synchronizerId = decentralizedSynchronizer, + targetParticipantId = targetParticipantId, + activationTime = activationTime, + outputFile = outputFile, ) - } - } yield snapshot + .recoverWith { case ex: StatusRuntimeException => + val errorDetails = ErrorDetails.from(ex: StatusRuntimeException) + for { + _ <- MonadUtil.sequentialTraverse_(errorDetails) { + case ErrorDetails.ErrorInfoDetail( + PartyManagementServiceError.UnprocessedRequestedTimestamp.id, + metadata, + ) => + logger.info( + s"Requested record time $activationTime is not yet clean: $metadata, submitting dummy transaction" + ) + submitDummyTransaction() + case _ => Future.unit + } + } yield { + // rethrow to trigger the retry + throw ex + } + }, + logger, + ) + } + +} + +object DsoPartyMigration { + private sealed trait ExportProgress + private object ExportProgress { + case object Running extends ExportProgress + final case class Failed(message: String) extends ExportProgress } + /** State of a candidate's ACS snapshot as observed by the sponsor. */ + sealed trait SnapshotState + object SnapshotState { + + /** The snapshot has been fully written and is available at `file`. */ + final case class Ready(file: Path) extends SnapshotState + + /** The snapshot is currently being exported. */ + case object InProgress extends SnapshotState + + /** The last export attempt failed; the candidate should re-initiate. */ + final case class Failed(message: String) extends SnapshotState + + /** No snapshot is known for this candidate (e.g. initiate was never called, or it was lost on + * a sponsor restart); the candidate should (re-)initiate. 
+ */ + case object NotFound extends SnapshotState + } } diff --git a/build.sbt b/build.sbt index c1a8494aff..aa33dc564f 100644 --- a/build.sbt +++ b/build.sbt @@ -1113,7 +1113,18 @@ lazy val `apps-sv` = pkg = "org.lfdecentralizedtrust.splice.http.v0", modules = List("pekko-http-v1.0.0", "circe"), customExtraction = true, - ) + ), + // Server-only: the binary snapshot download endpoint. The client is hand-written in + // SvConnection because guardrail cannot decode a binary stream response. + ScalaServer( + new File("apps/sv/src/main/openapi/sv-stream-server.yaml"), + pkg = "org.lfdecentralizedtrust.splice.http.v0", + modules = List("pekko-http-v1.0.0", "circe"), + imports = List( + "org.lfdecentralizedtrust.splice.sv.admin.http.ResponseEntityGuardrailSupport._" + ), + customExtraction = true, + ), ), ) diff --git a/cluster/helm/splice-sv-node/templates/sv.yaml b/cluster/helm/splice-sv-node/templates/sv.yaml index b421b54f54..e9121a726e 100644 --- a/cluster/helm/splice-sv-node/templates/sv.yaml +++ b/cluster/helm/splice-sv-node/templates/sv.yaml @@ -187,6 +187,11 @@ spec: - name: ADDITIONAL_CONFIG_GLOBAL_DOMAIN_UPGRADE_DUMP_PATH value: | canton.sv-apps.sv.domain-migration-dump-path = "/domain-upgrade-dump/domain_migration_dump.json" + {{- if .Values.acsSnapshot.enabled }} + - name: ADDITIONAL_CONFIG_ACS_SNAPSHOT_PATH + value: | + canton.sv-apps.sv.acs-snapshot-dir = "/acs-snapshots" + {{- end }} {{- if .Values.domain }} - name: ADDITIONAL_CONFIG_SV_DOMAIN #TODO(#930) - switch to port 443 https://github.com/DACH-NY/canton/issues/23835 @@ -580,6 +585,10 @@ spec: mountPath: /domain-upgrade-dump {{- end }} {{- end }} + {{- if .Values.acsSnapshot.enabled }} + - name: acs-snapshot-volume + mountPath: /acs-snapshots + {{- end }} {{- with .Values.resources }} resources: {{- toYaml . 
| nindent 12 }} {{- end }} @@ -623,6 +632,11 @@ spec: persistentVolumeClaim: claimName: {{ .Values.pvc.volumeName }} {{- end }} + {{- if .Values.acsSnapshot.enabled }} + - name: acs-snapshot-volume + persistentVolumeClaim: + claimName: {{ .Values.acsSnapshot.volumeName }} + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} @@ -670,6 +684,23 @@ spec: storage: 20G storageClassName: {{ .Values.pvc.volumeStorageClass }} {{- end }} +{{- if .Values.acsSnapshot.enabled }} +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ .Values.acsSnapshot.volumeName }} + namespace: {{ $.Release.Namespace }} + annotations: + helm.sh/resource-policy: keep +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.acsSnapshot.size }} + storageClassName: {{ .Values.acsSnapshot.volumeStorageClass }} +{{- end }} --- apiVersion: v1 kind: ConfigMap diff --git a/cluster/helm/splice-sv-node/values-template.yaml b/cluster/helm/splice-sv-node/values-template.yaml index cdd784b147..3151131ca5 100644 --- a/cluster/helm/splice-sv-node/values-template.yaml +++ b/cluster/helm/splice-sv-node/values-template.yaml @@ -58,6 +58,14 @@ pvc: volumeName: sv-app-global-domain-migration-pvc volumeStorageClass: standard-rwo +# pvc for the DSO party ACS snapshots a sponsoring SV writes while onboarding a new SV. +# Mounted at /acs-snapshots and used as the app's acs-snapshot-dir. 
+acsSnapshot: + enabled: true + size: 10Gi + volumeName: sv-app-acs-snapshot-pvc + volumeStorageClass: standard-rwo + failOnAppVersionMismatch: true permissionedSynchronizer: false diff --git a/cluster/helm/splice-sv-node/values.schema.json b/cluster/helm/splice-sv-node/values.schema.json index 9210285305..eda068d0d9 100644 --- a/cluster/helm/splice-sv-node/values.schema.json +++ b/cluster/helm/splice-sv-node/values.schema.json @@ -396,6 +396,23 @@ } } }, + "acsSnapshot": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean" + }, + "size": { + "type": "string" + }, + "volumeName": { + "type": "string" + }, + "volumeStorageClass": { + "type": "string" + } + } + }, "failOnAppVersionMismatch": { "type": "boolean" },