diff --git a/src/clients/rpc-client/pkc-rpc-client.ts b/src/clients/rpc-client/pkc-rpc-client.ts index 06e882cb8..70c257ed2 100644 --- a/src/clients/rpc-client/pkc-rpc-client.ts +++ b/src/clients/rpc-client/pkc-rpc-client.ts @@ -36,7 +36,10 @@ import type { RpcResolveAuthorNameResult, RpcSubscriptionIdResult, RpcSuccessResult, - RpcFetchCidResult + RpcFetchCidResult, + ExportCommunityRpcParam, + CancelExportRpcParam, + RpcExportCommunityResult } from "./types.js"; import { parseRpcCommunityIdentifierParam, @@ -49,7 +52,10 @@ import { parseRpcResolveAuthorNameResult, parseRpcFetchCidResult, parseRpcSuccessResult, - parseRpcSubscriptionIdResult + parseRpcSubscriptionIdResult, + parseRpcExportCommunityParam, + parseRpcCancelExportParam, + parseRpcExportCommunityResult } from "./rpc-schema-util.js"; const log = Logger("pkc-js:PKCRpcClient"); @@ -479,6 +485,33 @@ export default class PKCRpcClient extends TypedEmitter { return res; } + // community.export() — see src/rpc/EXPORT_COMMUNITY_SPEC.md + + // HTTP origin to absolutize relative `/exports/` URLs returned by exportsSubscribe. + // Derived from the WS URL with ws[s]:// swapped to http[s]:// and the auth-key path stripped. + get rpcHttpOrigin(): string { + const parsed = new URL(this._websocketServerUrl); + const httpProto = parsed.protocol === "wss:" ? "https:" : "http:"; + return `${httpProto}//${parsed.host}`; + } + + async exportCommunity(args: ExportCommunityRpcParam): Promise { + const parsedArgs = parseRpcExportCommunityParam(args); + return parseRpcExportCommunityResult(await this._webSocketClient.call("exportCommunity", [parsedArgs])); + } + + async exportsSubscribe(args: CommunityIdentifierRpcParam): Promise { + const parsedArgs = parseRpcCommunityIdentifierParam(args); + const res = parseRpcSubscriptionIdResult(await this._webSocketClient.call("exportsSubscribe", [parsedArgs])); + this._initSubscriptionEvent(res.subscriptionId); + return res; + } + + async cancelExport(args: CancelExportRpcParam): Promise { + const parsedArgs = parseRpcCancelExportParam(args); + return parseRpcSuccessResult(await this._webSocketClient.call("cancelExport", [parsedArgs])); + } + async getDefaults() { throw Error("Not implemented"); } diff --git a/src/clients/rpc-client/rpc-schema-util.ts b/src/clients/rpc-client/rpc-schema-util.ts index 4960540b3..2ad2bee7e 100644 --- a/src/clients/rpc-client/rpc-schema-util.ts +++ b/src/clients/rpc-client/rpc-schema-util.ts @@ -11,7 +11,11 @@ import { RpcResolveAuthorNameResultSchema, RpcFetchCidResultSchema, RpcSuccessResultSchema, - RpcSubscriptionIdResultSchema + RpcSubscriptionIdResultSchema, + RpcExportCommunityParamSchema, + RpcCancelExportParamSchema, + RpcExportCommunityResultSchema, + RpcExportschangeResultSchema } from "./schema.js"; // Param parsers — all use .loose() so newer clients can send extra fields @@ -30,3 +34,9 @@ export const parseRpcResolveAuthorNameResult = (result: unknown) => RpcResolveAu export const parseRpcFetchCidResult = (result: unknown) => RpcFetchCidResultSchema.loose().parse(result); export const parseRpcSuccessResult = (result: unknown) => RpcSuccessResultSchema.loose().parse(result); export const parseRpcSubscriptionIdResult = (result: unknown) => RpcSubscriptionIdResultSchema.loose().parse(result); + +// community.export() — wire param/result parsers +export const parseRpcExportCommunityParam = (params: unknown) => RpcExportCommunityParamSchema.loose().parse(params); +export const parseRpcCancelExportParam = (params: unknown) => RpcCancelExportParamSchema.loose().parse(params); +export const parseRpcExportCommunityResult = (result: unknown) => RpcExportCommunityResultSchema.loose().parse(result); +export const parseRpcExportschangeResult = (result: unknown) => RpcExportschangeResultSchema.loose().parse(result); diff --git a/src/clients/rpc-client/schema.ts b/src/clients/rpc-client/schema.ts index f63155b2a..b233ead7c 100644 --- a/src/clients/rpc-client/schema.ts +++ b/src/clients/rpc-client/schema.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import { CommentIpfsSchema, CommentUpdateSchema } from "../../publications/comment/schema.js"; import { AuthorAddressSchema, ChallengeAnswersSchema, CidStringSchema } from "../../schema/schema.js"; -import { CommunityEditOptionsSchema } from "../../community/schema.js"; +import { CommunityEditOptionsSchema, CommunityExportRecordsSchema } from "../../community/schema.js"; import { NameResolveCacheOptionsSchema } from "../../schema.js"; import type { EncodedDecryptedChallengeVerificationMessageType } from "../../pubsub-messages/types.js"; export const SubscriptionIdSchema = z.number().positive().int(); @@ -64,3 +64,18 @@ export const RpcFetchCidResultSchema = z.object({ content: z.string() }); export const RpcResolveAuthorNameResultSchema = z.object({ resolvedAuthorName: z.string().nullable() }); export const RpcSuccessResultSchema = z.object({ success: z.literal(true) }); export const RpcSubscriptionIdResultSchema = z.object({ subscriptionId: SubscriptionIdSchema }); // parsed with .loose() in rpc-schema-util.ts + +// community.export() — wire params and results +export const RpcExportCommunityParamSchema = z + .object({ + name: z.string().min(1).optional(), + publicKey: z.string().min(1).optional(), + includePrivateKey: z.boolean().optional() + }) + .refine((args) => args.name || args.publicKey, "At least one of name or publicKey must be provided"); + +export const RpcCancelExportParamSchema = z.object({ exportId: z.string().uuid() }); + +export const RpcExportCommunityResultSchema = z.object({ exportId: z.string().uuid() }); + +export const RpcExportschangeResultSchema = z.object({ records: CommunityExportRecordsSchema }); diff --git a/src/clients/rpc-client/types.ts b/src/clients/rpc-client/types.ts index c20cb2ff5..9fef33cdf 100644 --- a/src/clients/rpc-client/types.ts +++ b/src/clients/rpc-client/types.ts @@ -11,7 +11,11 @@ import { RpcResolveAuthorNameResultSchema, RpcFetchCidResultSchema, RpcSuccessResultSchema, - RpcSubscriptionIdResultSchema + RpcSubscriptionIdResultSchema, + RpcExportCommunityParamSchema, + RpcCancelExportParamSchema, + RpcExportCommunityResultSchema, + RpcExportschangeResultSchema } from "./schema.js"; import type { PageIpfs, ModQueuePageIpfs } from "../../pages/types.js"; import type { PageRuntimeFields } from "../../pages/util.js"; @@ -25,12 +29,16 @@ export type CommentPageRpcParam = z.infer; export type EditCommunityRpcParam = z.infer; export type PublishChallengeAnswersRpcParam = z.infer; +export type ExportCommunityRpcParam = z.infer; +export type CancelExportRpcParam = z.infer; // Result types (shared between RPC client and server) export type RpcResolveAuthorNameResult = z.infer; export type RpcFetchCidResult = z.infer; export type RpcSuccessResult = z.infer; export type RpcSubscriptionIdResult = z.infer; +export type RpcExportCommunityResult = z.infer; +export type RpcExportschangeResult = z.infer; // Re-export existing complex types used as RPC return values export type { RpcInternalCommunityRecordBeforeFirstUpdateType, RpcLocalCommunityUpdateResultType } from "../../community/types.js"; diff --git a/src/community/remote-community.ts b/src/community/remote-community.ts index 34ce1bd45..bf86c9da6 100644 --- a/src/community/remote-community.ts +++ b/src/community/remote-community.ts @@ -27,7 +27,9 @@ import type { RpcLocalCommunityLocalProps, CommunityEditOptions, CommunityEventArgs, - CommunityEvents + CommunityEvents, + CommunityExportRecord, + ExportCommunityUserOptions } from "./types.js"; import * as remeda from "remeda"; import { ModQueuePages, PostsPages } from "../pages/pages.js"; @@ -749,4 +751,14 @@ export class RemoteCommunity extends TypedEmitter implements Om async start() { throw Error("Can't start a remote community"); } + + // Community export (issue #79). Overridden by LocalCommunity and the RPC variants. + // The base implementation rejects because a read-only RemoteCommunity has no DB to back up. + get exports(): CommunityExportRecord[] { + return []; + } + + async export(options?: ExportCommunityUserOptions): Promise<{ exportId: string }> { + throw new PKCError("ERR_COMMUNITY_NOT_LOCAL", { address: this.address }); + } } diff --git a/src/community/rpc-local-community.ts b/src/community/rpc-local-community.ts index 838c2d014..10d40cd77 100644 --- a/src/community/rpc-local-community.ts +++ b/src/community/rpc-local-community.ts @@ -5,8 +5,10 @@ import type { RpcLocalCommunityLocalProps, RpcLocalCommunityUpdateResultType, CommunityEditOptions, + CommunityExportRecord, CommunityIpfsType, - CommunityStartedState + CommunityStartedState, + ExportCommunityUserOptions } from "./types.js"; import { RpcRemoteCommunity } from "./rpc-remote-community.js"; import { z } from "zod"; @@ -32,6 +34,13 @@ import type { import { deepMergeRuntimeFields, hideClassPrivateProps } from "../util.js"; import { findStartedCommunity, trackStartedCommunity, untrackStartedCommunity } from "../pkc/tracked-instance-registry-util.js"; +// Shallow clone preserving the nested error object so consumers can mutate without +// affecting cached state. Local copy because `local-community/export.ts` is node-only and +// RpcLocalCommunity must work in the browser. +function cloneExportRecord(record: CommunityExportRecord): CommunityExportRecord { + return { ...record, ...(record.error ? { error: { ...record.error } } : {}) }; +} + // This class is for communities that are running and publishing, over RPC. Can be used for both browser and node export class RpcLocalCommunity extends RpcRemoteCommunity { override started: boolean; // Is the community started and running? This is not specific to this instance, and applies to all instances of community with this address @@ -56,6 +65,14 @@ export class RpcLocalCommunity extends RpcRemoteCommunity { private _startRpcSubscriptionId?: z.infer = undefined; _usingDefaultChallenge!: RpcLocalCommunityLocalProps["_usingDefaultChallenge"]; + // community.export() over RPC. The subscription is attached eagerly during createCommunity + // so consumers see prior exports (including ones still in flight from earlier client sessions) + // immediately, without having to call community.export() first. + private _exportsSubscriptionId?: number = undefined; + private _exportsCache: CommunityExportRecord[] = []; + // exportId → detach AbortSignal listener. Cleared when the export reaches a terminal state. + private _exportSignalDetachers: Map void> = new Map(); + constructor(pkc: PKC) { super(pkc); this.started = false; @@ -338,6 +355,100 @@ export class RpcLocalCommunity extends RpcRemoteCommunity { return super.update(); } + // community.export() over RPC — see src/rpc/EXPORT_COMMUNITY_SPEC.md + override get exports(): CommunityExportRecord[] { + return this._exportsCache.map(cloneExportRecord); + } + + override async export(options: ExportCommunityUserOptions = {}): Promise<{ exportId: string }> { + // Sync validation — matches embedded path + if (options.exportPath !== undefined) throw new PKCError("ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC", { address: this.address }); + if (options.signal?.aborted) { + const reason = (options.signal as AbortSignal).reason; + throw reason ?? new DOMException("The operation was aborted.", "AbortError"); + } + + const { exportId } = await this._pkc._pkcRpcClient!.exportCommunity({ + name: this.name, + publicKey: this.publicKey, + includePrivateKey: options.includePrivateKey + }); + + if (options.signal) { + const userSignal = options.signal; + const onAbort = () => { + this._pkc._pkcRpcClient!.cancelExport({ exportId }).catch((e) => { + Logger("pkc-js:rpc-local-community:export").error("Failed to send cancelExport", exportId, e); + }); + }; + // Aborted between sync validation and exportCommunity returning — route the cancel. + if (userSignal.aborted) onAbort(); + else { + userSignal.addEventListener("abort", onAbort, { once: true }); + this._exportSignalDetachers.set(exportId, () => userSignal.removeEventListener("abort", onAbort)); + } + } + return { exportId }; + } + + async _attachExportsSubscription(): Promise { + if (this._exportsSubscriptionId !== undefined) return; + if (!this._pkc._pkcRpcClient) return; // not on an RPC PKC — should not happen for this class + + const { subscriptionId } = await this._pkc._pkcRpcClient.exportsSubscribe({ + name: this.name, + publicKey: this.publicKey + }); + this._exportsSubscriptionId = subscriptionId; + + const subscription = this._pkc._pkcRpcClient.getSubscription(subscriptionId); + let resolveInitial!: () => void; + let rejectInitial!: (e: Error) => void; + const initialReceived = new Promise((resolve, reject) => { + resolveInitial = resolve; + rejectInitial = reject; + }); + let seenInitial = false; + + subscription.on("exportschange", (msg: any) => { + const records = (msg?.params?.result?.records ?? []) as CommunityExportRecord[]; + this._absorbExportRecords(records); + if (!seenInitial) { + seenInitial = true; + resolveInitial(); + } + }); + subscription.on("error", (msg: any) => { + if (!seenInitial) { + seenInitial = true; + rejectInitial(msg?.params?.result ?? new Error("exportsSubscribe error before initial notification")); + } + }); + + this._pkc._pkcRpcClient.emitAllPendingMessages(subscriptionId); + await initialReceived; + } + + private _absorbExportRecords(wireRecords: CommunityExportRecord[]): void { + const httpOrigin = this._pkc._pkcRpcClient!.rpcHttpOrigin; + this._exportsCache = wireRecords.map((rec) => { + // Wire-format url is relative (`/exports/`) once the export completes; absolutize. + if (rec.url && rec.url.startsWith("/")) return { ...rec, url: new URL(rec.url, httpOrigin).href }; + return rec; + }); + // Detach signal listeners for terminal records — the server already finalized them. + for (const rec of wireRecords) { + if (rec.progress === 1 || rec.error) { + const detach = this._exportSignalDetachers.get(rec.exportId); + if (detach) { + detach(); + this._exportSignalDetachers.delete(rec.exportId); + } + } + } + this.emit("exportschange", this._exportsCache.map(cloneExportRecord)); + } + override async delete() { // Make sure to stop updating or starting first const startedCommunity = findStartedCommunity(this._pkc, { publicKey: this.publicKey, name: this.name }); diff --git a/src/community/schema.ts b/src/community/schema.ts index d0a31f658..71e800328 100644 --- a/src/community/schema.ts +++ b/src/community/schema.ts @@ -322,6 +322,55 @@ export const CreateCommunityFunctionArgumentsSchema = CreateNewLocalCommunityUse export const ListOfCommunitiesSchema = CommunityAddressSchema.array(); +// community.export() + +export const ExportCommunityUserOptionsSchema = z + .object({ + includePrivateKey: z.boolean().optional(), + exportPath: z.string().min(1).optional(), + signal: z.custom((v) => v === undefined || (typeof v === "object" && v !== null && "aborted" in v)).optional() + }) + .strict(); + +export const CommunityExportRecordErrorSchema = z + .object({ + code: z.string(), + message: z.string() + }) + .strict(); + +// Public per the design: progress is the only state indicator; size/sha256/url present +// when progress === 1; error present when the export failed (cancellation included). +export const CommunityExportRecordSchema = z + .object({ + exportId: z.string().uuid(), + name: z.string().optional(), + publicKey: z.string(), + includePrivateKey: z.boolean(), + progress: z.number().min(0).max(1), + size: z.number().int().nonnegative().optional(), + sha256: z + .string() + .regex(/^[0-9a-f]{64}$/) + .optional(), + url: z.string().optional(), + error: CommunityExportRecordErrorSchema.optional() + }) + .strict() + .superRefine((rec, ctx) => { + if (rec.progress !== 1 || rec.error) return; + for (const field of ["size", "sha256", "url"] as const) { + if (rec[field] === undefined) + ctx.addIssue({ + code: z.ZodIssueCode.custom, + path: [field], + message: `${field} must be present when progress === 1 and no error is set` + }); + } + }); + +export const CommunityExportRecordsSchema = CommunityExportRecordSchema.array(); + // Reserved fields // TODO should make the array of class props typed @@ -344,7 +393,8 @@ export const CommunityIpfsReservedFields = remeda.difference( "editable", "publishingState", "updatingState", - "started" + "started", + "exports" ], remeda.keys.strict(CommunityIpfsSchema.shape) ); diff --git a/src/community/types.ts b/src/community/types.ts index b62d3b2f3..c721fd2c6 100644 --- a/src/community/types.ts +++ b/src/community/types.ts @@ -22,7 +22,9 @@ import { CommunitySuggestedSchema, RpcRemoteCommunityUpdateEventResultSchema, CommunitySignedPropertyNames, - CommunityRoleNames + CommunityRoleNames, + ExportCommunityUserOptionsSchema, + CommunityExportRecordSchema } from "./schema.js"; import { RpcLocalCommunity } from "./rpc-local-community.js"; import { LocalCommunity } from "../runtime/node/community/local-community.js"; @@ -228,9 +230,14 @@ export interface CommunityEvents { update: (updatedCommunity: RemoteCommunity) => void; + exportschange: (records: CommunityExportRecord[]) => void; + removeListener: (eventName: string, listener: Function) => void; } +export type ExportCommunityUserOptions = z.infer; +export type CommunityExportRecord = z.infer; + // Create a helper type to extract the parameters of each event export type CommunityEventArgs = Parameters; diff --git a/src/constants.ts b/src/constants.ts index 151be24cd..fde4634eb 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -2,7 +2,8 @@ export enum STORAGE_KEYS { INTERNAL_COMMUNITY, // InternalCommunityType PERSISTENT_DELETED_COMMUNITIES, // These are basically community db files that we're unable to remove for some reason on windows LAST_IPNS_RECORD, // The last published IPNS record of the community, updated everytime we publish a new one - COMBINED_HASH_OF_PENDING_COMMENTS // hash of all cids of pending comments. This is used to decide to publish a new mod queue or not + COMBINED_HASH_OF_PENDING_COMMENTS, // hash of all cids of pending comments. This is used to decide to publish a new mod queue or not + EXPORTS // CommunityExportRecord[] — backups of this community produced by community.export(); persisted so they survive process restart } // Configs for LRU storage diff --git a/src/errors.ts b/src/errors.ts index 0af6c7cae..04e8c67ac 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -364,5 +364,15 @@ export enum messages { ERR_RPC_CLIENT_TRYING_TO_DELETE_REMOTE_COMMUNITY = "RPC client is attempting to delete remote community", ERR_GENERIC_RPC_CLIENT_CALL_ERROR = "RPC client received an unknown error when executing call over websocket", ERR_RPC_CLIENT_CHALLENGE_NAME_NOT_AVAILABLE_ON_SERVER = "The challenge name is not available on the RPC server. Available challenges are listed in details.availableChallenges", - ERR_ROLE_ADDRESS_NAME_COULD_NOT_BE_RESOLVED = "Role address name could not be resolved" + ERR_ROLE_ADDRESS_NAME_COULD_NOT_BE_RESOLVED = "Role address name could not be resolved", + + // Community export errors + ERR_COMMUNITY_NOT_LOCAL = "community.export() can only be called on a LocalCommunity on this daemon", + ERR_COMMUNITY_NOT_FOUND = "Community is unknown to this daemon — neither name nor publicKey matches a community", + ERR_PRIVATE_KEY_EXPORT_NOT_ALLOWED = "The RPC server is configured to disallow exporting community private keys", + ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC = "exportPath is only supported in embedded mode; download from the record's url instead", + ERR_EXPORT_PATH_TARGETS_LIVE_DB = "exportPath resolves to the live community database; refusing to overwrite it", + ERR_EXPORT_CANCELLED = "Community export was cancelled via AbortSignal", + ERR_EXPORT_BACKUP_FAILED = "Community export failed during the sqlite backup", + ERR_DOWNLOAD_EXPORT_ID_NOT_FOUND = "No export with that exportId exists on this RPC server" } diff --git a/src/pkc/pkc-with-rpc-client.ts b/src/pkc/pkc-with-rpc-client.ts index 05a651301..532b4583f 100644 --- a/src/pkc/pkc-with-rpc-client.ts +++ b/src/pkc/pkc-with-rpc-client.ts @@ -117,6 +117,7 @@ export class PKCWithRpcClient extends PKC { else community.initRpcInternalCommunityBeforeFirstUpdateNoMerge(rawRecord); } if (jsonified.raw) Object.assign(community.raw, jsonified.raw); + await community._attachExportsSubscription(); return community; } else if (isCommunityRpcLocal) { // No jsonified data — do a fresh fetch @@ -131,7 +132,7 @@ export class PKCWithRpcClient extends PKC { await Promise.race([updatePromise, errorPromise]); await community.stop(); if (error) throw error; - + await community._attachExportsSubscription(); return community; } else { log.trace("Creating a remote RPC community instance with address", effectiveAddress); @@ -157,6 +158,7 @@ export class PKCWithRpcClient extends PKC { await community.initRpcInternalCommunityBeforeFirstUpdateNoMerge(communityPropsAfterCreation); community.emit("update", community); await this._awaitCommunitiesToIncludeCommunity(communityPropsAfterCreation.localCommunity.address); + await community._attachExportsSubscription(); return community; } else throw Error("Failed to create community rpc instance, are you sure you provided the correct args?"); } diff --git a/src/pkc/pkc.ts b/src/pkc/pkc.ts index 62f9c4063..a304e8404 100644 --- a/src/pkc/pkc.ts +++ b/src/pkc/pkc.ts @@ -1136,6 +1136,21 @@ export class PKC extends PKCTypedEmitter implements ParsedPKCOptions this._destroyAbortController?.abort(new PKCError("ERR_PKC_IS_DESTROYED")); // Clean up connections + // Abort in-flight community exports before stopping — leaving them running would race + // the dbHandler / RPC teardown and leak .partial files. See EXPORT_COMMUNITY_SPEC.md. + const exportDonePromises: Promise[] = []; + for (const community of listStartedCommunities(this)) { + const activeExports = ( + community as { _activeExports?: Map }> } + )._activeExports; + if (!activeExports) continue; + for (const handle of activeExports.values()) { + handle.controller.abort(); + exportDonePromises.push(handle.donePromise.catch((e) => log.error("Error during export teardown", e))); + } + } + if (exportDonePromises.length) await Promise.all(exportDonePromises); + for (const comment of listUpdatingComments(this)) await comment.stop(); for (const community of listUpdatingCommunities(this)) await community.stop(); diff --git a/src/rpc/EXPORT_COMMUNITY_SPEC.md b/src/rpc/EXPORT_COMMUNITY_SPEC.md new file mode 100644 index 000000000..9e8c5d3a0 --- /dev/null +++ b/src/rpc/EXPORT_COMMUNITY_SPEC.md @@ -0,0 +1,259 @@ +# Community Export Specification + +This document describes the design of the `community.export()` feature added in [issue #79](https://github.com/pkcprotocol/pkc-js/issues/79). It supersedes any earlier draft of this file. + +## Implementation status + +| Surface | Status | +|---|---| +| Embedded `community.export()` on `LocalCommunity` | ✅ Shipped (PR #100) | +| Per-community KeyV persistence of `community.exports` | ✅ Shipped | +| `exportschange` event (in-process) | ✅ Shipped | +| AbortSignal cancellation (in-process) | ✅ Shipped | +| `pkc.destroy()` aborts in-flight exports | ✅ Shipped | +| Read-only `RemoteCommunity.export()` rejection | ✅ Shipped | +| RPC wire methods (`exportCommunity` / `exportsSubscribe` / `cancelExport`) | ✅ Shipped (PR #100) | +| HTTP `GET /exports/` download endpoint | ✅ Shipped (PR #100) | +| 24h orphan sweep on server startup | ✅ Shipped (PR #100) | +| Post-download server-side deletion | ✅ Shipped (PR #100) | +| `allowPrivateKeyExport` policy on RPC server | ✅ Shipped (PR #100) | + +## Public API + +The export feature lives entirely on `Community` (every variant: `LocalCommunity`, `RemoteCommunity`, `RpcLocalCommunity`, `RpcRemoteCommunity`). PKC has no `exportCommunity`/`cancelExport` methods — exports are per-community. + +```ts +community.export(options?: ExportCommunityUserOptions): Promise<{ exportId: string }> +community.exports: CommunityExportRecord[]; +community.on("exportschange", (records: CommunityExportRecord[]) => void): this; +``` + +`community.export()` validates the request and returns a fresh `exportId` once the work is enqueued. The actual backup runs asynchronously; observers watch progress and completion through `community.exports` and the `exportschange` event. Cancellation is exclusively via `options.signal`. + +Both `community.exports` and the `exportschange` payload are **deep-cloned snapshots** — mutating a record returned to the consumer does not affect internal state. + +### Types + +```ts +export interface ExportCommunityUserOptions { + includePrivateKey?: boolean; // default false + exportPath?: string; // EMBEDDED MODE ONLY. Throws in RPC-client mode. + // Default: /exports/.sqlite. + signal?: AbortSignal; // Optional. If already aborted at call time, + // the promise rejects with signal.reason and no + // record is created. This is the only way to + // cancel an export. +} + +export interface CommunityExportRecord { + exportId: string; // UUIDv4 generated when community.export() is called + name?: string; // community name, if it has one + publicKey: string; // community public key + includePrivateKey: boolean; // echoes the request option + + progress: number; // 0..1; 1 means complete + + // present once progress === 1: + size?: number; // bytes + sha256?: string; // hex + url?: string; // consumer-resolvable: + // - embedded: file:/// + // - RPC client view: http(s):///exports/ + + // present if the export failed (also covers cancellation): + error?: { code: string; message: string }; +} +``` + +### Inferring state from the record + +Per the design discussion on issue #79, the record intentionally does not carry an explicit state enum or progress-internal counters (`pagesCopied`, `totalPages`, `startedAt`, `completedAt`). Consumers infer state purely from `progress` and `error`: + +| State | How to detect | +|--------------|--------------------------------------------| +| in progress | `progress < 1 && !error` | +| complete | `progress === 1` | +| failed | `error !== undefined` | +| cancelled | `error?.code === "ERR_EXPORT_CANCELLED"` | + +If a future need arises for a finer-grained state (e.g. additional states beyond "in progress" / "failed" / "succeeded"), adding a `state` field is backwards-compatible. + +### Validation: sync vs async + +- **Synchronous (rejects the `community.export()` promise before any record is created):** + - Community is not a `LocalCommunity` on this daemon (e.g. it's a read-only `RemoteCommunity`). + - `includePrivateKey: true` but RPC server policy disallows it. *(Planned — depends on RPC server.)* + - `exportPath` provided but caller is using an RPC client. *(Planned — depends on RPC client.)* + - `exportPath` resolves to the live community database file (would clobber it). + - `options.signal` is already aborted (rejects with `signal.reason`). + +- **Asynchronous (creates a record with `error` set and fires `exportschange`):** + - Disk full while writing the sqlite backup. + - Source DB read errors mid-backup. + - Any other failure after the work was enqueued. + +### Cancellation + +Cancellation is exclusively via the `signal` option: + +```ts +const ac = new AbortController(); +const { exportId } = await community.export({ signal: ac.signal }); +// ...later, anywhere we still hold `ac`... +ac.abort(); +``` + +Behavior: +- Aborting while in progress stops the sqlite backup, unlinks the `.partial` file, sets `error.code = "ERR_EXPORT_CANCELLED"`, fires `exportschange`. +- Aborting after the export is already complete is a no-op (idempotent). + +The AbortSignal lifetime intentionally outlives the `community.export()` promise: the promise resolves once the export is enqueued (returning `{ exportId }`), but the signal stays observed by the underlying backup task until terminal state. Same pattern as `node:fs.watch(path, { signal })` and `fetch(url, { signal })` for streaming responses. + +There is no public `cancelExport(exportId)` method. *(Planned)* the RPC client will use a wire-level `cancelExport({ exportId })` to translate `signal.abort()` into a server-side cancel. + +### Identifiers and URLs + +- `exportId` (UUIDv4) is the canonical identifier returned to the caller and used in URLs, filenames, log lines, and `_activeExports` map keys. +- On-disk filename: `.sqlite` under `/exports/`. Caller-supplied `exportPath` (embedded only) overrides this; the embedded path validates that `exportPath` does not resolve to the live community DB and rejects with `ERR_EXPORT_PATH_TARGETS_LIVE_DB` if it does. +- Embedded mode (shipped) emits an absolute `file://` URL directly. +- RPC download URL path *(planned)*: `/exports/`. The wire-format `record.url` will be a **relative URL** (`/exports/`); the RPC client will absolutize via `new URL(wireUrl, rpcHttpOrigin).href` before exposing to the consumer, where `rpcHttpOrigin` is the WebSocket URL with `ws[s]://` swapped to `http[s]://` and any `authKey` path stripped. +- Consumer code branches on `new URL(record.url).protocol === "file:"` and uses `fileURLToPath()` for `fs.*` calls; otherwise `fetch()` (once the HTTP endpoint ships). + +### Integration with `pkc.destroy()` + +Each `LocalCommunity` keeps a private `_activeExports: Map` keyed by `exportId`. Each enqueued export adds an entry; terminal transitions remove it. `pkc.destroy()` walks every community's `_activeExports` and cancels everything in flight before the existing teardown. + +On the RPC server side, `PKCWsServer.close()` will likewise cancel per-connection subscriptions. *(Planned — depends on RPC server.)* + +## RPC Wire Protocol (Planned — deferred from this PR) + +> The wire methods, subscription channel, and HTTP download endpoint are **not implemented** in PR #100. They are described here as the agreed design for the follow-up PR on issue #79. Embedded callers should ignore this section. + +Three new RPC methods. `AbortSignal` is a client-side concept and never crosses the wire — when an RPC-side `community.export({ signal })` caller's signal aborts, the RPC client routes that to a `cancelExport` call. + +### `exportCommunity({ name?, publicKey?, includePrivateKey? })` → `{ exportId: string }` + +- At least one of `name` or `publicKey` must be provided. +- Server resolves the community address using the existing `_findCommunityAddress({ name, publicKey })`. +- Validates community is a `LocalCommunity` (error `ERR_COMMUNITY_NOT_LOCAL` otherwise). +- If `includePrivateKey === true`, checks policy; on denial returns `ERR_PRIVATE_KEY_EXPORT_NOT_ALLOWED`. +- On success: generates `exportId`, appends a `progress: 0` record to that community's `exports`, kicks off the backup task, returns `{ exportId }` synchronously. + +### `exportsSubscribe({ name?, publicKey? })` → subscription + +- Per-community subscription. +- Initial notification carries the **current** `community.exports` array. +- Subsequent notifications fire whenever any record on the community changes. +- Notification shape: `event: "exportschange"`, `result: CommunityExportRecord[]`. +- The wire-format `record.url` is a **relative URL** (`/exports/`); the RPC client absolutizes. +- Tearing down the subscription does **not** cancel any in-flight exports. + +### `cancelExport({ exportId })` → `void` + +- Idempotent: unknown `exportId` returns success without action. +- Server stops the backup if running, unlinks the `.partial`, sets `error.code = ERR_EXPORT_CANCELLED`, fires `exportschange`. + +### Disconnect behavior + +On client disconnect, the server **does not** automatically cancel that client's exports. Records stay alive in `community.exports`; another client that subscribes will see them. The export feature exists partly to survive disconnects. + +> **Future-work hook**: if `exportschange` notifications carrying full lists become bandwidth-heavy with many concurrent in-progress exports, we can split progress into a separate `exportProgressNotification` channel that carries `{ exportId, progress }` deltas without re-emitting the whole list. Backwards-compatible: the full-list `exportschange` would still fire on every terminal transition. + +## Server-Side Backup (also used by embedded mode) + +Helper in `src/runtime/node/util.ts`: + +```ts +backupCommunityDb({ sourcePath, destPath, includePrivateKey, onProgress, signal }): + Promise<{ size: number; sha256: string }> +``` + +`destPath` is whatever the caller decides: +- RPC server → always `/exports/`. +- Embedded mode → caller-supplied `exportPath`, or `/exports/` default. + +Steps: +1. `mkdir -p path.dirname(destPath)`. +2. Open source DB with `better-sqlite3` (read-only; `.backup()` is safe under WAL — already used by the deletion path). +3. Call `sourceDb.backup(destPath + ".partial", { progress: onProgress })` in chunks; check `signal.aborted` between chunks and throw `BackupAbortError` to cancel. +4. On completion, open the `.partial` copy, scrub private key if requested, close; recheck `signal.aborted` afterwards. +5. Compute sha256 of `.partial`; recheck `signal.aborted` afterwards. +6. `fs.rename(destPath + ".partial", destPath)` — atomic commit. +7. On any error (including abort), `unlink(destPath + ".partial")` and rethrow. + +The hash is computed on the `.partial` file before the rename. Since the rename is a metadata-only operation on the same filesystem and is the very last step, the bytes hashed are the bytes that end up at `destPath`. + +### Private-key scrubbing (`includePrivateKey: false`) + +The signer lives inside the `internalCommunity` KeyV record. With the backup DB open as a second `better-sqlite3` connection, read that record, set `signer.privateKey = undefined` (and `signer.ipfsKey` if present), write it back, close. + +**Scope**: only the community signer's keys are scrubbed. The `pseudonymityAliases` table's `aliasPrivateKey` column is intentionally **not** scrubbed — those keys are part of the per-comment alias identity and belong to the publication record, not the community's own signing material. + +## Server-Side File Location & Naming + +- Directory: `/exports/` (sibling of `/communities/`). +- On-disk filename: `.sqlite`. Embedded callers can override the entire path with `exportPath`. +- Persistence: each community persists its `exports` array in its internal KeyV record so records survive process restart. On community load, the server prunes any records whose backing files no longer exist on disk. + +### Retention + +- **Embedded mode (shipped)**: no auto-deletion. The record stays in `community.exports` indefinitely. The user manages cleanup; deleting the file out-of-band causes the record to be pruned from `community.exports` on next community load (via `loadAndPruneExportsFromKeyv`). +- **After successful HTTP download** *(Planned)*: the RPC server deletes the export file once it finishes streaming the HTTP response, removes the record from `community.exports`, and fires `exportschange`. +- **Never-downloaded exports** *(Planned)*: on RPC server startup, delete any files in `/exports/` older than 24 hours and prune the matching records. + +## HTTP Download Endpoint (Planned — deferred from this PR) + +- Attached to the **same port as the WebSocket RPC**, via the `server` option already accepted by `RpcWebsocketsServer`. Construct a plain `http.Server`, register a `request` listener for `GET /exports/`, then pass the server to `RpcWebsocketsServer` so the `upgrade` event routes WS traffic correctly. +- Returns `200` with `Content-Length` and `Content-Type: application/vnd.sqlite3`, streams file. +- After streaming completes successfully, server deletes the file, removes the record from `community.exports`, and fires `exportschange`. +- Unknown `exportId`: `404`. +- No range/resume in v1. + +### Capability via `exportId` + +The `exportId` is unguessable (UUIDv4 = 122 bits of entropy), so it doubles as the HTTP capability. No separate token map is needed; the URL path is just the `exportId` and the file on disk is `.sqlite`. Records persist with the community's KeyV state, so capabilities survive RPC server restart automatically. + +## Private Key Policy (RPC Server) (Planned — deferred from this PR) + +- Config flag on `pkcOptions.rpcServer`: `allowPrivateKeyExport` (default `true` — matches private-RPC scope). +- Public-RPC operators can set `allowPrivateKeyExport: false`; server rejects any `includePrivateKey: true` request with `ERR_PRIVATE_KEY_EXPORT_NOT_ALLOWED`. +- Embedded pkc-js (no RPC server) always honors `includePrivateKey` (shipped). + +## Concurrency + +- Per-community serialization. Each `LocalCommunity` instance keeps a private `_exportQueue: Promise` chained per call. A second `community.export()` call for the same community immediately appends a record (`progress: 0`) and waits for the prior export to finish before stepping its own backup. The same in-process serialization will back the RPC server when that lands. +- Different communities export in parallel. + +## File Format + +Raw sqlite file (no wrapping). + +- `better-sqlite3.backup()` already produces exactly a `.sqlite` file. +- Inspectable directly with the `sqlite3` CLI. +- Metadata (community address, signer if included) is already in the DB itself. +- Importing later = open the `.sqlite`, copy rows into `/communities/
`. + +## Error Codes (in `src/errors.ts`) + +Sync errors (thrown from `community.export()`): +- `ERR_COMMUNITY_NOT_LOCAL` — community doesn't correspond to a LocalCommunity on this daemon (shipped). +- `ERR_EXPORT_PATH_TARGETS_LIVE_DB` — caller-supplied `exportPath` resolves to the live community DB; refusing to overwrite it (shipped). +- `ERR_PRIVATE_KEY_EXPORT_NOT_ALLOWED` — server refused due to policy *(planned)*. +- `ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC` — caller used an RPC client and passed `exportPath` *(planned)*. + +Async errors (recorded in `record.error.code`): +- `ERR_EXPORT_CANCELLED` — recorded when an `AbortSignal` aborts an in-progress record (or when `pkc.destroy()` cancels in-flight exports). The signal is honored throughout the backup pipeline, including the post-backup scrub, hash, and rename steps. +- `ERR_EXPORT_BACKUP_FAILED` — generic catch-all for `better-sqlite3.backup()` failures (disk full, source DB corruption, etc.); inspect `error.message` for specifics. + +HTTP-only *(planned)*: +- `ERR_DOWNLOAD_EXPORT_ID_NOT_FOUND` — `GET /exports/` hit but no record/file exists (404 to the caller; logged server-side with this code). + +## Out of Scope (future work) + +- `pkc.importCommunity()` — companion import. +- `pkc.deleteExport()` / `renameExport()` — explicit management APIs. +- Range/resume support on HTTP download. +- Splitting progress into a dedicated `exportProgressNotification` channel. +- `tar.gz` + manifest format version. +- Public/multi-tenant RPC host considerations (per-user quotas, auditing, per-user `community.exports` filtering). +- `bitsocial-cli` command (separate plan for the bitsocial-cli repo). diff --git a/src/rpc/src/index.ts b/src/rpc/src/index.ts index fd4eba8fe..f57277822 100644 --- a/src/rpc/src/index.ts +++ b/src/rpc/src/index.ts @@ -1,6 +1,9 @@ import pLimit from "p-limit"; import { Server as RpcWebsocketsServer } from "rpc-websockets"; -import { mkdirSync } from "fs"; +import { createReadStream, existsSync, mkdirSync, promises as fsPromises, statSync } from "fs"; +import http, { type Server as HTTPServer, type ServerResponse } from "http"; +import type { Server as HTTPSServer } from "https"; +import { fileURLToPath } from "node:url"; import path from "path"; import Database, { type Database as BetterSqlite3Database } from "better-sqlite3"; import PKCJs, { setPKCJs } from "./lib/pkc-js/index.js"; @@ -81,7 +84,9 @@ import { parseRpcCommunityPageParam, parseRpcEditCommunityParam, parseRpcPublishChallengeAnswersParam, - parseRpcUnsubscribeParam + parseRpcUnsubscribeParam, + parseRpcExportCommunityParam, + parseRpcCancelExportParam } from "../../clients/rpc-client/rpc-schema-util.js"; import type { CommunityIdentifierRpcParam, @@ -91,8 +96,10 @@ import type { RpcResolveAuthorNameResult, RpcCommentPageResult, RpcCommunityPageResult, - RpcLocalCommunityUpdateResultType + RpcLocalCommunityUpdateResultType, + RpcExportCommunityResult } from "../../clients/rpc-client/types.js"; +import type { CommunityExportRecord } from "../../community/types.js"; import { findStartedCommunity } from "../../pkc/tracked-instance-registry-util.js"; // store started communities to be able to stop them @@ -125,19 +132,62 @@ class PKCWsServer extends TypedEmitter { private _autoStartOnBoot: boolean = false; private _autoStartConcurrency: number; private _rpcStateDb: BetterSqlite3Database | undefined; - - constructor({ port, server, pkc, authKey, startStartedCommunitiesOnStartup, autoStartConcurrency }: PKCWsServerClassOptions) { + private _allowPrivateKeyExport: boolean; + // Per-address cache of LocalCommunity instances loaded for export purposes when the + // community isn't currently in _startedCommunities. Keeps `_exports` and the + // `exportschange` event consistent across exportCommunity / exportsSubscribe / cancelExport + // calls for the same address. + private _exportCommunityInstances: Map = new Map(); + // http.Server backing the WS — populated when caller passes neither `server` nor relies on + // rpc-websockets to bind its own. We attach the GET /exports/ route to whichever + // http server underlies the WS. + private _httpServer: HTTPServer | HTTPSServer | undefined; + private _ownsHttpServer: boolean = false; + + constructor({ + port, + server, + pkc, + authKey, + startStartedCommunitiesOnStartup, + autoStartConcurrency, + allowPrivateKeyExport + }: PKCWsServerClassOptions) { super(); const log = Logger("pkc-js:PKCWsServer"); this.authKey = authKey; this._autoStartOnBoot = startStartedCommunitiesOnStartup ?? true; // Clamp to at least 1 since pLimit(0) throws. 0 or 1 means sequential (no parallelism) this._autoStartConcurrency = Math.max(1, autoStartConcurrency ?? 5); + // Default true — matches private-RPC scope. Public-RPC operators set false. + this._allowPrivateKeyExport = allowPrivateKeyExport ?? true; // don't instantiate pkc in constructor because it's an async function this._initPKC(pkc); + + // Always operate on an explicit http.Server so we can attach the /exports/ + // route alongside the WS upgrade handler. When the caller passed neither `server` nor + // delegated port-binding to rpc-websockets, we create and own one. + if (server) { + this._httpServer = server; + } else { + this._httpServer = http.createServer(); + this._ownsHttpServer = true; + if (typeof port === "number") this._httpServer.listen(port); + } + this._httpServer.on("request", (req, res) => { + // rpc-websockets does not register request listeners — any HTTP request reaching + // this http.Server is ours to handle. Currently the only route is GET /exports/. + this._handleExportsHttpRequest(req, res).catch((e) => { + log.error("Unhandled error in /exports HTTP handler", e); + if (!res.headersSent) { + res.statusCode = 500; + res.end(); + } + }); + }); + this.rpcWebsockets = new RpcWebsocketsServer({ - port, - server, + server: this._httpServer, verifyClient: ({ req }, callback) => { // block non-localhost requests without auth key for security @@ -232,6 +282,11 @@ class PKCWsServer extends TypedEmitter { this.rpcWebsocketsRegister("publishChallengeAnswers", this.publishChallengeAnswers.bind(this)); this.rpcWebsocketsRegister("unsubscribe", this.unsubscribe.bind(this)); + // community.export() — see EXPORT_COMMUNITY_SPEC.md + this.rpcWebsocketsRegister("exportCommunity", this.exportCommunity.bind(this)); + this.rpcWebsocketsRegister("exportsSubscribe", this.exportsSubscribe.bind(this)); + this.rpcWebsocketsRegister("cancelExport", this.cancelExport.bind(this)); + hideClassPrivateProps(this); } @@ -1520,6 +1575,208 @@ class PKCWsServer extends TypedEmitter { return pkc.resolveAuthorName(parsedArgs); } + // community.export() — see src/rpc/EXPORT_COMMUNITY_SPEC.md + // Resolves to the canonical LocalCommunity instance for the given identifier so that + // exportCommunity / exportsSubscribe / cancelExport all share the same `_exports` array + // and `exportschange` event source. Cache-first ordering matters: once the first + // export-related call lands on an instance, every subsequent one must reuse it. Otherwise + // a subscription attached before community.start() would listen on a different + // LocalCommunity than the started instance picked up by a later exportCommunity call + // (pkc.createCommunity at src/pkc/pkc.ts:749 constructs a new LocalCommunity per call), + // and the export's `exportschange` would never reach the listener. + private async _resolveLocalCommunityForExport(parsedArgs: { name?: string; publicKey?: string }): Promise { + const address = this._findCommunityAddress(parsedArgs); + if (!address) throw new PKCError("ERR_COMMUNITY_NOT_FOUND", { name: parsedArgs.name, publicKey: parsedArgs.publicKey }); + + const cached = this._exportCommunityInstances.get(address); + if (cached) return cached; + + // No cache yet — prefer the started instance if one exists, otherwise load fresh. + const started = findStartedCommunity(this.pkc, parsedArgs); + if (started instanceof LocalCommunity) { + this._exportCommunityInstances.set(address, started); + return started; + } + + const community = await this.pkc.createCommunity({ address }); + if (!(community instanceof LocalCommunity)) throw new PKCError("ERR_COMMUNITY_NOT_LOCAL", { address }); + this._exportCommunityInstances.set(address, community); + return community; + } + + // Server-side records carry `file://` URLs (the embedded path). Over the wire we emit a + // relative `/exports/` URL; the RPC client absolutizes against rpcHttpOrigin. + private _toWireExportRecord(rec: CommunityExportRecord): CommunityExportRecord { + if (rec.progress === 1 && rec.url) return { ...rec, url: `/exports/${rec.exportId}` }; + return rec; + } + + async exportCommunity(params: any, connectionId: string): Promise { + const parsedArgs = parseRpcExportCommunityParam(params[0]); + if (parsedArgs.includePrivateKey === true && !this._allowPrivateKeyExport) + throw new PKCError("ERR_PRIVATE_KEY_EXPORT_NOT_ALLOWED", {}); + const community = await this._resolveLocalCommunityForExport(parsedArgs); + return community.export({ includePrivateKey: parsedArgs.includePrivateKey }); + } + + async exportsSubscribe(params: any, connectionId: string): Promise { + const parsedArgs = parseRpcCommunityIdentifierParam(params[0]); + const subscriptionId = generateSubscriptionId(); + const community = await this._resolveLocalCommunityForExport(parsedArgs); + + const sendEvent = (event: string, result: any) => + this.jsonRpcSendNotification({ + method: "exportschange", + subscription: subscriptionId, + event, + result, + connectionId + }); + + const exportschangeListener = (records: CommunityExportRecord[]) => + sendEvent("exportschange", { records: records.map((r) => this._toWireExportRecord(r)) }); + community.on("exportschange", exportschangeListener); + + this.subscriptionCleanups[connectionId][subscriptionId] = async () => { + community.removeListener("exportschange", exportschangeListener); + }; + + // Initial notification carries the current exports array. Per spec, tearing down this + // subscription does NOT cancel any in-flight exports — exports outlive client connections. + sendEvent("exportschange", { records: community.exports.map((r) => this._toWireExportRecord(r)) }); + + return { subscriptionId }; + } + + async cancelExport(params: any, connectionId: string): Promise { + const { exportId } = parseRpcCancelExportParam(params[0]); + for (const community of this._exportLoadedCommunities()) + if (community._activeExports.has(exportId)) { + await community._cancelExport(exportId); + break; + } + // Idempotent: unknown exportId returns success without action (per spec). + return { success: true }; + } + + private *_exportLoadedCommunities(): IterableIterator { + for (const value of Object.values(this._startedCommunities)) if (value instanceof LocalCommunity) yield value; + for (const value of this._exportCommunityInstances.values()) yield value; + } + + private _findCommunityOwningExport(exportId: string): { community: LocalCommunity; record: CommunityExportRecord } | undefined { + for (const community of this._exportLoadedCommunities()) { + const record = community._exports.find((r) => r.exportId === exportId); + if (record) return { community, record }; + } + return undefined; + } + + // GET /exports/ — streams the sqlite backup over HTTP on the same port as the WS. + // After a successful response, deletes the file and prunes the record (per spec). + // Deletes export files older than 24h. Called once on server startup (per spec). + // Records pointing to deleted files get pruned the next time the community loads via + // loadAndPruneExportsFromKeyv. We do not eagerly load every community here to avoid the + // boot cost; the embedded loader handles cleanup lazily. + async _sweepOldExportFiles(): Promise { + const MAX_AGE_MS = 24 * 60 * 60 * 1000; + if (!this.pkc.dataPath) return; + const exportsDir = path.join(this.pkc.dataPath, "exports"); + if (!existsSync(exportsDir)) return; + + const now = Date.now(); + let entries: string[] = []; + try { + entries = await fsPromises.readdir(exportsDir); + } catch { + return; + } + for (const entry of entries) { + if (!entry.endsWith(".sqlite")) continue; + const filePath = path.join(exportsDir, entry); + try { + const stat = await fsPromises.stat(filePath); + if (now - stat.mtimeMs > MAX_AGE_MS) await fsPromises.unlink(filePath); + } catch { + // file may have been removed concurrently — ignore + } + } + } + + private async _handleExportsHttpRequest(req: IncomingMessage, res: ServerResponse): Promise { + const httpLog = Logger("pkc-js:PKCWsServer:exports-http"); + const requestUrl = new URL(req.url ?? "/", "http://localhost"); + const match = requestUrl.pathname.match(/^\/exports\/([0-9a-fA-F-]{36})$/); + if (!match) { + res.statusCode = 404; + res.end(); + return; + } + if (req.method !== "GET") { + res.statusCode = 405; + res.setHeader("Allow", "GET"); + res.end(); + return; + } + const exportId = match[1]; + const owner = this._findCommunityOwningExport(exportId); + if (!owner) { + httpLog("GET /exports — unknown exportId", exportId); + res.statusCode = 404; + res.end(); + return; + } + const { community, record } = owner; + if (record.progress !== 1 || !record.url) { + res.statusCode = 404; + res.end(); + return; + } + + let filePath: string; + try { + const parsed = new URL(record.url); + if (parsed.protocol !== "file:") throw new Error("Record url is not file://"); + filePath = fileURLToPath(parsed); + } catch { + res.statusCode = 404; + res.end(); + return; + } + let size: number; + try { + size = statSync(filePath).size; + } catch { + res.statusCode = 404; + res.end(); + return; + } + + res.statusCode = 200; + res.setHeader("Content-Type", "application/vnd.sqlite3"); + res.setHeader("Content-Length", String(size)); + + const stream = createReadStream(filePath); + let streamEnded = false; + let streamErrored = false; + stream.on("end", () => { + streamEnded = true; + }); + stream.on("error", (err) => { + streamErrored = true; + httpLog.error("Error streaming export file", filePath, err); + if (!res.headersSent) res.statusCode = 500; + res.end(); + }); + res.on("close", () => { + // Cleanup only on fully streamed + flushed responses. Client aborts and stream + // errors leave the record + file intact so the consumer can retry. + if (streamErrored || !streamEnded || !res.writableEnded) return; + community._deleteExport(exportId).catch((e) => httpLog.error("Failed to cleanup downloaded export", exportId, e)); + }); + stream.pipe(res); + } + async unsubscribe(params: any, connectionId: string): Promise { const { subscriptionId } = parseRpcUnsubscribeParam(params[0]); @@ -1538,11 +1795,13 @@ class PKCWsServer extends TypedEmitter { await this.unsubscribe([{ subscriptionId: Number(subscriptionId) }], connectionId); this.ws.close(); + if (this._ownsHttpServer && this._httpServer) await new Promise((r) => this._httpServer!.close(() => r())); const pkc = await this._getPKCInstance(); await pkc.destroy(); // this will stop all started communities for (const communityAddress of remeda.keys.strict(this._startedCommunities)) { delete this._startedCommunities[communityAddress]; } + this._exportCommunityInstances.clear(); this._rpcStateDb?.close(); this._rpcStateDb = undefined; this._onSettingsChange = {}; @@ -1559,7 +1818,8 @@ const createPKCWsServer = async (options: CreatePKCWsServerOptions) => { server: parsedOptions.server, authKey: parsedOptions.authKey, startStartedCommunitiesOnStartup: parsedOptions.startStartedCommunitiesOnStartup, - autoStartConcurrency: parsedOptions.autoStartConcurrency + autoStartConcurrency: parsedOptions.autoStartConcurrency, + allowPrivateKeyExport: parsedOptions.allowPrivateKeyExport }); // Auto-start previously started communities (fire-and-forget, non-blocking) @@ -1567,6 +1827,11 @@ const createPKCWsServer = async (options: CreatePKCWsServerOptions) => { log.error("Failed to auto-start previous communities", e); }); + // Delete export files older than 24h (fire-and-forget) — see EXPORT_COMMUNITY_SPEC.md + pkcWss._sweepOldExportFiles().catch((e) => { + log.error("Failed to sweep old export files", e); + }); + return pkcWss; }; diff --git a/src/rpc/src/schema.ts b/src/rpc/src/schema.ts index 1a70ac3f9..9f60258f8 100644 --- a/src/rpc/src/schema.ts +++ b/src/rpc/src/schema.ts @@ -20,7 +20,10 @@ export const CreatePKCWsServerOptionsSchema = z startStartedCommunitiesOnStartup: z.boolean().optional(), // Controls how many communities are auto-started in parallel on boot. // 0 or 1 disables parallelism (sequential start). Default: 5 - autoStartConcurrency: z.number().int().nonnegative().optional() + autoStartConcurrency: z.number().int().nonnegative().optional(), + // community.export() policy: when false, the server rejects exportCommunity calls with + // includePrivateKey: true. Default true (private-RPC scope). Public-RPC operators set false. + allowPrivateKeyExport: z.boolean().optional() }) .merge(WsServerClassOptions) .loose(); diff --git a/src/runtime/node/community/local-community.ts b/src/runtime/node/community/local-community.ts index bfbff63d6..28e37de18 100644 --- a/src/runtime/node/community/local-community.ts +++ b/src/runtime/node/community/local-community.ts @@ -72,6 +72,15 @@ import { } from "./local-community/ipns-publishing.js"; import { deleteCommunity, start as lifecycleStart, stop as lifecycleStop, update as lifecycleUpdate } from "./local-community/lifecycle.js"; import { edit as editCommunity } from "./local-community/editing.js"; +import { + cancelExportEmbedded, + cloneExportRecord, + deleteExportRecord, + exportCommunityEmbedded, + loadAndPruneExportsFromKeyv +} from "./local-community/export.js"; +import type { InternalExportHandle } from "./local-community/export.js"; +import type { CommunityExportRecord, ExportCommunityUserOptions } from "../../../community/types.js"; // This is a sub we have locally in our pkc datapath, in a NodeJS environment export class LocalCommunity extends RpcLocalCommunity implements CreateNewLocalCommunityParsedOptions { @@ -122,6 +131,13 @@ export class LocalCommunity extends RpcLocalCommunity implements CreateNewLocalC _blocksToRm: string[] = []; _postsAllPageCids: AllPageCids | undefined = undefined; + // Community export (issue #79). _exports is the public list surfaced through + // `community.exports`; _activeExports tracks in-flight backups so pkc.destroy() can cancel + // them; _exportQueue serializes back-to-back exports on the same community per spec. + _exports: CommunityExportRecord[] = []; + _activeExports: Map = new Map(); + _exportQueue: Promise = Promise.resolve(); + constructor(pkc: PKC) { super(pkc); this.handleChallengeExchange = this.handleChallengeExchange.bind(this); @@ -371,6 +387,26 @@ export class LocalCommunity extends RpcLocalCommunity implements CreateNewLocalC return (await editCommunity(this, newCommunityOptions)) as typeof this; } + override get exports(): CommunityExportRecord[] { + return this._exports.map(cloneExportRecord); + } + + override async export(options?: ExportCommunityUserOptions): Promise<{ exportId: string }> { + return exportCommunityEmbedded(this, options); + } + + async _cancelExport(exportId: string): Promise { + return cancelExportEmbedded(this, exportId); + } + + async _deleteExport(exportId: string): Promise { + return deleteExportRecord(this, exportId); + } + + async _loadExportsFromKeyv(): Promise { + return loadAndPruneExportsFromKeyv(this); + } + // The three helpers below stay as methods (in addition to being free functions in their // respective modules) because integration tests in test/node/community/ monkey-patch // community._xxx = async () => { throw ... } to inject failures into the start/publish diff --git a/src/runtime/node/community/local-community/db-state.ts b/src/runtime/node/community/local-community/db-state.ts index d12bab179..22fbafbd9 100644 --- a/src/runtime/node/community/local-community/db-state.ts +++ b/src/runtime/node/community/local-community/db-state.ts @@ -148,6 +148,18 @@ export async function updateInstancePropsWithStartedCommunityOrDb(community: Loc await community.initInternalCommunityAfterFirstUpdateNoMerge(startedCommunity.toJSONInternalAfterFirstUpdate()); else await community.initInternalCommunityBeforeFirstUpdateNoMerge(startedCommunity.toJSONInternalBeforeFirstUpdate()); community.started = true; + // Always read exports from the keyv-backed DB rather than snapshotting from the started + // instance: keyv is the canonical persisted source and is invariant across a stale or + // cross-PKC `processStartedCommunities` match. + await community.initDbHandlerIfNeeded(); + try { + // Open the DB read-write — opening readonly silently fails to see WAL-mode writes + // committed by a still-open writer connection on the same file. + await community._dbHandler.initDbIfNeeded(); + await community._loadExportsFromKeyv(); + } finally { + community._dbHandler.destoryConnection(); + } } else { await community.initDbHandlerIfNeeded(); try { @@ -167,6 +179,7 @@ export async function updateInstancePropsWithStartedCommunityOrDb(community: Loc if (!community.signer) throw new PKCError("ERR_LOCAL_COMMUNITY_HAS_NO_SIGNER_IN_INTERNAL_STATE", { address: community.address }); + await community._loadExportsFromKeyv(); // Load community.exports from DB await community._updateStartedValue(); log("Loaded local community", community.address, "from db"); } catch (e) { diff --git a/src/runtime/node/community/local-community/export.ts b/src/runtime/node/community/local-community/export.ts new file mode 100644 index 000000000..f35604eef --- /dev/null +++ b/src/runtime/node/community/local-community/export.ts @@ -0,0 +1,260 @@ +import Logger from "../../../../logger.js"; +import path from "node:path"; +import { promises as fsPromises, existsSync } from "node:fs"; +import { pathToFileURL, fileURLToPath } from "node:url"; +import { v4 as uuidV4 } from "uuid"; +import { STORAGE_KEYS } from "../../../../constants.js"; +import { PKCError } from "../../../../pkc-error.js"; +import { backupCommunityDb, BackupAbortError } from "../../util.js"; +import type { LocalCommunity } from "../local-community.js"; +import type { CommunityExportRecord, ExportCommunityUserOptions } from "../../../../community/types.js"; + +// Internal handle for an in-flight export, held in LocalCommunity._activeExports. +// `controller` is internal — user-supplied signals are wired to abort it via the listener stored +// here so we can detach the listener on terminal transitions and avoid leaks. +export interface InternalExportHandle { + exportId: string; + controller: AbortController; + detachUserSignal?: () => void; + donePromise: Promise; +} + +const EXPORTS_KEY = STORAGE_KEYS[STORAGE_KEYS.EXPORTS]; + +function defaultExportPathFor(community: LocalCommunity, exportId: string): string { + if (typeof community._pkc.dataPath !== "string") + throw new PKCError("ERR_DATA_PATH_IS_NOT_DEFINED", { communityAddress: community.address }); + return path.join(community._pkc.dataPath, "exports", `${exportId}.sqlite`); +} + +function sourceDbPathFor(community: LocalCommunity): string { + if (typeof community._pkc.dataPath !== "string") + throw new PKCError("ERR_DATA_PATH_IS_NOT_DEFINED", { communityAddress: community.address }); + return path.join(community._pkc.dataPath, "communities", community.address); +} + +async function persistExports(community: LocalCommunity): Promise { + if (!community._dbHandler) return; + try { + await community._dbHandler.keyvSet(EXPORTS_KEY, community._exports); + } catch (e) { + Logger("pkc-js:local-community:export").error("Failed to persist exports to keyv", e); + } +} + +export function cloneExportRecord(record: CommunityExportRecord): CommunityExportRecord { + return { + ...record, + ...(record.error ? { error: { ...record.error } } : {}) + }; +} + +function snapshotExports(community: LocalCommunity): CommunityExportRecord[] { + return community._exports.map(cloneExportRecord); +} + +function emitExportsChange(community: LocalCommunity): void { + community.emit("exportschange", snapshotExports(community)); +} + +async function updateRecord(community: LocalCommunity, exportId: string, patch: Partial): Promise { + const idx = community._exports.findIndex((r) => r.exportId === exportId); + if (idx === -1) return; + community._exports[idx] = { ...community._exports[idx], ...patch }; + await persistExports(community); + emitExportsChange(community); +} + +async function runExportTask( + community: LocalCommunity, + exportId: string, + opts: { destPath: string; includePrivateKey: boolean; signal: AbortSignal } +) { + const log = Logger("pkc-js:local-community:export:run"); + const sourcePath = sourceDbPathFor(community); + + try { + if (opts.signal.aborted) throw new BackupAbortError(); + + let lastEmittedProgress = 0; + const { size, sha256 } = await backupCommunityDb({ + sourcePath, + destPath: opts.destPath, + includePrivateKey: opts.includePrivateKey, + signal: opts.signal, + onProgress: (progress) => { + // Throttle emissions so we don't churn keyv/listeners on every step + if (progress - lastEmittedProgress < 0.05 && progress < 0.99) return; + lastEmittedProgress = progress; + void updateRecord(community, exportId, { progress }); + } + }); + + await updateRecord(community, exportId, { + progress: 1, + size, + sha256, + url: pathToFileURL(opts.destPath).href + }); + log.trace("Export complete for community", community.address, "exportId", exportId); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + if (err instanceof BackupAbortError || opts.signal.aborted) { + await updateRecord(community, exportId, { + error: { code: "ERR_EXPORT_CANCELLED", message: message || "Export was cancelled" } + }); + } else { + log.error("Backup failed for community", community.address, "exportId", exportId, err); + await updateRecord(community, exportId, { + error: { code: "ERR_EXPORT_BACKUP_FAILED", message } + }); + } + } finally { + community._activeExports.delete(exportId); + } +} + +export async function exportCommunityEmbedded( + community: LocalCommunity, + options: ExportCommunityUserOptions = {} +): Promise<{ exportId: string }> { + // Sync validation + if (options.signal?.aborted) { + const reason = (options.signal as AbortSignal).reason; + throw reason ?? new DOMException("The operation was aborted.", "AbortError"); + } + + const exportId = uuidV4(); + const destPath = options.exportPath ? path.resolve(options.exportPath) : defaultExportPathFor(community, exportId); + const includePrivateKey = options.includePrivateKey === true; + + if (path.resolve(destPath) === path.resolve(sourceDbPathFor(community))) + throw new PKCError("ERR_EXPORT_PATH_TARGETS_LIVE_DB", { communityAddress: community.address, exportPath: destPath }); + + if (!community.publicKey) throw new PKCError("ERR_LOCAL_COMMUNITY_HAS_NO_SIGNER_IN_INTERNAL_STATE", { address: community.address }); + + const record: CommunityExportRecord = { + exportId, + ...(community.name ? { name: community.name } : {}), + publicKey: community.publicKey, + includePrivateKey, + progress: 0 + }; + community._exports.push(record); + await persistExports(community); + emitExportsChange(community); + + const controller = new AbortController(); + let detachUserSignal: (() => void) | undefined; + if (options.signal) { + const userSignal = options.signal; + const onAbort = () => controller.abort(userSignal.reason); + userSignal.addEventListener("abort", onAbort, { once: true }); + detachUserSignal = () => userSignal.removeEventListener("abort", onAbort); + } + + // Serialize per-community: each export waits for the prior queue entry before running. + // Using a chained Promise guarantees ordering even under high concurrency. + const previousQueue = community._exportQueue; + let resolveDone!: () => void; + const donePromise = new Promise((res) => { + resolveDone = res; + }); + community._activeExports.set(exportId, { exportId, controller, detachUserSignal, donePromise }); + + community._exportQueue = previousQueue + .then(async () => { + // Skip the actual work if the record was already cancelled while queued + const current = community._exports.find((r) => r.exportId === exportId); + if (!current || current.error) return; + if (controller.signal.aborted) { + await updateRecord(community, exportId, { + error: { code: "ERR_EXPORT_CANCELLED", message: "Export was cancelled before it could start" } + }); + return; + } + await runExportTask(community, exportId, { destPath, includePrivateKey, signal: controller.signal }); + }) + .catch(() => { + // runExportTask already records errors on the record; swallow so the queue stays alive + }) + .finally(() => { + detachUserSignal?.(); + resolveDone(); + }); + + return { exportId }; +} + +export async function cancelExportEmbedded(community: LocalCommunity, exportId: string): Promise { + const handle = community._activeExports.get(exportId); + if (!handle) return; + handle.controller.abort(); + try { + await handle.donePromise; + } catch { + // already recorded + } +} + +// Removes an export record from `community._exports`, deletes its backing file (if it lives +// under a file:// URL), persists to KeyV, and emits `exportschange`. Used by the RPC HTTP +// download endpoint to clean up after a successful download. +export async function deleteExportRecord(community: LocalCommunity, exportId: string): Promise { + const idx = community._exports.findIndex((r) => r.exportId === exportId); + if (idx === -1) return; + const record = community._exports[idx]; + + if (record.url) { + try { + const parsed = new URL(record.url); + if (parsed.protocol === "file:") await fsPromises.unlink(fileURLToPath(parsed)).catch(() => {}); + } catch { + // Malformed URL — ignore; we still drop the record. + } + } + + community._exports.splice(idx, 1); + await persistExports(community); + emitExportsChange(community); +} + +// Called on community load to: +// (a) hydrate community._exports from KeyV, and +// (b) prune records whose backing file no longer exists on disk (e.g. user deleted it). +export async function loadAndPruneExportsFromKeyv(community: LocalCommunity): Promise { + if (!community._dbHandler) return; + let stored: CommunityExportRecord[] | undefined; + try { + stored = community._dbHandler.keyvHas(EXPORTS_KEY) ? community._dbHandler.keyvGet(EXPORTS_KEY) ?? [] : []; + } catch (e) { + Logger("pkc-js:local-community:export").error("Failed to load exports from keyv", e); + return; + } + const pruned: CommunityExportRecord[] = []; + let didPrune = false; + for (const record of stored) { + // Drop records that never reached completion (in-flight crashes) or whose file is missing + if (record.progress === 1 && record.url) { + try { + const parsed = new URL(record.url); + if (parsed.protocol === "file:" && !existsSync(fileURLToPath(parsed))) { + didPrune = true; + continue; + } + } catch { + didPrune = true; + continue; + } + pruned.push(record); + } else if (record.error) { + // Terminal failure record — keep so the user can inspect it + pruned.push(record); + } else { + // Was in-flight when the process exited; we can't resume, so drop + didPrune = true; + } + } + community._exports = pruned; + if (didPrune) await persistExports(community); +} diff --git a/src/runtime/node/community/local-community/lifecycle.ts b/src/runtime/node/community/local-community/lifecycle.ts index bf0aa3b96..32d138905 100644 --- a/src/runtime/node/community/local-community/lifecycle.ts +++ b/src/runtime/node/community/local-community/lifecycle.ts @@ -124,6 +124,7 @@ export async function start(community: LocalCommunity) { await community._dbHandler.initDbIfNeeded(); await community._dbHandler.createOrMigrateTablesIfNeeded(); await updateInstanceStateWithDbState(community); // sync in-memory state after potential migration + await community._loadExportsFromKeyv(); await setChallengesToDefaultIfNotDefined(community, log); // Import community keys onto ipfs node diff --git a/src/runtime/node/util.ts b/src/runtime/node/util.ts index 38f1f3dcd..8c89ed60f 100644 --- a/src/runtime/node/util.ts +++ b/src/runtime/node/util.ts @@ -1,4 +1,15 @@ -import { existsSync, readdirSync, openSync, readSync, closeSync, rm as rmSync, watch as fsWatch, promises as fsPromises } from "node:fs"; +import { + existsSync, + readdirSync, + openSync, + readSync, + closeSync, + rm as rmSync, + watch as fsWatch, + promises as fsPromises, + createReadStream +} from "node:fs"; +import { createHash } from "node:crypto"; import { default as nodeNativeFunctions } from "./native-functions.js"; import type { KuboRpcClient, NativeFunctions } from "../../types.js"; import path from "path"; @@ -313,6 +324,129 @@ export async function importSignerIntoKuboNode( }); } +export type BackupCommunityDbOptions = { + sourcePath: string; + destPath: string; + includePrivateKey: boolean; + onProgress?: (progress: number) => void; + signal?: AbortSignal; +}; + +// Cancellation sentinel thrown from the better-sqlite3 progress handler when the AbortSignal fires. +// Re-thrown by backupCommunityDb so callers can distinguish abort from genuine backup failures. +export class BackupAbortError extends Error { + override readonly name = "BackupAbortError"; + constructor() { + super("Backup aborted via AbortSignal"); + } +} + +export async function backupCommunityDb(opts: BackupCommunityDbOptions): Promise<{ size: number; sha256: string }> { + const { sourcePath, destPath, includePrivateKey, onProgress, signal } = opts; + if (signal?.aborted) throw new BackupAbortError(); + + await fsPromises.mkdir(path.dirname(destPath), { recursive: true }); + const partialPath = destPath + ".partial"; + + try { + await fsPromises.unlink(partialPath); + } catch { + // .partial may not exist; ignore + } + + let sourceDb: Database.Database | undefined; + try { + sourceDb = new Database(sourcePath, { fileMustExist: true, readonly: true }); + + await sourceDb.backup(partialPath, { + progress: ({ totalPages, remainingPages }) => { + if (signal?.aborted) throw new BackupAbortError(); + if (onProgress && totalPages > 0) { + // The final progress invocation comes with remainingPages === 0; we still + // intentionally report < 1 here so the "complete" emission is owned by the + // caller after sha256 + rename are done. + const fraction = (totalPages - remainingPages) / totalPages; + onProgress(Math.min(0.99, fraction)); + } + return 100; // pages-per-step rate; mirrors better-sqlite3's internal default + } + }); + + sourceDb.close(); + sourceDb = undefined; + + if (signal?.aborted) throw new BackupAbortError(); + + if (!includePrivateKey) { + await scrubPrivateKeyFromBackup(partialPath); + if (signal?.aborted) throw new BackupAbortError(); + } + + const { size, sha256 } = await statAndHashFile(partialPath); + if (signal?.aborted) throw new BackupAbortError(); + + await fsPromises.rename(partialPath, destPath); + + return { size, sha256 }; + } catch (err) { + try { + sourceDb?.close(); + } catch { + // already closed + } + try { + await fsPromises.unlink(partialPath); + } catch { + // already gone + } + throw err; + } +} + +type KeyvSignerPayload = { + value: { signer?: { privateKey?: unknown; ipfsKey?: unknown; [k: string]: unknown }; [k: string]: unknown }; + expires?: number | null; +}; + +function isKeyvSignerPayload(v: unknown): v is KeyvSignerPayload { + if (typeof v !== "object" || v === null) return false; + const value = (v as { value?: unknown }).value; + if (typeof value !== "object" || value === null) return false; + const signer = (value as { signer?: unknown }).signer; + return signer === undefined || (typeof signer === "object" && signer !== null); +} + +// Scope: only the community signer's privateKey/ipfsKey (in the internalCommunity KeyV record). +// The pseudonymityAliases table stores per-comment aliasPrivateKey values and is intentionally +// NOT scrubbed — those keys belong to the alias identities themselves and are part of the +// publication record, not the community's own signing material. +async function scrubPrivateKeyFromBackup(dbPath: string): Promise { + const db = new Database(dbPath, { fileMustExist: true }); + try { + const internalKey = "keyv:" + STORAGE_KEYS[STORAGE_KEYS.INTERNAL_COMMUNITY]; + const row = db.prepare("SELECT value FROM keyv WHERE key = ?").get(internalKey) as { value: string } | undefined; + if (!row) return; + const parsed: unknown = JSON.parse(row.value); + if (!isKeyvSignerPayload(parsed) || !parsed.value.signer) return; + parsed.value.signer = { ...parsed.value.signer, privateKey: undefined, ipfsKey: undefined }; + db.prepare("UPDATE keyv SET value = ? WHERE key = ?").run(JSON.stringify(parsed), internalKey); + } finally { + db.close(); + } +} + +async function statAndHashFile(filePath: string): Promise<{ size: number; sha256: string }> { + const stat = await fsPromises.stat(filePath); + const hash = createHash("sha256"); + await new Promise((resolve, reject) => { + const stream = createReadStream(filePath); + stream.on("data", (chunk) => hash.update(chunk)); + stream.on("end", resolve); + stream.on("error", reject); + }); + return { size: stat.size, sha256: hash.digest("hex") }; +} + export async function moveCommunityDbToDeletedDirectory(communityAddress: string, pkc: PKC) { if (typeof pkc.dataPath !== "string") throw Error("pkc.dataPath is not defined"); diff --git a/test/node-and-browser/pkc/test.configs.pkc.test.ts b/test/node-and-browser/pkc/test.configs.pkc.test.ts index e343c8977..114df11bf 100644 --- a/test/node-and-browser/pkc/test.configs.pkc.test.ts +++ b/test/node-and-browser/pkc/test.configs.pkc.test.ts @@ -1,6 +1,8 @@ import { describe, it, beforeAll, afterAll, expect } from "vitest"; -import { getAvailablePKCConfigsToTestAgainst, isRpcFlagOn, isRunningInBrowser } from "../../../dist/node/test/test-util.js"; +import PKCFactory from "../../../dist/node/index.js"; +import { getAvailablePKCConfigsToTestAgainst, isRpcFlagOn, isRunningInBrowser, mockPKC } from "../../../dist/node/test/test-util.js"; import { itIfRpc } from "../../helpers/conditional-tests.js"; +import { getDefaultDataPath } from "../../../dist/node/runtime/node/util.js"; import type { PKC } from "../../../dist/node/pkc/pkc.js"; const DEFAULT_IPFS_GATEWAYS = ["https://ipfsgateway.xyz", "https://gateway.plebpubsub.xyz", "https://gateway.forumindex.com"]; @@ -145,3 +147,24 @@ describe.concurrent("getAvailablePKCConfigsToTestAgainst", () => { }); }); }); + +describe("mockPKC respects caller-supplied transport options", () => { + it("PKC({ pkcRpcClientsOptions, dataPath }) retains both options when caller provides both", async () => { + // Providing both an RPC URL and a dataPath should be permissible — the resulting PKC + // should connect to the RPC server while keeping dataPath available for any local + // operations the caller intends. Today PKC's constructor silently drops dataPath when + // pkcRpcClientsOptions is set (src/pkc/pkc.ts:299-301), so this assertion fails and + // documents the bug. + const rpcUrl = "ws://127.0.0.1:39652"; + const dataPath = getDefaultDataPath(); + const pkc = await PKCFactory({ pkcRpcClientsOptions: [rpcUrl], dataPath, httpRoutersOptions: [] }); + pkc.on("error", () => {}); // swallow async RPC errors so they don't fail the test + try { + expect(pkc.pkcRpcClientsOptions).to.deep.equal([rpcUrl]); + expect(pkc.dataPath).to.equal(dataPath); + expect(Object.keys(pkc.clients.pkcRpcClients)).to.deep.equal([rpcUrl]); + } finally { + await pkc.destroy(); + } + }); +}); diff --git a/test/node/community/export.test.ts b/test/node/community/export.test.ts new file mode 100644 index 000000000..16286be54 --- /dev/null +++ b/test/node/community/export.test.ts @@ -0,0 +1,572 @@ +// Tests for community.export() — embedded LocalCommunity and RPC RpcLocalCommunity. +// Issue: https://github.com/pkcprotocol/pkc-js/issues/79 +// Spec: src/rpc/EXPORT_COMMUNITY_SPEC.md +// +// The matrix runs each test under whichever pkc-config the test runner selected +// (local-kubo-rpc for embedded, remote-pkc-rpc for RPC). Tests that exercise +// embedded-only semantics (fs straggler checks, exportPath, pkc.destroy() cancellation) +// are individually gated; tests that exercise RPC-only behaviors live in the +// RPC-only describe block at the bottom. +import { describe, beforeAll, afterAll, expect, it } from "vitest"; +import { fileURLToPath } from "node:url"; +import { promises as fsPromises, existsSync, createReadStream, createWriteStream } from "node:fs"; +import { createHash } from "node:crypto"; +import { pipeline } from "node:stream/promises"; +import path from "node:path"; +import Database from "better-sqlite3"; +import { + mockPKC, + mockPKCNoDataPathWithOnlyKuboClient, + mockRpcRemotePKC, + createSubWithNoChallenge, + publishRandomPost, + resolveWhenConditionIsTrue, + getAvailablePKCConfigsToTestAgainst +} from "../../../dist/node/test/test-util.js"; +import { itSkipIfRpc, itIfRpc, describeIfRpc } from "../../helpers/conditional-tests.js"; +import type { PKC as PKCType } from "../../../dist/node/pkc/pkc.js"; +import type { LocalCommunity } from "../../../dist/node/runtime/node/community/local-community.js"; +import { RpcLocalCommunity } from "../../../dist/node/community/rpc-local-community.js"; +import type { CommunityExportRecord } from "../../../dist/node/community/types.js"; + +// Either flavor of community has `.export()`, `.exports`, `.signer`, and emits `exportschange`. +type AnyLocalCommunity = LocalCommunity | RpcLocalCommunity; + +// Narrow shape of the signer entry inside the exported community's internalCommunity KeyV record. +// Private material is undefined after scrubbing; public material is always present. +interface ExportedSigner { + privateKey?: string; + ipfsKey?: Uint8Array; + publicKey?: string; + address?: string; +} + +async function hashFile(p: string): Promise { + const hash = createHash("sha256"); + await new Promise((resolve, reject) => { + const s = createReadStream(p); + s.on("data", (chunk) => hash.update(chunk)); + s.on("end", () => resolve()); + s.on("error", reject); + }); + return hash.digest("hex"); +} + +// URL-agnostic helper: downloads HTTP-served exports to a local temp path so the rest of the +// assertions (sha256 verification, sqlite open) don't care whether the record came from the +// embedded path (file://) or the RPC HTTP endpoint (http://). +async function materializeExport(rec: CommunityExportRecord): Promise<{ filePath: string; cleanup: () => Promise }> { + if (!rec.url) throw new Error("Export record has no url"); + const parsed = new URL(rec.url); + if (parsed.protocol === "file:") return { filePath: fileURLToPath(parsed), cleanup: async () => {} }; + const out = path.join(".tmp", "test-downloads", `${rec.exportId}.sqlite`); + await fsPromises.mkdir(path.dirname(out), { recursive: true }); + const res = await fetch(rec.url); + if (!res.ok) throw new Error(`Download failed: ${res.status}`); + if (!res.body) throw new Error("No body in fetch response"); + await pipeline(res.body as unknown as NodeJS.ReadableStream, createWriteStream(out)); + return { filePath: out, cleanup: async () => fsPromises.rm(out, { force: true }) }; +} + +interface WaitForRecordOptions { + community: AnyLocalCommunity; + exportId: string; + predicate: (r: CommunityExportRecord | undefined) => boolean; + timeoutMs?: number; +} + +async function waitForRecord({ + community, + exportId, + predicate, + timeoutMs = 30_000 +}: WaitForRecordOptions): Promise { + let timer: NodeJS.Timeout | undefined; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(`Timed out waiting on export ${exportId} predicate`)), timeoutMs); + }); + // TODO can we actually await here till we get a url instead? I dont like waiting for community.exports we need to verify events are emitted properly + // speaking of events, are we verifying events are emitted properly along with their fields? We need to test them explicitly + const wait = resolveWhenConditionIsTrue({ + toUpdate: community, + eventName: "exportschange", + predicate: async () => predicate(community.exports.find((r) => r.exportId === exportId)) + }); + try { + await Promise.race([wait, timeout]); + } finally { + clearTimeout(timer); + } + return community.exports.find((r) => r.exportId === exportId); +} + +interface WaitForCompleteRecordOptions { + community: AnyLocalCommunity; + exportId: string; + timeoutMs?: number; +} + +async function waitForCompleteRecord({ + community, + exportId, + timeoutMs = 30_000 +}: WaitForCompleteRecordOptions): Promise { + const rec = await waitForRecord({ + community, + exportId, + predicate: (r) => r?.progress === 1 || Boolean(r?.error), + timeoutMs + }); + if (rec?.error) throw new Error(`Export failed: ${rec.error.code}: ${rec.error.message}`); + return rec!; +} + +// Portable test bodies — shared between the started-community matrix (embedded + RPC) and the +// non-started RPC sub-suite below. Each test body grabs pkc/community/isEmbedded via the getter +// so it sees the values populated by the enclosing beforeAll, not stale undefineds from module +// load time. +function defineExportTests(getCtx: () => { pkc: PKCType; community: AnyLocalCommunity; isEmbedded: boolean }) { + it("happy path: file is reachable, sha256 matches, sqlite is readable", async () => { + const { community } = getCtx(); + const { exportId } = await community.export(); + const rec = await waitForCompleteRecord({ community, exportId }); + expect(rec.url).toBeDefined(); + + const { filePath, cleanup } = await materializeExport(rec); + try { + const recomputed = await hashFile(filePath); + expect(recomputed).to.equal(rec.sha256); + const stat = await fsPromises.stat(filePath); + expect(stat.size).to.equal(rec.size); + const db = new Database(filePath, { readonly: true }); + try { + const tables = db.prepare("SELECT name FROM sqlite_master WHERE type='table'").all() as { name: string }[]; + expect(tables.some((t) => t.name === "comments")).to.equal(true); + expect(tables.some((t) => t.name === "keyv")).to.equal(true); + } finally { + db.close(); + } + } finally { + await cleanup(); + } + }); + + it("includePrivateKey: false (default) scrubs the signer.privateKey", async () => { + const { community } = getCtx(); + const { exportId } = await community.export(); + const rec = await waitForCompleteRecord({ community, exportId }); + const { filePath, cleanup } = await materializeExport(rec); + try { + const db = new Database(filePath, { readonly: true }); + try { + const row = db.prepare("SELECT value FROM keyv WHERE key = ?").get("keyv:INTERNAL_COMMUNITY") as + | { value: string } + | undefined; + expect(row).toBeDefined(); + const parsed = JSON.parse(row!.value) as { value: { signer: ExportedSigner } }; + expect(parsed.value.signer.privateKey).to.equal(undefined); + expect(parsed.value.signer.ipfsKey).to.equal(undefined); + expect(typeof parsed.value.signer.publicKey).to.equal("string"); + expect(typeof parsed.value.signer.address).to.equal("string"); + } finally { + db.close(); + } + } finally { + await cleanup(); + } + }); + + it("includePrivateKey: true preserves the signer.privateKey", async () => { + const { community } = getCtx(); + const { exportId } = await community.export({ includePrivateKey: true }); + const rec = await waitForCompleteRecord({ community, exportId }); + const { filePath, cleanup } = await materializeExport(rec); + try { + const db = new Database(filePath, { readonly: true }); + try { + const row = db.prepare("SELECT value FROM keyv WHERE key = ?").get("keyv:INTERNAL_COMMUNITY") as + | { value: string } + | undefined; + expect(row).toBeDefined(); + const parsed = JSON.parse(row!.value) as { value: { signer: ExportedSigner } }; + // Under embedded mode the client holds the same privateKey the server wrote, so + // we can do strict equality. The RPC client receives a scrubbed signer (no + // privateKey transmitted on the wire), so we settle for an "is a string" check. + const clientSigner = community.signer as { privateKey?: string } | undefined; + if (clientSigner?.privateKey !== undefined) { + expect(parsed.value.signer.privateKey).to.equal(clientSigner.privateKey); + } else { + expect(typeof parsed.value.signer.privateKey).to.equal("string"); + expect(parsed.value.signer.privateKey!.length).to.be.greaterThan(0); + } + } finally { + db.close(); + } + } finally { + await cleanup(); + } + }); + + // RPC: skipped — exportPath is embedded-only by spec. The RPC-only suite below verifies + // that passing exportPath through an RPC client rejects synchronously. + itSkipIfRpc("exportPath option writes to the caller-supplied location", async () => { + const { community } = getCtx(); + const customPath = path.join( + await fsPromises.mkdtemp(path.join((await import("node:os")).tmpdir(), "pkc-export-")), + "custom.sqlite" + ); + const { exportId } = await community.export({ exportPath: customPath }); + const rec = await waitForCompleteRecord({ community, exportId }); + expect(existsSync(customPath)).to.equal(true); + expect(fileURLToPath(new URL(rec.url!))).to.equal(customPath); + await fsPromises.rm(customPath, { force: true }); + }); + + it("exportschange fires for every transition with the full list", async () => { + const { community } = getCtx(); + const seen: CommunityExportRecord[][] = []; + const listener = (records: CommunityExportRecord[]) => seen.push(records); + community.on("exportschange", listener); + try { + const { exportId } = await community.export(); + await waitForCompleteRecord({ community, exportId }); + // At least: initial 0-progress emission + complete emission. Progress emissions may + // be throttled to 0 if the DB is small enough that the backup finishes in one transfer. + expect(seen.length).to.be.greaterThanOrEqual(2); + const last = seen[seen.length - 1].find((r) => r.exportId === exportId); + expect(last?.progress).to.equal(1); + } finally { + community.removeListener("exportschange", listener); + } + }); + + it("cancellation via AbortSignal records ERR_EXPORT_CANCELLED", async () => { + const { pkc, community, isEmbedded } = getCtx(); + const ac = new AbortController(); + const { exportId } = await community.export({ signal: ac.signal }); + ac.abort(); + const rec = await waitForRecord({ community, exportId, predicate: (r) => Boolean(r?.error || r?.progress === 1) }); + expect(rec?.error?.code).to.equal("ERR_EXPORT_CANCELLED"); + + // Embedded-only: verify no straggler files on the local fs. Under RPC the server's + // exports directory is not the client's, so this assertion would be meaningless. + if (isEmbedded && pkc.dataPath) { + const exportsDir = path.join(pkc.dataPath, "exports"); + if (existsSync(exportsDir)) { + const stragglers = (await fsPromises.readdir(exportsDir)).filter((f) => f.includes(exportId)); + expect(stragglers).to.deep.equal([]); + } + } + }); + + it("pre-aborted signal rejects synchronously without creating a record", async () => { + const { community } = getCtx(); + const ac = new AbortController(); + ac.abort(new Error("nope")); + const exportsBefore = community.exports.length; + await expect(community.export({ signal: ac.signal })).rejects.toThrow(); + expect(community.exports.length).to.equal(exportsBefore); + }); + + it("two concurrent exports of the same community both reach progress=1", async () => { + const { community } = getCtx(); + const [{ exportId: a }, { exportId: b }] = await Promise.all([community.export(), community.export()]); + const recA = await waitForCompleteRecord({ community, exportId: a }); + const recB = await waitForCompleteRecord({ community, exportId: b }); + expect(recA.progress).to.equal(1); + expect(recB.progress).to.equal(1); + expect(recA.exportId).to.not.equal(recB.exportId); + + const matA = await materializeExport(recA); + try { + expect(existsSync(matA.filePath)).to.equal(true); + } finally { + await matA.cleanup(); + } + const matB = await materializeExport(recB); + try { + expect(existsSync(matB.filePath)).to.equal(true); + } finally { + await matB.cleanup(); + } + }); +} + +// Started-community matrix — embedded (local-kubo-rpc) and RPC (remote-pkc-rpc). +getAvailablePKCConfigsToTestAgainst().map((config) => { + describe(`community.export() — ${config.name} (started)`, async () => { + let pkc: PKCType; + let community: AnyLocalCommunity; + + beforeAll(async () => { + pkc = await config.pkcInstancePromise(); + community = (await createSubWithNoChallenge({}, pkc)) as AnyLocalCommunity; + await community.start(); + await resolveWhenConditionIsTrue({ toUpdate: community, predicate: async () => typeof community.updatedAt === "number" }); + await publishRandomPost({ communityAddress: community.address, pkc }); + }); + + afterAll(async () => { + await community.stop(); + await pkc.destroy(); + }); + + defineExportTests(() => ({ pkc, community, isEmbedded: config.testConfigCode === "local-kubo-rpc" })); + }); +}); + +// Non-started community over RPC — keeps the _exportCommunityInstances cache honest. When the +// community isn't in _startedCommunities, the server's _resolveLocalCommunityForExport falls +// through to pkc.createCommunity, which constructs a fresh LocalCommunity per call. Without the +// cache, exportCommunity and the eager exportsSubscribe would land on different instances and +// the export's exportschange would never reach the subscription's listener — every poll-based +// assertion here would time out. +describeIfRpc(`community.export() — RPC, non-started`, async () => { + let pkc: PKCType; + let community: RpcLocalCommunity; + + beforeAll(async () => { + // Bootstrap: start once via pkcA so we can publish a post (publishing requires the + // community running), then stop and disconnect. The community lives on disk after this. + const pkcA = await mockRpcRemotePKC(); + const commA = (await createSubWithNoChallenge({}, pkcA)) as RpcLocalCommunity; + await commA.start(); + await resolveWhenConditionIsTrue({ toUpdate: commA, predicate: async () => typeof commA.updatedAt === "number" }); + await publishRandomPost({ communityAddress: commA.address, pkc: pkcA }); + await commA.stop(); + const address = commA.address; + await pkcA.destroy(); + + // Fresh client. Do NOT call community.start() — every export call from this point hits + // the non-started branch of _resolveLocalCommunityForExport and uses the cache. + pkc = await mockRpcRemotePKC(); + community = (await pkc.createCommunity({ address })) as RpcLocalCommunity; + }); + + afterAll(async () => { + await pkc.destroy(); + }); + + defineExportTests(() => ({ pkc, community, isEmbedded: false })); +}); + +// RemoteCommunity rejection — same observable contract on both transports, but the setups +// don't share much, so each transport gets its own test. The embedded test spins up two +// separate PKCs (one with a dataPath, one without) to construct a read-only RemoteCommunity. +// The RPC test asks the daemon for an address the daemon doesn't host, which is what causes +// pkc-with-rpc-client.ts to return an RpcRemoteCommunity. Both variants assert the same +// ERR_COMMUNITY_NOT_LOCAL throw from the base RemoteCommunity.export(). +describe(`community.export() — error paths`, async () => { + itSkipIfRpc("a read-only RemoteCommunity rejects with ERR_COMMUNITY_NOT_LOCAL", async () => { + const pkc1 = await mockPKC({}); + const localComm = (await createSubWithNoChallenge({}, pkc1)) as LocalCommunity; + await localComm.start(); + await resolveWhenConditionIsTrue({ toUpdate: localComm, predicate: async () => typeof localComm.updatedAt === "number" }); + + const pkc2 = await mockPKCNoDataPathWithOnlyKuboClient(); + const remoteComm = await pkc2.createCommunity({ address: localComm.address }); + try { + await expect(remoteComm.export()).rejects.toMatchObject({ code: "ERR_COMMUNITY_NOT_LOCAL" }); + } finally { + await localComm.stop(); + await pkc1.destroy(); + await pkc2.destroy(); + } + }); + + itIfRpc("an RpcRemoteCommunity rejects with ERR_COMMUNITY_NOT_LOCAL", async () => { + const pkc = await mockRpcRemotePKC(); + try { + // Fresh signer → address the RPC server has never seen as a community. With the + // address absent from rpcCommunities, pkc-with-rpc-client.ts returns an + // RpcRemoteCommunity (which doesn't override export()), so the call hits the base + // RemoteCommunity.export() rejection client-side. + const freshSigner = await pkc.createSigner(); + const remoteComm = await pkc.createCommunity({ address: freshSigner.address }); + expect(remoteComm).not.toBeInstanceOf(RpcLocalCommunity); + await expect(remoteComm.export()).rejects.toMatchObject({ code: "ERR_COMMUNITY_NOT_LOCAL" }); + } finally { + await pkc.destroy(); + } + }); +}); + +// Persistence: a fresh instance for the same community sees prior exports. Under embedded this +// goes through KeyV reload on a fresh PKC pointing at the same dataPath; under RPC it goes +// through the initial exportsSubscribe notification from a fresh client to the same server. +getAvailablePKCConfigsToTestAgainst().map((config) => { + describe(`pkc.createCommunity loads community.exports — ${config.name}`, async () => { + it("a fresh instance for the same community sees prior exports", async () => { + const pkc = await config.pkcInstancePromise(); + const first = (await createSubWithNoChallenge({}, pkc)) as AnyLocalCommunity; + await first.start(); + await resolveWhenConditionIsTrue({ toUpdate: first, predicate: async () => typeof first.updatedAt === "number" }); + await publishRandomPost({ communityAddress: first.address, pkc }); + + const { exportId } = await first.export(); + const completed = await waitForCompleteRecord({ community: first, exportId }); + expect(completed.progress).to.equal(1); + + // Sibling instance in the same pkc mirrors from the started instance + const sibling = (await pkc.createCommunity({ address: first.address })) as AnyLocalCommunity; + expect(sibling.exports.find((r) => r.exportId === exportId)?.progress).to.equal(1); + + await first.stop(); + + // Fresh PKC pointing at the same daemon. Embedded: same dataPath. RPC: fresh client. + const pkc2 = + config.testConfigCode === "local-kubo-rpc" ? await mockPKC({ dataPath: pkc.dataPath }) : await config.pkcInstancePromise(); + try { + const reloaded = (await pkc2.createCommunity({ address: first.address })) as AnyLocalCommunity; + const rec = reloaded.exports.find((r) => r.exportId === exportId); + expect(rec?.progress).to.equal(1); + expect(rec?.sha256).to.equal(completed.sha256); + } finally { + await pkc2.destroy(); + await pkc.destroy(); + } + }); + }); +}); + +// pkc.destroy() cancellation: embedded-only by spec. RPC client disconnect must NOT cancel +// the server's in-flight exports — the feature exists partly to survive disconnects. The +// disconnect-then-reconnect behavior is covered in the RPC-only suite below. +describe(`pkc.destroy() cancels in-flight exports`, async () => { + itSkipIfRpc("aborts active exports and resolves cleanly", async () => { + const pkc = await mockPKC({}); + const community = (await createSubWithNoChallenge({}, pkc)) as LocalCommunity; + await community.start(); + await resolveWhenConditionIsTrue({ toUpdate: community, predicate: async () => typeof community.updatedAt === "number" }); + await publishRandomPost({ communityAddress: community.address, pkc }); + + const { exportId } = await community.export(); + await pkc.destroy(); + + const finalRec = community.exports.find((r) => r.exportId === exportId); + if (finalRec) { + const isTerminal = finalRec.progress === 1 || Boolean(finalRec.error); + expect(isTerminal).to.equal(true); + } + }); + + // The RPC counterpart to this — surviving disconnect mid-export — lives in the RPC-only + // suite below at "client disconnect mid-export". Under embedded the destroy cancels the + // export instead of letting it continue, so loadAndPruneExportsFromKeyv must keep the + // resulting terminal-error record visible to a freshly constructed PKC on the same dataPath. + itSkipIfRpc("a fresh PKC sees the in-flight record after destroy() persists its terminal state", async () => { + const pkc = await mockPKC({}); + const community = (await createSubWithNoChallenge({}, pkc)) as LocalCommunity; + await community.start(); + await resolveWhenConditionIsTrue({ toUpdate: community, predicate: async () => typeof community.updatedAt === "number" }); + for (let i = 0; i < 10; i++) await publishRandomPost({ communityAddress: community.address, pkc }); + + const { exportId } = await community.export(); + await waitForRecord({ community, exportId, predicate: (r) => r !== undefined, timeoutMs: 5_000 }); + + const dataPath = pkc.dataPath; + const address = community.address; + await pkc.destroy(); + + const pkc2 = await mockPKC({ dataPath }); + try { + const reloaded = (await pkc2.createCommunity({ address })) as LocalCommunity; + const rec = reloaded.exports.find((r) => r.exportId === exportId); + expect(rec).toBeDefined(); + const isTerminal = rec!.progress === 1 || Boolean(rec!.error); + expect(isTerminal).to.equal(true); + if (rec!.error) expect(rec!.error.code).to.equal("ERR_EXPORT_CANCELLED"); + } finally { + await pkc2.destroy(); + } + }); +}); + +// RPC-only behaviors — only the wire transport exposes these. +describeIfRpc(`community.export() — RPC-only`, async () => { + let pkc: PKCType; + let community: RpcLocalCommunity; + + beforeAll(async () => { + pkc = await mockRpcRemotePKC(); + community = (await createSubWithNoChallenge({}, pkc)) as RpcLocalCommunity; + await community.start(); + await resolveWhenConditionIsTrue({ toUpdate: community, predicate: async () => typeof community.updatedAt === "number" }); + await publishRandomPost({ communityAddress: community.address, pkc }); + }); + + afterAll(async () => { + await community.stop(); + await pkc.destroy(); + }); + + it("exportPath rejects synchronously with ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC", async () => { + await expect(community.export({ exportPath: "/tmp/whatever.sqlite" })).rejects.toMatchObject({ + code: "ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC" + }); + }); + + it("HTTP GET /exports/ returns 404", async () => { + const httpOrigin = pkc._pkcRpcClient!.rpcHttpOrigin; + const res = await fetch(`${httpOrigin}/exports/00000000-0000-0000-0000-000000000000`); + expect(res.status).to.equal(404); + await res.body?.cancel(); + }); + + it("download cleans up the export server-side", async () => { + const { exportId } = await community.export(); + const rec = await waitForCompleteRecord({ community, exportId }); + const res = await fetch(rec.url!); + expect(res.status).to.equal(200); + // Consume the body fully so the server fires its cleanup hook after the stream finishes. + await res.arrayBuffer(); + + // Wait for the cleanup-driven exportschange notification to propagate to the client. + try { + await waitForRecord({ community, exportId, predicate: (r) => r === undefined, timeoutMs: 5_000 }); + } catch { + // Either the notification arrived (predicate met) or it didn't — the next assertion + // is authoritative. + } + expect(community.exports.find((r) => r.exportId === exportId)).to.equal(undefined); + + // A second download attempt for the same exportId should now 404. + const second = await fetch(rec.url!); + expect(second.status).to.equal(404); + await second.body?.cancel(); + }); + + it("client disconnect mid-export: reconnecting client sees the record (survives disconnect)", async () => { + // Publish multiple posts so the backup spans multiple progress emissions and the export + // is more likely to still be in-flight when we disconnect. + const heavyPkc = await mockRpcRemotePKC(); + const heavyComm = (await createSubWithNoChallenge({}, heavyPkc)) as RpcLocalCommunity; + await heavyComm.start(); + await resolveWhenConditionIsTrue({ toUpdate: heavyComm, predicate: async () => typeof heavyComm.updatedAt === "number" }); + for (let i = 0; i < 10; i++) await publishRandomPost({ communityAddress: heavyComm.address, pkc: heavyPkc }); + + const { exportId } = await heavyComm.export(); + // The record must be observable before we disconnect; the export may already be complete, + // both outcomes are acceptable. + await waitForRecord({ community: heavyComm, exportId, predicate: (r) => r !== undefined, timeoutMs: 5_000 }); + + const address = heavyComm.address; + await heavyPkc.destroy(); + + const pkc2 = await mockRpcRemotePKC(); + try { + const comm2 = (await pkc2.createCommunity({ address })) as RpcLocalCommunity; + const survivor = comm2.exports.find((r) => r.exportId === exportId); + expect(survivor).toBeDefined(); + // The export must NOT have been cancelled by the disconnect — the whole point of + // persisting records server-side is to survive client disconnects. + expect(survivor?.error).to.equal(undefined); + + const finalRec = await waitForCompleteRecord({ community: comm2, exportId }); + expect(finalRec.progress).to.equal(1); + expect(finalRec.sha256).toBeDefined(); + } finally { + await pkc2.destroy(); + } + }); +});