Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ docs/src/deployment/observability/metrics_reference.rst
/.vagrant/
/vagrant-nix-cache/
lnav*work

# SV ACS onboarding snapshots written at runtime (default acsSnapshotDir)
/acs-snapshots/
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ abstract class SvAppReference(
httpCommand(HttpSvPublicAppClient.CometBftJsonRpcRequest(id, method, params))
}

def onboardSvPartyMigrationAuthorize(
def onboardSvPartyMigrationInitiate(
participantId: ParticipantId,
candidateParty: PartyId,
): HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorizeResponse =
): Unit =
consoleEnvironment
.run {
httpCommand(
HttpSvPublicAppClient.OnboardSvPartyMigrationAuthorize(
HttpSvPublicAppClient.OnboardSvPartyMigrationInitiate(
participantId,
candidateParty,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class SvDsoPartyManagementIntegrationTest extends SvIntegrationTestBase with Wal
) {
val randomParty = allocateRandomSvParty("random")
assertThrowsAndLogsCommandFailures(
sv1Backend.onboardSvPartyMigrationAuthorize(
sv1Backend.onboardSvPartyMigrationInitiate(
sv3Backend.participantClient.id,
randomParty,
),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.environment

import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.google.protobuf.ByteString

import java.io.OutputStream
import scala.concurrent.{Future, Promise}

/** A [[io.grpc.stub.StreamObserver]] that writes each streamed chunk to the given
* [[java.io.OutputStream]] instead of accumulating the whole stream in memory. This keeps the
* memory footprint bounded regardless of how large the streamed payload (e.g. an ACS snapshot)
* is.
*
* The caller is responsible for opening the [[OutputStream]] and for closing it once
* [[resultFuture]] has completed (in both the success and failure cases).
*/
class ChunkWritingObserver[T](out: OutputStream, chunkOf: T => ByteString)
extends io.grpc.stub.StreamObserver[T] {
private val promise: Promise[Unit] = Promise[Unit]()

def resultFuture: Future[Unit] = promise.future

override def onNext(value: T): Unit =
try chunkOf(value).writeTo(out)
catch { case t: Throwable => promise.tryFailure(t).discard }

override def onError(t: Throwable): Unit = promise.tryFailure(t).discard

override def onCompleted(): Unit = promise.trySuccess(()).discard
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.{
TopologySnapshot,
}

import java.io.BufferedOutputStream
import java.nio.file.{Files, Path}
import java.time.Instant
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters.*
Expand Down Expand Up @@ -204,6 +206,51 @@ class ParticipantAdminConnection(
} yield ByteString.copyFrom(chunks.map(_.chunk).asJava)
}

/** Like [[exportPartyAcs]] but streams the snapshot chunks straight to `outputFile` instead of
* accumulating them in memory, so the full ACS is never held on the heap. The caller is
* responsible for the lifecycle of `outputFile` (e.g. writing to a temporary file and
* atomically renaming it once this future completes successfully).
*/
def exportPartyAcsToFile(
party: PartyId,
synchronizerId: SynchronizerId,
targetParticipantId: ParticipantId,
activationTime: Instant,
outputFile: Path,
)(implicit
traceContext: TraceContext
): Future[Unit] = {
val out = new BufferedOutputStream(Files.newOutputStream(outputFile))
val observer = new ChunkWritingObserver[ExportPartyAcsResponse](out, _.chunk)

val result = for {
// See exportPartyAcs for why we resolve the offset and subtract one.
activationOffset <- resolveOffset(Left(activationTime), synchronizerId, force = true)
beforeActivationOffset = activationOffset - 1L
_ = logger.info(
s"Exporting ACS snapshot for party $party from domain $synchronizerId at offset $beforeActivationOffset to file $outputFile"
)
_ <- runCmd(
ParticipantAdminCommands.PartyManagement.ExportPartyAcs(
party,
synchronizerId,
targetParticipantId,
beforeActivationOffset,
waitForActivationTimeout = None, // i.e., default
observer,
)
)
// The export streams asynchronously; this future completes once the last chunk has been
// written to `out`.
_ <- observer.resultFuture
} yield ()
// Close the stream (flushing the buffer) in both the success and failure cases.
result.transform { res =>
out.close()
res
}
}

def downloadAcsSnapshotNonChunked(
parties: Set[PartyId],
filterSynchronizerId: SynchronizerId,
Expand Down Expand Up @@ -267,6 +314,32 @@ class ParticipantAdminConnection(
)
}

/** Like [[importPartyAcs]] but reads the snapshot from a file on disk, streaming it into the
* participant so the full ACS is never held on the heap. A fresh [[java.io.InputStream]] is
* opened on every (retried) attempt.
*/
def importPartyAcsFromFile(acsFile: Path, synchronizerId: SynchronizerId, partyId: PartyId)(
implicit tc: TraceContext
): Future[Unit] = {
retryProvider.retryForClientCalls(
"import_party_acs",
"Imports the acs in the participant",
runCmd(
ParticipantAdminCommands.PartyManagement
.ImportPartyAcs(
Files.newInputStream(acsFile),
synchronizerId,
IMPORT_ACS_WORKFLOW_ID_PREFIX,
contractImportMode = ContractImportMode.Validation,
representativePackageIdOverride = RepresentativePackageIdOverride.NoOverride,
party = Some(partyId),
),
timeoutOverride = Some(GrpcAdminCommand.DefaultUnboundedTimeout),
).map(_ => ()),
logger,
)
}

def getParticipantId()(implicit traceContext: TraceContext): Future[ParticipantId] =
getId().map(ParticipantId(_))

Expand Down
33 changes: 15 additions & 18 deletions apps/sv/src/main/openapi/sv-internal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -429,24 +429,29 @@ paths:
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/400"
"500":
$ref: "../../../../common/src/main/openapi/common-external.yaml#/components/responses/500"
/v0/onboard/sv/party-migration/authorize:
# Step 1 of the resumable SV party migration. Authorizes hosting the DSO party on the
# candidate participant (synchronous, surfaces proposal/accepted-state errors) and then
# asynchronously exports the DSO party ACS snapshot to a file on the sponsor. Returns 202
# immediately; the candidate then downloads the snapshot via the resumable, range-enabled
# GET /v0/onboard/sv/party-migration/snapshot/{candidate_party_id} endpoint, which is defined
# in sv-stream-server.yaml (server-only generation, since guardrail cannot generate a client
# for a binary stream response).
/v0/onboard/sv/party-migration/initiate:
post:
tags: [sv]
x-jvm-package: sv_public
operationId: "onboardSvPartyMigrationAuthorize"
operationId: "onboardSvPartyMigrationInitiate"
requestBody:
required: true
content:
application/json:
schema:
"$ref": "#/components/schemas/OnboardSvPartyMigrationAuthorizeRequest"
"$ref": "#/components/schemas/OnboardSvPartyMigrationInitiateRequest"
responses:
"200":
description: ok
content:
application/json:
schema:
"$ref": "#/components/schemas/OnboardSvPartyMigrationAuthorizeResponse"
"202":
description: |
Snapshot generation has been accepted and is running (or already complete).
Poll the snapshot download endpoint until it returns the snapshot.
"400":
description: system state not yet valid
content:
Expand Down Expand Up @@ -987,22 +992,14 @@ components:
completed: "#/components/schemas/SvOnboardingStateCompleted"
unknown: "#/components/schemas/SvOnboardingStateUnknown"

OnboardSvPartyMigrationAuthorizeRequest:
OnboardSvPartyMigrationInitiateRequest:
type: object
required:
- candidate_party_id
properties:
candidate_party_id:
type: string

OnboardSvPartyMigrationAuthorizeResponse:
type: object
required:
- acs_snapshot
properties:
acs_snapshot:
type: string

OnboardSvPartyMigrationAuthorizeErrorResponse:
oneOf:
- "$ref": "#/components/schemas/AcceptedStateNotFoundErrorResponse"
Expand Down
48 changes: 48 additions & 0 deletions apps/sv/src/main/openapi/sv-stream-server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
openapi: 3.0.0
info:
title: SV Streaming API
version: 0.0.1
tags:
- name: sv
description: |
SV operator endpoints that stream binary payloads.
servers:
- url: https://example.com/api/sv
paths:

# Note for developers working on this openAPI spec: we generate only server-side code from this
# file, not client-side, because guardrail cannot decode a binary stream response as JSON.
# The client lives in SvConnection.downloadDsoPartyAcsSnapshot and must be updated manually to
# reflect any changes here.

/v0/onboard/sv/party-migration/snapshot/{candidate_party_id}:
get:
tags: [sv]
x-jvm-package: svStream
operationId: "onboardSvPartyMigrationSnapshot"
description: |
Step 2 of the resumable SV party migration: download the DSO party ACS snapshot prepared by
the POST /v0/onboard/sv/party-migration/initiate endpoint. The response is served from a
file with HTTP range support (withRangeSupport in SvApp), so a Range request yields a 206
Partial Content response and a failed download can resume from a byte offset.
parameters:
- name: "candidate_party_id"
in: "path"
required: true
schema:
type: string
responses:
"200":
description: the ACS snapshot
content:
application/octet-stream:
schema:
type: string
format: binary
x-scala-type: org.apache.pekko.http.scaladsl.model.ResponseEntity
"404":
description: no snapshot for this candidate; call the initiate endpoint first
"409":
description: the snapshot is still being generated; retry later
"500":
description: the snapshot generation failed; re-initiate
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.lfdecentralizedtrust.splice.http.{HttpClient, HttpRateLimiter}
import org.lfdecentralizedtrust.splice.http.v0.sv_admin.SvAdminResource
import org.lfdecentralizedtrust.splice.http.v0.sv_operator.SvOperatorResource
import org.lfdecentralizedtrust.splice.http.v0.sv_public.SvPublicResource
import org.lfdecentralizedtrust.splice.http.v0.svStream.SvStreamResource
import org.lfdecentralizedtrust.splice.setup.ParticipantInitializer
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion
import org.lfdecentralizedtrust.splice.store.AppStoreWithIngestion.SpliceLedgerConnectionPriority
Expand All @@ -59,6 +60,7 @@ import org.lfdecentralizedtrust.splice.sv.admin.http.{
HttpSvAdminHandler,
HttpSvOperatorHandler,
HttpSvPublicHandler,
HttpSvStreamHandler,
}
import org.lfdecentralizedtrust.splice.sv.automation.{
DsoDelegateBasedAutomationService,
Expand Down Expand Up @@ -463,6 +465,17 @@ class SvApp(
throw new IllegalStateException("No initial round specified in user's metadata")
}

dsoPartyMigration = new DsoPartyMigration(
svAutomation,
dsoAutomation,
participantAdminConnection,
ledgerClient,
retryProvider,
dsoPartyHosting,
config.acsSnapshotDir,
loggerFactory,
)

publicHandler = new HttpSvPublicHandler(
config.ledgerApiUser,
svAutomation,
Expand All @@ -473,19 +486,13 @@ class SvApp(
participantAdminConnection,
synchronizerNodeService,
retryProvider,
new DsoPartyMigration(
svAutomation,
dsoAutomation,
participantAdminConnection,
ledgerClient,
retryProvider,
dsoPartyHosting,
loggerFactory,
),
dsoPartyMigration,
loggerFactory,
initialRound,
)

streamHandler = new HttpSvStreamHandler(dsoPartyMigration)

operatorHandler = new HttpSvOperatorHandler(
svAutomation,
dsoAutomation,
Expand Down Expand Up @@ -547,6 +554,17 @@ class SvApp(

errorHandler.directive(traceContext) {
concat(
// The binary ACS-snapshot download endpoint. withRangeSupport turns the known-length
// file entity returned by the handler into resumable 206 Partial Content responses
// when a Range header is present.
SvStreamResource.routes(
streamHandler,
operation =>
withRangeSupport.tflatMap(_ =>
buildOperation("svStream", operation)
.tflatMap(_ => provide(traceContext))
),
),
SvPublicResource.routes(
publicHandler,
operation =>
Expand Down
Loading
Loading