diff --git a/src/client/ClientGameRunner.ts b/src/client/ClientGameRunner.ts index b71fb7cd69..67790b8d7d 100644 --- a/src/client/ClientGameRunner.ts +++ b/src/client/ClientGameRunner.ts @@ -262,6 +262,10 @@ export class ClientGameRunner { private lastTickReceiveTime: number = 0; private currentTickDelay: number | undefined = undefined; + private pendingUpdates: GameUpdateViewData[] = []; + private pendingUpdateIndex: number = 0; + private drainPendingUpdatesRafId: number | null = null; + constructor( private lobby: LobbyConfig, private eventBus: EventBus, @@ -361,35 +365,9 @@ export class ClientGameRunner { this.stop(); return; } - this.transport.turnComplete(); - gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => { - this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash)); - }); - this.gameView.update(gu); - this.renderer.tick(); - - // Emit tick metrics event for performance overlay - this.eventBus.emit( - new TickMetricsEvent(gu.tickExecutionDuration, this.currentTickDelay), - ); - - // Reset tick delay for next measurement - this.currentTickDelay = undefined; - - if (gu.updates[GameUpdateType.Win].length > 0) { - this.saveGame(gu.updates[GameUpdateType.Win][0]); - } + this.enqueueWorkerUpdate(gu); }); - const worker = this.worker; - const keepWorkerAlive = () => { - if (this.isActive) { - worker.sendHeartbeat(); - requestAnimationFrame(keepWorkerAlive); - } - }; - requestAnimationFrame(keepWorkerAlive); - const onconnect = () => { console.log("Connected to game server!"); this.transport.rejoinGame(this.turnsSeen); @@ -511,6 +489,12 @@ export class ClientGameRunner { if (!this.isActive) return; this.isActive = false; + if (this.drainPendingUpdatesRafId !== null) { + cancelAnimationFrame(this.drainPendingUpdatesRafId); + this.drainPendingUpdatesRafId = null; + } + this.pendingUpdates = []; + this.pendingUpdateIndex = 0; this.worker.cleanup(); this.transport.leaveGame(); if (this.connectionCheckInterval) { @@ -523,6 +507,97 @@ export class ClientGameRunner { } } + private enqueueWorkerUpdate(update: GameUpdateViewData): void { + this.pendingUpdates.push(update); + + if (this.drainPendingUpdatesRafId !== null || !this.isActive) { + return; + } + + this.drainPendingUpdatesRafId = requestAnimationFrame(() => { + this.drainPendingUpdatesRafId = null; + this.drainPendingUpdates(); + }); + } + + private drainPendingUpdates(): void { + if (!this.isActive) { + return; + } + + const backlog = this.pendingUpdates.length - this.pendingUpdateIndex; + if (backlog <= 0) { + return; + } + + const startTime = performance.now(); + const maxDrainMs = backlog > 200 ? 12 : 8; + const maxUpdates = backlog > 200 ? 1000 : 200; + + let processed = 0; + let totalTickExecutionDuration = 0; + let winUpdate: WinUpdate | null = null; + + while ( + processed < maxUpdates && + this.pendingUpdateIndex < this.pendingUpdates.length && + performance.now() - startTime < maxDrainMs + ) { + const gu = this.pendingUpdates[this.pendingUpdateIndex]; + this.pendingUpdateIndex++; + processed++; + + totalTickExecutionDuration += gu.tickExecutionDuration ?? 0; + + gu.updates[GameUpdateType.Hash].forEach((hu: HashUpdate) => { + this.eventBus.emit(new SendHashEvent(hu.tick, hu.hash)); + }); + + if (gu.updates[GameUpdateType.Win].length > 0) { + winUpdate = gu.updates[GameUpdateType.Win][0]; + } + + this.gameView.update(gu); + this.renderer.tick(); + } + + if (processed > 0) { + const avgTickExecutionDuration = totalTickExecutionDuration / processed; + this.eventBus.emit( + new TickMetricsEvent(avgTickExecutionDuration, this.currentTickDelay), + ); + this.currentTickDelay = undefined; + + if (winUpdate) { + this.saveGame(winUpdate); + } + + // In singleplayer/replay (local server), acknowledge how many turns we've + // actually applied this frame. LocalServer uses this as bounded backpressure. + this.transport.turnComplete(processed); + } + + // Compact the queue occasionally to avoid unbounded growth from array holes. + if (this.pendingUpdateIndex >= this.pendingUpdates.length) { + this.pendingUpdates = []; + this.pendingUpdateIndex = 0; + } else if (this.pendingUpdateIndex > 1024) { + this.pendingUpdates = this.pendingUpdates.slice(this.pendingUpdateIndex); + this.pendingUpdateIndex = 0; + } + + // Keep draining if we still have backlog. + if ( + this.pendingUpdates.length - this.pendingUpdateIndex > 0 && + this.drainPendingUpdatesRafId === null + ) { + this.drainPendingUpdatesRafId = requestAnimationFrame(() => { + this.drainPendingUpdatesRafId = null; + this.drainPendingUpdates(); + }); + } + } + private inputEvent(event: MouseUpEvent) { if (!this.isActive || this.renderer.uiState.ghostStructure !== null) { return; diff --git a/src/client/LocalServer.ts b/src/client/LocalServer.ts index 75121b38ae..91dc5dec9e 100644 --- a/src/client/LocalServer.ts +++ b/src/client/LocalServer.ts @@ -20,13 +20,7 @@ import { import { getPersistentID } from "./Auth"; import { LobbyConfig } from "./ClientGameRunner"; import { ReplaySpeedChangeEvent } from "./InputHandler"; -import { - defaultReplaySpeedMultiplier, - ReplaySpeedMultiplier, -} from "./utilities/ReplaySpeedMultiplier"; - -// build a small backlog so MAX can catch up. -const MAX_REPLAY_BACKLOG_TURNS = 60; +import { defaultReplaySpeedMultiplier } from "./utilities/ReplaySpeedMultiplier"; export class LocalServer { // All turns from the game record on replay. @@ -36,6 +30,7 @@ export class LocalServer { private intents: Intent[] = []; private startedAt: number; + private nextTurnAtMs: number = 0; private paused = false; private replaySpeedMultiplier = defaultReplaySpeedMultiplier; @@ -44,7 +39,6 @@ export class LocalServer { private allPlayersStats: AllPlayersStats = {}; private turnsExecuted = 0; - private turnStartTime = 0; private turnCheckInterval: NodeJS.Timeout; private clientConnect: () => void; @@ -66,30 +60,54 @@ export class LocalServer { start() { console.log("local server starting"); + this.nextTurnAtMs = Date.now(); this.turnCheckInterval = setInterval(() => { - const turnIntervalMs = - this.lobbyConfig.serverConfig.turnIntervalMs() * - this.replaySpeedMultiplier; - const backlog = Math.max(0, this.turns.length - this.turnsExecuted); - const allowReplayBacklog = - this.replaySpeedMultiplier === ReplaySpeedMultiplier.fastest && - this.lobbyConfig.gameRecord !== undefined; - const maxBacklog = allowReplayBacklog ? MAX_REPLAY_BACKLOG_TURNS : 0; + if (this.paused) { + return; + } + + const baseTurnIntervalMs = this.lobbyConfig.serverConfig.turnIntervalMs(); + const turnIntervalMs = baseTurnIntervalMs * this.replaySpeedMultiplier; + + // Outstanding work is the number of turns we've emitted that the client hasn't applied yet. + const outstandingTurns = this.turns.length - this.turnsExecuted; + + // For ×0.5/×1/×2 we aim to stay close to real time. + // For "fastest" (interval 0), allow a larger but bounded backlog so the sim can sprint + // while the main thread drains updates opportunistically. + const maxOutstandingTurns = turnIntervalMs === 0 ? 200 : 5; + const maxTurnsToEmitPerCheck = turnIntervalMs === 0 ? 200 : 5; - const canQueueNextTurn = - backlog === 0 || (maxBacklog > 0 && backlog < maxBacklog); - if ( - canQueueNextTurn && - Date.now() > this.turnStartTime + turnIntervalMs + if (outstandingTurns >= maxOutstandingTurns) { + return; + } + + const now = Date.now(); + let emitted = 0; + while ( + emitted < maxTurnsToEmitPerCheck && + this.turns.length - this.turnsExecuted < maxOutstandingTurns && + (turnIntervalMs === 0 || now >= this.nextTurnAtMs) ) { - this.turnStartTime = Date.now(); // End turn on the server means the client will start processing the turn. this.endTurn(); + emitted++; + + if (turnIntervalMs === 0) { + // "Fastest": no wall-clock pacing; rely on backlog caps above. + this.nextTurnAtMs = now; + } else { + // Fixed-rate pacing: do not try to "catch up" after stalls; resume from now. + this.nextTurnAtMs = now + turnIntervalMs; + } } }, 5); this.eventBus.on(ReplaySpeedChangeEvent, (event) => { this.replaySpeedMultiplier = event.replaySpeedMultiplier; + // Apply speed changes immediately; the next scheduled turn time will be + // recalculated by the interval loop. + this.nextTurnAtMs = Date.now(); }); this.startedAt = Date.now(); @@ -126,11 +144,13 @@ export class LocalServer { this.intents.push(clientMsg.intent); this.endTurn(); this.paused = true; + this.nextTurnAtMs = Date.now(); } else { // Unpausing: clear pause flag before adding intent so next turn can execute this.paused = false; this.intents.push(clientMsg.intent); this.endTurn(); + this.nextTurnAtMs = Date.now(); } return; } @@ -185,8 +205,11 @@ export class LocalServer { } // This is so the client can tell us when it finished processing the turn. - public turnComplete() { - this.turnsExecuted++; + public turnComplete(count: number = 1) { + this.turnsExecuted = Math.min( + this.turnsExecuted + count, + this.turns.length, + ); } // endTurn in this context means the server has collected all the intents diff --git a/src/client/Transport.ts b/src/client/Transport.ts index 58307e1132..5248f37d05 100644 --- a/src/client/Transport.ts +++ b/src/client/Transport.ts @@ -392,9 +392,9 @@ export class Transport { this.connect(this.onconnect, this.onmessage); } - public turnComplete() { + public turnComplete(count: number = 1) { if (this.isLocal) { - this.localServer.turnComplete(); + this.localServer.turnComplete(count); } } diff --git a/src/core/GameRunner.ts b/src/core/GameRunner.ts index 0f93a94f68..7e6a12eb15 100644 --- a/src/core/GameRunner.ts +++ b/src/core/GameRunner.ts @@ -181,10 +181,6 @@ export class GameRunner { return true; } - public pendingTurns(): number { - return Math.max(0, this.turns.length - this.currTurn); - } - public playerActions( playerID: PlayerID, x?: number, diff --git a/src/core/worker/Worker.worker.ts b/src/core/worker/Worker.worker.ts index 31fd3f1362..41500b5e78 100644 --- a/src/core/worker/Worker.worker.ts +++ b/src/core/worker/Worker.worker.ts @@ -16,7 +16,7 @@ import { const ctx: Worker = self as any; let gameRunner: Promise | null = null; const mapLoader = new FetchGameMapLoader(`/maps`, version); -const MAX_TICKS_PER_HEARTBEAT = 4; +let isProcessingTurns = false; function gameUpdate(gu: GameUpdateViewData | ErrorUpdate) { // skip if ErrorUpdate @@ -33,23 +33,33 @@ function sendMessage(message: WorkerMessage) { ctx.postMessage(message); } +async function processPendingTurns() { + if (isProcessingTurns) { + return; + } + if (!gameRunner) { + return; + } + + const gr = await gameRunner; + if (!gr) { + return; + } + + isProcessingTurns = true; + try { + while (gr.executeNextTick()) { + // Keep running until no pending turns. + } + } finally { + isProcessingTurns = false; + } +} + ctx.addEventListener("message", async (e: MessageEvent) => { const message = e.data; switch (message.type) { - case "heartbeat": { - const gr = await gameRunner; - if (!gr) { - break; - } - const ticksToRun = Math.min(gr.pendingTurns(), MAX_TICKS_PER_HEARTBEAT); - for (let i = 0; i < ticksToRun; i++) { - if (!gr.executeNextTick()) { - break; - } - } - break; - } case "init": try { gameRunner = createGameRunner( @@ -62,6 +72,7 @@ ctx.addEventListener("message", async (e: MessageEvent) => { type: "initialized", id: message.id, } as InitializedMessage); + processPendingTurns(); return gr; }); } catch (error) { @@ -78,6 +89,7 @@ ctx.addEventListener("message", async (e: MessageEvent) => { try { const gr = await gameRunner; await gr.addTurn(message.turn); + processPendingTurns(); } catch (error) { console.error("Failed to process turn:", error); throw error; diff --git a/src/core/worker/WorkerClient.ts b/src/core/worker/WorkerClient.ts index fe0ac38fca..514f14c677 100644 --- a/src/core/worker/WorkerClient.ts +++ b/src/core/worker/WorkerClient.ts @@ -102,12 +102,6 @@ export class WorkerClient { }); } - sendHeartbeat() { - this.worker.postMessage({ - type: "heartbeat", - }); - } - playerProfile(playerID: number): Promise { return new Promise((resolve, reject) => { if (!this.isInitialized) { diff --git a/src/core/worker/WorkerMessages.ts b/src/core/worker/WorkerMessages.ts index a8d30e9b1f..0c5344da14 100644 --- a/src/core/worker/WorkerMessages.ts +++ b/src/core/worker/WorkerMessages.ts @@ -9,7 +9,6 @@ import { GameUpdateViewData } from "../game/GameUpdates"; import { ClientID, GameStartInfo, Turn } from "../Schemas"; export type WorkerMessageType = - | "heartbeat" | "init" | "initialized" | "turn" @@ -31,10 +30,6 @@ interface BaseWorkerMessage { id?: string; } -export interface HeartbeatMessage extends BaseWorkerMessage { - type: "heartbeat"; -} - // Messages from main thread to worker export interface InitMessage extends BaseWorkerMessage { type: "init"; @@ -114,7 +109,6 @@ export interface TransportShipSpawnResultMessage extends BaseWorkerMessage { // Union types for type safety export type MainThreadMessage = - | HeartbeatMessage | InitMessage | TurnMessage | PlayerActionsMessage