Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions src/clients/rpc-client/pkc-rpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ import type {
RpcResolveAuthorNameResult,
RpcSubscriptionIdResult,
RpcSuccessResult,
RpcFetchCidResult
RpcFetchCidResult,
ExportCommunityRpcParam,
CancelExportRpcParam,
RpcExportCommunityResult
} from "./types.js";
import {
parseRpcCommunityIdentifierParam,
Expand All @@ -49,7 +52,10 @@ import {
parseRpcResolveAuthorNameResult,
parseRpcFetchCidResult,
parseRpcSuccessResult,
parseRpcSubscriptionIdResult
parseRpcSubscriptionIdResult,
parseRpcExportCommunityParam,
parseRpcCancelExportParam,
parseRpcExportCommunityResult
} from "./rpc-schema-util.js";

const log = Logger("pkc-js:PKCRpcClient");
Expand Down Expand Up @@ -479,6 +485,33 @@ export default class PKCRpcClient extends TypedEmitter<PKCRpcClientEvents> {
return res;
}

// community.export() — see src/rpc/EXPORT_COMMUNITY_SPEC.md

// HTTP origin to absolutize relative `/exports/<exportId>` URLs returned by exportsSubscribe.
// Derived from the WS URL with ws[s]:// swapped to http[s]:// and the auth-key path stripped.
get rpcHttpOrigin(): string {
const parsed = new URL(this._websocketServerUrl);
const httpProto = parsed.protocol === "wss:" ? "https:" : "http:";
return `${httpProto}//${parsed.host}`;
}

async exportCommunity(args: ExportCommunityRpcParam): Promise<RpcExportCommunityResult> {
const parsedArgs = parseRpcExportCommunityParam(args);
return parseRpcExportCommunityResult(await this._webSocketClient.call("exportCommunity", [parsedArgs]));
}

async exportsSubscribe(args: CommunityIdentifierRpcParam): Promise<RpcSubscriptionIdResult> {
const parsedArgs = parseRpcCommunityIdentifierParam(args);
const res = parseRpcSubscriptionIdResult(await this._webSocketClient.call("exportsSubscribe", [parsedArgs]));
this._initSubscriptionEvent(res.subscriptionId);
return res;
}

async cancelExport(args: CancelExportRpcParam): Promise<RpcSuccessResult> {
const parsedArgs = parseRpcCancelExportParam(args);
return parseRpcSuccessResult(await this._webSocketClient.call("cancelExport", [parsedArgs]));
}

async getDefaults() {
throw Error("Not implemented");
}
Expand Down
12 changes: 11 additions & 1 deletion src/clients/rpc-client/rpc-schema-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import {
RpcResolveAuthorNameResultSchema,
RpcFetchCidResultSchema,
RpcSuccessResultSchema,
RpcSubscriptionIdResultSchema
RpcSubscriptionIdResultSchema,
RpcExportCommunityParamSchema,
RpcCancelExportParamSchema,
RpcExportCommunityResultSchema,
RpcExportschangeResultSchema
} from "./schema.js";

// Param parsers — all use .loose() so newer clients can send extra fields
Expand All @@ -30,3 +34,9 @@ export const parseRpcResolveAuthorNameResult = (result: unknown) => RpcResolveAu
export const parseRpcFetchCidResult = (result: unknown) => RpcFetchCidResultSchema.loose().parse(result);
export const parseRpcSuccessResult = (result: unknown) => RpcSuccessResultSchema.loose().parse(result);
export const parseRpcSubscriptionIdResult = (result: unknown) => RpcSubscriptionIdResultSchema.loose().parse(result);

// community.export() — wire param/result parsers
export const parseRpcExportCommunityParam = (params: unknown) => RpcExportCommunityParamSchema.loose().parse(params);
export const parseRpcCancelExportParam = (params: unknown) => RpcCancelExportParamSchema.loose().parse(params);
export const parseRpcExportCommunityResult = (result: unknown) => RpcExportCommunityResultSchema.loose().parse(result);
export const parseRpcExportschangeResult = (result: unknown) => RpcExportschangeResultSchema.loose().parse(result);
17 changes: 16 additions & 1 deletion src/clients/rpc-client/schema.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { CommentIpfsSchema, CommentUpdateSchema } from "../../publications/comment/schema.js";
import { AuthorAddressSchema, ChallengeAnswersSchema, CidStringSchema } from "../../schema/schema.js";
import { CommunityEditOptionsSchema } from "../../community/schema.js";
import { CommunityEditOptionsSchema, CommunityExportRecordsSchema } from "../../community/schema.js";
import { NameResolveCacheOptionsSchema } from "../../schema.js";
import type { EncodedDecryptedChallengeVerificationMessageType } from "../../pubsub-messages/types.js";
export const SubscriptionIdSchema = z.number().positive().int();
Expand Down Expand Up @@ -64,3 +64,18 @@ export const RpcFetchCidResultSchema = z.object({ content: z.string() });
export const RpcResolveAuthorNameResultSchema = z.object({ resolvedAuthorName: z.string().nullable() });
export const RpcSuccessResultSchema = z.object({ success: z.literal(true) });
export const RpcSubscriptionIdResultSchema = z.object({ subscriptionId: SubscriptionIdSchema }); // parsed with .loose() in rpc-schema-util.ts

// community.export() — wire params and results
export const RpcExportCommunityParamSchema = z
.object({
name: z.string().min(1).optional(),
publicKey: z.string().min(1).optional(),
includePrivateKey: z.boolean().optional()
})
.refine((args) => args.name || args.publicKey, "At least one of name or publicKey must be provided");

export const RpcCancelExportParamSchema = z.object({ exportId: z.string().uuid() });

export const RpcExportCommunityResultSchema = z.object({ exportId: z.string().uuid() });

export const RpcExportschangeResultSchema = z.object({ records: CommunityExportRecordsSchema });
10 changes: 9 additions & 1 deletion src/clients/rpc-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import {
RpcResolveAuthorNameResultSchema,
RpcFetchCidResultSchema,
RpcSuccessResultSchema,
RpcSubscriptionIdResultSchema
RpcSubscriptionIdResultSchema,
RpcExportCommunityParamSchema,
RpcCancelExportParamSchema,
RpcExportCommunityResultSchema,
RpcExportschangeResultSchema
} from "./schema.js";
import type { PageIpfs, ModQueuePageIpfs } from "../../pages/types.js";
import type { PageRuntimeFields } from "../../pages/util.js";
Expand All @@ -25,12 +29,16 @@ export type CommentPageRpcParam = z.infer<typeof RpcCommentRepliesPageParamSchem
export type CommunityPageRpcParam = z.infer<typeof RpcCommunityPageParamSchema>;
export type EditCommunityRpcParam = z.infer<typeof RpcEditCommunityParamSchema>;
export type PublishChallengeAnswersRpcParam = z.infer<typeof RpcPublishChallengeAnswersParamSchema>;
export type ExportCommunityRpcParam = z.infer<typeof RpcExportCommunityParamSchema>;
export type CancelExportRpcParam = z.infer<typeof RpcCancelExportParamSchema>;

// Result types (shared between RPC client and server)
export type RpcResolveAuthorNameResult = z.infer<typeof RpcResolveAuthorNameResultSchema>;
export type RpcFetchCidResult = z.infer<typeof RpcFetchCidResultSchema>;
export type RpcSuccessResult = z.infer<typeof RpcSuccessResultSchema>;
export type RpcSubscriptionIdResult = z.infer<typeof RpcSubscriptionIdResultSchema>;
export type RpcExportCommunityResult = z.infer<typeof RpcExportCommunityResultSchema>;
export type RpcExportschangeResult = z.infer<typeof RpcExportschangeResultSchema>;

// Re-export existing complex types used as RPC return values
export type { RpcInternalCommunityRecordBeforeFirstUpdateType, RpcLocalCommunityUpdateResultType } from "../../community/types.js";
Expand Down
14 changes: 13 additions & 1 deletion src/community/remote-community.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import type {
RpcLocalCommunityLocalProps,
CommunityEditOptions,
CommunityEventArgs,
CommunityEvents
CommunityEvents,
CommunityExportRecord,
ExportCommunityUserOptions
} from "./types.js";
import * as remeda from "remeda";
import { ModQueuePages, PostsPages } from "../pages/pages.js";
Expand Down Expand Up @@ -749,4 +751,14 @@ export class RemoteCommunity extends TypedEmitter<CommunityEvents> implements Om
async start() {
throw Error("Can't start a remote community");
}

// Community export (issue #79). Overridden by LocalCommunity and the RPC variants.
// The base implementation rejects because a read-only RemoteCommunity has no DB to back up.
get exports(): CommunityExportRecord[] {
return [];
}

async export(options?: ExportCommunityUserOptions): Promise<{ exportId: string }> {
throw new PKCError("ERR_COMMUNITY_NOT_LOCAL", { address: this.address });
}
}
113 changes: 112 additions & 1 deletion src/community/rpc-local-community.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import type {
RpcLocalCommunityLocalProps,
RpcLocalCommunityUpdateResultType,
CommunityEditOptions,
CommunityExportRecord,
CommunityIpfsType,
CommunityStartedState
CommunityStartedState,
ExportCommunityUserOptions
} from "./types.js";
import { RpcRemoteCommunity } from "./rpc-remote-community.js";
import { z } from "zod";
Expand All @@ -32,6 +34,13 @@ import type {
import { deepMergeRuntimeFields, hideClassPrivateProps } from "../util.js";
import { findStartedCommunity, trackStartedCommunity, untrackStartedCommunity } from "../pkc/tracked-instance-registry-util.js";

// Shallow clone preserving the nested error object so consumers can mutate without
// affecting cached state. Local copy because `local-community/export.ts` is node-only and
// RpcLocalCommunity must work in the browser.
function cloneExportRecord(record: CommunityExportRecord): CommunityExportRecord {
return { ...record, ...(record.error ? { error: { ...record.error } } : {}) };
}

// This class is for communities that are running and publishing, over RPC. Can be used for both browser and node
export class RpcLocalCommunity extends RpcRemoteCommunity {
override started: boolean; // Is the community started and running? This is not specific to this instance, and applies to all instances of community with this address
Expand All @@ -56,6 +65,14 @@ export class RpcLocalCommunity extends RpcRemoteCommunity {
private _startRpcSubscriptionId?: z.infer<typeof SubscriptionIdSchema> = undefined;
_usingDefaultChallenge!: RpcLocalCommunityLocalProps["_usingDefaultChallenge"];

// community.export() over RPC. The subscription is attached eagerly during createCommunity
// so consumers see prior exports (including ones still in flight from earlier client sessions)
// immediately, without having to call community.export() first.
private _exportsSubscriptionId?: number = undefined;
private _exportsCache: CommunityExportRecord[] = [];
// exportId → detach AbortSignal listener. Cleared when the export reaches a terminal state.
private _exportSignalDetachers: Map<string, () => void> = new Map();

constructor(pkc: PKC) {
super(pkc);
this.started = false;
Expand Down Expand Up @@ -338,6 +355,100 @@ export class RpcLocalCommunity extends RpcRemoteCommunity {
return super.update();
}

// community.export() over RPC — see src/rpc/EXPORT_COMMUNITY_SPEC.md
override get exports(): CommunityExportRecord[] {
return this._exportsCache.map(cloneExportRecord);
}

override async export(options: ExportCommunityUserOptions = {}): Promise<{ exportId: string }> {
// Sync validation — matches embedded path
if (options.exportPath !== undefined) throw new PKCError("ERR_EXPORT_PATH_NOT_SUPPORTED_OVER_RPC", { address: this.address });
if (options.signal?.aborted) {
const reason = (options.signal as AbortSignal).reason;
throw reason ?? new DOMException("The operation was aborted.", "AbortError");
}

const { exportId } = await this._pkc._pkcRpcClient!.exportCommunity({
name: this.name,
publicKey: this.publicKey,
includePrivateKey: options.includePrivateKey
});

if (options.signal) {
const userSignal = options.signal;
const onAbort = () => {
this._pkc._pkcRpcClient!.cancelExport({ exportId }).catch((e) => {
Logger("pkc-js:rpc-local-community:export").error("Failed to send cancelExport", exportId, e);
});
};
// Aborted between sync validation and exportCommunity returning — route the cancel.
if (userSignal.aborted) onAbort();
else {
userSignal.addEventListener("abort", onAbort, { once: true });
this._exportSignalDetachers.set(exportId, () => userSignal.removeEventListener("abort", onAbort));
}
}
return { exportId };
}

async _attachExportsSubscription(): Promise<void> {
if (this._exportsSubscriptionId !== undefined) return;
if (!this._pkc._pkcRpcClient) return; // not on an RPC PKC — should not happen for this class

const { subscriptionId } = await this._pkc._pkcRpcClient.exportsSubscribe({
name: this.name,
publicKey: this.publicKey
});
this._exportsSubscriptionId = subscriptionId;

const subscription = this._pkc._pkcRpcClient.getSubscription(subscriptionId);
let resolveInitial!: () => void;
let rejectInitial!: (e: Error) => void;
const initialReceived = new Promise<void>((resolve, reject) => {
resolveInitial = resolve;
rejectInitial = reject;
});
let seenInitial = false;

subscription.on("exportschange", (msg: any) => {
const records = (msg?.params?.result?.records ?? []) as CommunityExportRecord[];
this._absorbExportRecords(records);
if (!seenInitial) {
seenInitial = true;
resolveInitial();
}
});
subscription.on("error", (msg: any) => {
if (!seenInitial) {
seenInitial = true;
rejectInitial(msg?.params?.result ?? new Error("exportsSubscribe error before initial notification"));
}
});

this._pkc._pkcRpcClient.emitAllPendingMessages(subscriptionId);
await initialReceived;
}

private _absorbExportRecords(wireRecords: CommunityExportRecord[]): void {
const httpOrigin = this._pkc._pkcRpcClient!.rpcHttpOrigin;
this._exportsCache = wireRecords.map((rec) => {
// Wire-format url is relative (`/exports/<exportId>`) once the export completes; absolutize.
if (rec.url && rec.url.startsWith("/")) return { ...rec, url: new URL(rec.url, httpOrigin).href };
return rec;
});
// Detach signal listeners for terminal records — the server already finalized them.
for (const rec of wireRecords) {
if (rec.progress === 1 || rec.error) {
const detach = this._exportSignalDetachers.get(rec.exportId);
if (detach) {
detach();
this._exportSignalDetachers.delete(rec.exportId);
}
}
}
this.emit("exportschange", this._exportsCache.map(cloneExportRecord));
}

override async delete() {
// Make sure to stop updating or starting first
const startedCommunity = findStartedCommunity(this._pkc, { publicKey: this.publicKey, name: this.name });
Expand Down
52 changes: 51 additions & 1 deletion src/community/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,55 @@ export const CreateCommunityFunctionArgumentsSchema = CreateNewLocalCommunityUse

export const ListOfCommunitiesSchema = CommunityAddressSchema.array();

// community.export()

export const ExportCommunityUserOptionsSchema = z
.object({
includePrivateKey: z.boolean().optional(),
exportPath: z.string().min(1).optional(),
signal: z.custom<AbortSignal>((v) => v === undefined || (typeof v === "object" && v !== null && "aborted" in v)).optional()
})
.strict();

export const CommunityExportRecordErrorSchema = z
.object({
code: z.string(),
message: z.string()
})
.strict();

// Public per the design: progress is the only state indicator; size/sha256/url present
// when progress === 1; error present when the export failed (cancellation included).
export const CommunityExportRecordSchema = z
.object({
exportId: z.string().uuid(),
name: z.string().optional(),
publicKey: z.string(),
includePrivateKey: z.boolean(),
progress: z.number().min(0).max(1),
size: z.number().int().nonnegative().optional(),
sha256: z
.string()
.regex(/^[0-9a-f]{64}$/)
.optional(),
url: z.string().optional(),
error: CommunityExportRecordErrorSchema.optional()
})
.strict()
.superRefine((rec, ctx) => {
if (rec.progress !== 1 || rec.error) return;
for (const field of ["size", "sha256", "url"] as const) {
if (rec[field] === undefined)
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: [field],
message: `${field} must be present when progress === 1 and no error is set`
});
}
});

export const CommunityExportRecordsSchema = CommunityExportRecordSchema.array();

// Reserved fields

// TODO should make the array of class props typed
Expand All @@ -344,7 +393,8 @@ export const CommunityIpfsReservedFields = remeda.difference(
"editable",
"publishingState",
"updatingState",
"started"
"started",
"exports"
],
remeda.keys.strict(CommunityIpfsSchema.shape)
);
9 changes: 8 additions & 1 deletion src/community/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import {
CommunitySuggestedSchema,
RpcRemoteCommunityUpdateEventResultSchema,
CommunitySignedPropertyNames,
CommunityRoleNames
CommunityRoleNames,
ExportCommunityUserOptionsSchema,
CommunityExportRecordSchema
} from "./schema.js";
import { RpcLocalCommunity } from "./rpc-local-community.js";
import { LocalCommunity } from "../runtime/node/community/local-community.js";
Expand Down Expand Up @@ -228,9 +230,14 @@ export interface CommunityEvents {

update: (updatedCommunity: RemoteCommunity) => void;

exportschange: (records: CommunityExportRecord[]) => void;

removeListener: (eventName: string, listener: Function) => void;
}

export type ExportCommunityUserOptions = z.infer<typeof ExportCommunityUserOptionsSchema>;
export type CommunityExportRecord = z.infer<typeof CommunityExportRecordSchema>;

// Create a helper type to extract the parameters of each event
export type CommunityEventArgs<T extends keyof CommunityEvents> = Parameters<CommunityEvents[T]>;

Expand Down
Loading