diff --git a/.changeset/khaki-bananas-hammer.md b/.changeset/khaki-bananas-hammer.md new file mode 100644 index 00000000..7ab70abe --- /dev/null +++ b/.changeset/khaki-bananas-hammer.md @@ -0,0 +1,5 @@ +--- +"gill": minor +--- + +add `watchAccount` using `unifiedWatcher` diff --git a/docs/content/docs/react/getting-started.mdx b/docs/content/docs/react/getting-started.mdx index 8ba10240..7db67420 100644 --- a/docs/content/docs/react/getting-started.mdx +++ b/docs/content/docs/react/getting-started.mdx @@ -195,7 +195,7 @@ import { createSolanaClient } from "gill"; import { useUpdateSolanaClient } from "@gillsdk/react"; function NetworkSwitcher() { - const updateClient = useUpdateSolanaClient(); + const { mutate: updateClient } = useUpdateSolanaClient(); const switchToMainnet = () => { const mainnetClient = createSolanaClient({ diff --git a/packages/gill/src/core/index.ts b/packages/gill/src/core/index.ts index 5b110869..5bf1d38e 100644 --- a/packages/gill/src/core/index.ts +++ b/packages/gill/src/core/index.ts @@ -20,3 +20,6 @@ export * from "./send-and-confirm-transaction-with-signers"; export * from "./simulate-transaction"; export * from "./utils"; export * from "./verify-signature"; + +export * from "./watchers/unified-watcher"; +export * from "./watchers/watch-account"; diff --git a/packages/gill/src/core/watchers/unified-watcher.ts b/packages/gill/src/core/watchers/unified-watcher.ts new file mode 100644 index 00000000..3593d0e8 --- /dev/null +++ b/packages/gill/src/core/watchers/unified-watcher.ts @@ -0,0 +1,281 @@ +import { Slot } from "@solana/kit"; + +export type SubscriptionContext = { slot: Slot }; + +export type SubscriptionItem = { context: SubscriptionContext; value: T }; + +export type UnifiedWatcherOptions = { + /** + * External AbortController to manage lifecycle. + * If omitted, a new controller is created internally. + */ + abortController?: AbortController; + + /** + * Maximum number of consecutive WS connection attempts before falling back to polling. + */ + maxRetries?: number; + + /** + * Optional error handler for non-fatal errors: + * - WS connection failures and retries + * - Polling failures + * - Stream processing errors + */ + onError?: (e: unknown) => void; + + /** + * Handler invoked for each accepted update (after slot de-duplication). + */ + onUpdate: (u: { slot: Slot; value: TNormalized }) => void; + + /** + * Polling interval (ms) used when in polling mode. + * If omitted or <= 0, periodic polling is disabled (single poll may still run). + */ + pollIntervalMs?: number; + + /** + * Delay (ms) between WS connection attempts. + */ + retryDelayMs?: number; + + /** + * Maximum time (ms) to wait for WS connection before considering it failed + * and proceeding with retry or fallback to polling. + */ + wsConnectTimeoutMs: number; +}; + +/** + * Defines the strategy for watching a data source. + * This is the core logic that the unified watcher uses to subscribe to and process updates. + * + * @remarks + * ### Handling Solana WebSocket RPC Notifications + * + * The `subscribe` function is responsible for parsing raw WebSocket notifications and yielding + * data in a format the watcher can understand. For correct slot-based deduplication, the + * watcher expects items in the `SubscriptionItem<{ context: { slot }, value }>` format. + * + * #### 1. Directly Compatible Subscriptions + * The following rpc subscriptions produce a `result` payload that directly matches the `SubscriptionItem` format. + * The `subscribe` implementation can typically just yield `notification.params.result`. + * - `logsSubscribe` - `@solana/kit` `logsNotificatios` + * - `accountSubscribe` - `@solana/kit` `accountNotifications` + * - `programSubscribe` - `@solana/kit` `programNotifications` + * - `signatureSubscribe` - `@solana/kit` `signatureNotifications` + * + * #### 2. Adaptable Subscriptions (Requires Transformation) + * Non compatible subscriptions require manual transformation into the `SubscriptionItem` format. + * + */ +export type WatcherStrategy = { + /** + * Converts a raw WS payload into the normalized value type consumed by onUpdate. + */ + normalize: (raw: TRaw | null) => TNormalized; + + /** + * Performs a single poll and emits at most one update via onEmit. + * - slot is optional; if omitted, the watcher will synthesize one. + * - value should be normalized or null. + * Implementations should throw on fatal errors to allow retry/handling upstream. + */ + poll?: (onEmit: (update: { slot?: Slot; value: TNormalized }) => void, abortSignal: AbortSignal) => Promise; + + /** + * Starts a WS subscription and returns an async iterable of updates. + * Each item can be either: + * - SubscriptionItem (preferred): includes context.slot. + * - TRaw: raw payload without context; slot will be synthesized by the watcher. + */ + subscribe: (abortSignal: AbortSignal) => Promise | TRaw>>; +}; + +const attemptSubscription = async ( + subscribeFn: () => Promise | TRaw>>, + timeoutMs: number, + abortSignal: AbortSignal, +): Promise | TRaw>> => { + if (abortSignal.aborted) { + throw new Error("Aborted"); + } + const connectPromise = subscribeFn(); + const timeoutPromise = new Promise((_, reject) => + setTimeout(() => reject(new Error(`ws connect timeout (${timeoutMs}ms)`)), timeoutMs), + ); + return await Promise.race([connectPromise, timeoutPromise]); +}; + +const executePoll = async ( + poll: (onEmit: (update: { slot?: Slot; value: TNormalized }) => void, abortSignal: AbortSignal) => Promise, + onUpdate: (slot: Slot, value: TNormalized) => void, + getLastSlot: () => Slot, + closedRef: { value: boolean }, + abortSignal: AbortSignal, + onError?: (e: unknown) => void, +) => { + if (closedRef.value) { + return; + } + + try { + const onEmitFromPoll = ({ slot, value }: { slot?: Slot; value: TNormalized }) => { + const newSlot = slot ?? getLastSlot() + 1n; + onUpdate(newSlot, value); + }; + + await poll(onEmitFromPoll, abortSignal); + } catch (e) { + if (!closedRef.value && onError) { + onError(e); + } + } +}; + +/** + * Creates a unified watcher that manages a WebSocket subscription with a polling fallback. + * + * A generic utility function that relies on the provided `WatcherStrategy` to handle the specifics + * of a data source. The watcher's primary role is to manage the connection lifecycle, handle + * retries, fall back to polling, and deduplicate updates based on an advancing slot number. + * + * @param strategy The strategy that defines how to subscribe to, poll, and normalize data. + * @param opts Configuration options for the watcher. + * @see {@link WatcherStrategy} for detailed guidance on implementation for Solana RPC subscriptions. + */ +export const createUnifiedWatcher = async ( + strategy: WatcherStrategy, + opts: UnifiedWatcherOptions, +): Promise<{ stop: () => void }> => { + const { + pollIntervalMs, + wsConnectTimeoutMs, + onUpdate, + onError, + abortController = new AbortController(), + maxRetries = 3, + retryDelayMs = 2000, // Default to a 2-second fixed retry delay + } = opts; + + const closedRef = { value: false }; + + let pollTimer: ReturnType | null = null; + + let lastSlot: Slot = -1n; + + const hasPoll = typeof strategy.poll === "function"; + + const stop = () => { + if (closedRef.value) return; + closedRef.value = true; + if (pollTimer) clearInterval(pollTimer); + abortController.abort(); + }; + + const emitIfNewer = (slot: Slot, value: TNormalized) => { + if (slot <= lastSlot) { + return; + } + lastSlot = slot; + onUpdate({ slot, value }); + }; + + const singlePoll = () => { + if (!strategy.poll) { + return Promise.resolve(); + } + return executePoll(strategy.poll, emitIfNewer, () => lastSlot, closedRef, abortController.signal, onError); + }; + + const startPollingFallback = async () => { + if (closedRef.value || !hasPoll) return; + await singlePoll(); + if (closedRef.value) return; + if (pollIntervalMs && pollIntervalMs > 0) { + pollTimer = setInterval(() => void singlePoll(), pollIntervalMs); + } + }; + + const handleStream = async (stream: AsyncIterable | TRaw>) => { + for await (const item of stream) { + if (closedRef.value) { + break; + } + + let slot: Slot; + let value: TRaw; + + if ( + typeof item === "object" && + item !== null && + "context" in item && + typeof item.context === "object" && + item.context !== null && + "slot" in item.context + ) { + const subItem = item; + slot = subItem.context.slot; + value = subItem.value; + } else { + // No context provided by the stream; synthesize a monotonic slot. + lastSlot = lastSlot + 1n; + slot = lastSlot; + value = item as TRaw; + } + + emitIfNewer(slot, strategy.normalize(value)); + } + }; + + // Main loop: attempts WS connection with retry; falls back to polling after max retries. + const run = async () => { + let connectAttempt = 0; + + while (!closedRef.value) { + try { + const stream = await attemptSubscription( + () => strategy.subscribe(abortController.signal), + wsConnectTimeoutMs, + abortController.signal, + ); + + connectAttempt = 0; // Reset on successful connection. + + // Seed state via a poll (if available) before consuming the stream. + if (hasPoll) { + await singlePoll(); + } + + if (closedRef.value) { + return; + } + + await handleStream(stream); + + if (closedRef.value) return; + // If the stream ends naturally, loop to attempt reconnection again. + } catch (e) { + if (closedRef.value) return; + + onError?.(e); + + connectAttempt++; + if (connectAttempt >= maxRetries) { + onError?.(new Error(`Failed to connect to WebSocket after ${maxRetries} attempts.`)); + await startPollingFallback(); + return; // Exit loop and remain in polling mode. + } + + // Fixed-delay retry (could be replaced with exponential backoff). + await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); + } + } + }; + + // Start the watcher loop. + await run(); + + return { stop }; +}; diff --git a/packages/gill/src/core/watchers/watch-account.ts b/packages/gill/src/core/watchers/watch-account.ts new file mode 100644 index 00000000..a4d32e0c --- /dev/null +++ b/packages/gill/src/core/watchers/watch-account.ts @@ -0,0 +1,134 @@ +import { + AccountInfoBase, + type AccountInfoWithBase64EncodedData, + type Address, + assertAccountExists, + Commitment, + MaybeEncodedAccount, + parseBase64RpcAccount, +} from "@solana/kit"; + +import { SolanaClient } from "../../types/rpc.js"; +import { createUnifiedWatcher, type UnifiedWatcherOptions, type WatcherStrategy } from "./unified-watcher"; + +type AccountUpdate = { + /** + * Slot associated with the update. Monotonic and de-duplicated. + */ + slot: bigint; + /** + * Account info at the given slot, or null if the account does not exist. + */ + value: MaybeEncodedAccount; +}; + +type WatchAccountArgs = { + /** + * Target account address to watch. + */ + accountAddress: Address; + + /** + * Commitment level used for both RPC and WS subscription. + * Defaults to 'confirmed'. + */ + commitment?: Commitment; + + /** + * Maximum number of WS connection retries before falling back to polling. + * Defaults to 3. + */ + maxRetries?: number; + + /** + * Optional error callback for non-fatal failures. + */ + onError?: (e: unknown) => void; + + /** + * Update handler receiving { slot, value }. + */ + onUpdate: (u: AccountUpdate) => void; + + /** + * Poll interval (ms) when in polling mode. Defaults to 5000. + */ + pollIntervalMs?: number; + + /** + * Delay (ms) between WS connection retries. Defaults to 2000. + */ + retryDelayMs?: number; + + /** + * RPC client for HTTP requests. + */ + rpc: SolanaClient["rpc"]; + + /** + * RPC subscriptions (WS) client used to subscribe to account notifications. + */ + rpcSubscriptions: SolanaClient["rpcSubscriptions"]; + + /** + * Timeout (ms) for initial WS connection attempts. Defaults to 8000. + */ + wsConnectTimeoutMs?: number; +}; +type Base64EncodedRpcAccount = AccountInfoBase & AccountInfoWithBase64EncodedData; + +/** + * Watches a Solana account for changes. + * + * This function builds on the unified watcher and provides a resource-specific + * strategy for accounts. It tries WS subscription first and falls back to HTTP + * polling when needed. + * + * @param args - Arguments to configure the account watcher. + * @returns A function to stop the watcher. + */ +export const watchAccount = async ({ + rpc, + rpcSubscriptions, + commitment = "confirmed", + pollIntervalMs = 5000, + wsConnectTimeoutMs = 8000, + accountAddress, + onUpdate, + onError, + maxRetries, + retryDelayMs, +}: WatchAccountArgs) => { + const strategy: WatcherStrategy = { + normalize: (raw) => { + const parsed = parseBase64RpcAccount(accountAddress, raw); + return parsed; + }, + poll: async (onEmit, abortSignal) => { + const { context, value } = await rpc + .getAccountInfo(accountAddress, { commitment, encoding: "base64" }) + .send({ abortSignal }); + const parsedAccount = parseBase64RpcAccount(accountAddress, value); + assertAccountExists(parsedAccount); + onEmit({ slot: context.slot, value: parsedAccount }); + }, + subscribe: async (abortSignal) => { + return await rpcSubscriptions + .accountNotifications(accountAddress, { commitment, encoding: "base64" }) + .subscribe({ abortSignal }); + }, + }; + + const opts: UnifiedWatcherOptions = { + maxRetries, + onError, + onUpdate, + pollIntervalMs, + retryDelayMs, + wsConnectTimeoutMs, + }; + + const { stop } = await createUnifiedWatcher(strategy, opts); + + return stop; +};