diff --git a/genkit-tools/cli/src/commands/start.ts b/genkit-tools/cli/src/commands/start.ts index 44c4af3ed5..ca8bda48b2 100644 --- a/genkit-tools/cli/src/commands/start.ts +++ b/genkit-tools/cli/src/commands/start.ts @@ -17,11 +17,10 @@ import type { RuntimeManager } from '@genkit-ai/tools-common/manager'; import { startServer } from '@genkit-ai/tools-common/server'; import { findProjectRoot, logger } from '@genkit-ai/tools-common/utils'; -import { spawn } from 'child_process'; import { Command } from 'commander'; import getPort, { makeRange } from 'get-port'; import open from 'open'; -import { startManager } from '../utils/manager-utils'; +import { startDevProcessManager, startManager } from '../utils/manager-utils'; interface RunOptions { noui?: boolean; @@ -44,10 +43,20 @@ export const start = new Command('start') ); } // Always start the manager. - let managerPromise: Promise = startManager( - projectRoot, - true - ); + let manager: RuntimeManager; + let processPromise: Promise | undefined; + if (start.args.length > 0) { + const result = await startDevProcessManager( + projectRoot, + start.args[0], + start.args.slice(1) + ); + manager = result.manager; + processPromise = result.processPromise; + } else { + manager = await startManager(projectRoot, true); + processPromise = new Promise(() => {}); + } if (!options.noui) { let port: number; if (options.port) { @@ -59,51 +68,10 @@ export const start = new Command('start') } else { port = await getPort({ port: makeRange(4000, 4099) }); } - managerPromise = managerPromise.then((manager) => { - startServer(manager, port); - return manager; - }); + startServer(manager, port); if (options.open) { open(`http://localhost:${port}`); } } - await managerPromise.then((manager: RuntimeManager) => { - const telemetryServerUrl = manager?.telemetryServerUrl; - return startRuntime(telemetryServerUrl); - }); + await processPromise; }); - -async function startRuntime(telemetryServerUrl?: string) { - if (start.args.length > 0) { - return new Promise((urlResolver, reject) => { - const appProcess = spawn(start.args[0], start.args.slice(1), { - env: { - ...process.env, - GENKIT_TELEMETRY_SERVER: telemetryServerUrl, - GENKIT_ENV: 'dev', - }, - shell: process.platform === 'win32', - }); - - const originalStdIn = process.stdin; - appProcess.stderr?.pipe(process.stderr); - appProcess.stdout?.pipe(process.stdout); - process.stdin?.pipe(appProcess.stdin); - - appProcess.on('error', (error): void => { - logger.error(`Error in app process: ${error}`); - reject(error); - process.exitCode = 1; - }); - appProcess.on('exit', (code) => { - process.stdin?.pipe(originalStdIn); - if (code === 0) { - urlResolver(undefined); - } else { - reject(new Error(`app process exited with code ${code}`)); - } - }); - }); - } - return new Promise(() => {}); // no runtime, return a hanging promise. -} diff --git a/genkit-tools/cli/src/utils/manager-utils.ts b/genkit-tools/cli/src/utils/manager-utils.ts index 281d7ebf8d..7badf39a11 100644 --- a/genkit-tools/cli/src/utils/manager-utils.ts +++ b/genkit-tools/cli/src/utils/manager-utils.ts @@ -20,6 +20,7 @@ import { } from '@genkit-ai/telemetry-server'; import type { Status } from '@genkit-ai/tools-common'; import { + ProcessManager, RuntimeManager, type GenkitToolsError, } from '@genkit-ai/tools-common/manager'; @@ -65,6 +66,26 @@ export async function startManager( return manager; } +export async function startDevProcessManager( + projectRoot: string, + command: string, + args: string[] +): Promise<{ manager: RuntimeManager; processPromise: Promise }> { + const telemetryServerUrl = await resolveTelemetryServer(projectRoot); + const processManager = new ProcessManager(command, args, { + GENKIT_TELEMETRY_SERVER: telemetryServerUrl, + GENKIT_ENV: 'dev', + }); + const manager = await RuntimeManager.create({ + telemetryServerUrl, + manageHealth: true, + projectRoot, + processManager, + }); + const processPromise = processManager.start(); + return { manager, processPromise }; +} + /** * Runs the given function with a runtime manager. */ diff --git a/genkit-tools/common/src/manager/index.ts b/genkit-tools/common/src/manager/index.ts index 31cfc12f1e..d28014cc0b 100644 --- a/genkit-tools/common/src/manager/index.ts +++ b/genkit-tools/common/src/manager/index.ts @@ -15,4 +15,10 @@ */ export { RuntimeManager } from './manager'; +export { + AppProcessStatus, + ProcessManager, + ProcessManagerStartOptions, + ProcessStatus, +} from './process-manager'; export * from './types'; diff --git a/genkit-tools/common/src/manager/manager.ts b/genkit-tools/common/src/manager/manager.ts index 6f9a96478a..b166095939 100644 --- a/genkit-tools/common/src/manager/manager.ts +++ b/genkit-tools/common/src/manager/manager.ts @@ -39,6 +39,7 @@ import { retriable, type DevToolsInfo, } from '../utils/utils'; +import { ProcessManager } from './process-manager'; import { GenkitToolsError, RuntimeEvent, @@ -57,9 +58,12 @@ interface RuntimeManagerOptions { manageHealth?: boolean; /** Project root dir. If not provided will be inferred from CWD. */ projectRoot: string; + /** An optional process manager for the main application process. */ + processManager?: ProcessManager; } export class RuntimeManager { + readonly processManager?: ProcessManager; private filenameToRuntimeMap: Record = {}; private filenameToDevUiMap: Record = {}; private idToFileMap: Record = {}; @@ -68,8 +72,11 @@ export class RuntimeManager { private constructor( readonly telemetryServerUrl: string | undefined, private manageHealth: boolean, - readonly projectRoot: string - ) {} + readonly projectRoot: string, + processManager?: ProcessManager + ) { + this.processManager = processManager; + } /** * Creates a new runtime manager. @@ -78,7 +85,8 @@ export class RuntimeManager { const manager = new RuntimeManager( options.telemetryServerUrl, options.manageHealth ?? true, - options.projectRoot + options.projectRoot, + options.processManager ); await manager.setupRuntimesWatcher(); await manager.setupDevUiWatcher(); diff --git a/genkit-tools/common/src/manager/process-manager.ts b/genkit-tools/common/src/manager/process-manager.ts new file mode 100644 index 0000000000..3c4f905fb7 --- /dev/null +++ b/genkit-tools/common/src/manager/process-manager.ts @@ -0,0 +1,142 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ChildProcess, spawn } from 'child_process'; +import terminate from 'terminate'; +import { logger } from '../utils'; + +export type ProcessStatus = 'running' | 'stopped' | 'unconfigured'; + +export interface AppProcessStatus { + status: ProcessStatus; +} + +export interface ProcessManagerStartOptions { + nonInteractive?: boolean; +} + +/** + * Manages a child process. + */ +export class ProcessManager { + private appProcess?: ChildProcess; + private originalStdIn?: NodeJS.ReadStream; + private _status: ProcessStatus = 'stopped'; + private manualRestart = false; + + constructor( + private readonly command: string, + private readonly args: string[], + private readonly env: NodeJS.ProcessEnv = {} + ) {} + + /** + * Starts the process. + */ + start(options?: ProcessManagerStartOptions): Promise { + return new Promise((resolve, reject) => { + this._status = 'running'; + this.appProcess = spawn(this.command, this.args, { + env: { + ...process.env, + ...this.env, + }, + shell: process.platform === 'win32', + }); + + if (!options?.nonInteractive) { + this.originalStdIn = process.stdin; + this.appProcess.stderr?.pipe(process.stderr); + this.appProcess.stdout?.pipe(process.stdout); + process.stdin?.pipe(this.appProcess.stdin!); + } + + this.appProcess.on('error', (error): void => { + logger.error(`Error in app process: ${error}`); + this.cleanup(); + reject(error); + }); + + this.appProcess.on('exit', (code, signal) => { + this.cleanup(); + if (this.manualRestart) { + this.manualRestart = false; + return; + } + // If the process was killed by a signal, it's not an error in this context. + if (code === 0 || signal) { + resolve(); + } else { + reject(new Error(`app process exited with code ${code}`)); + } + }); + }); + } + + /** + * Kills the currently-running process and starts a new one. + */ + async restart(options?: ProcessManagerStartOptions): Promise { + this.manualRestart = true; + await this.kill(); + this.start(options).catch(() => {}); + } + + /** + * Kills the currently-running process. + */ + kill(): Promise { + return new Promise((resolve) => { + if (!this.appProcess || !this.appProcess.pid || this.appProcess.killed) { + this._status = 'stopped'; + resolve(); + return; + } + + // The 'exit' listener is set up in start() and will handle cleanup. + this.appProcess.on('exit', () => { + resolve(); + }); + + terminate(this.appProcess.pid, 'SIGTERM', (err) => { + if (err) { + // This can happen if the process is already gone, which is fine. + logger.debug(`Error during process termination: ${err.message}`); + } + resolve(); + }); + }); + } + + status(): AppProcessStatus { + return { + status: this._status, + }; + } + + private cleanup() { + if (this.originalStdIn) { + process.stdin.unpipe(this.appProcess?.stdin!); + this.originalStdIn = undefined; + } + if (this.appProcess) { + this.appProcess.stdout?.unpipe(process.stdout); + this.appProcess.stderr?.unpipe(process.stderr); + } + this.appProcess = undefined; + this._status = 'stopped'; + } +} diff --git a/genkit-tools/common/src/server/router.ts b/genkit-tools/common/src/server/router.ts index 3c6329f34a..e73ccd1f34 100644 --- a/genkit-tools/common/src/server/router.ts +++ b/genkit-tools/common/src/server/router.ts @@ -22,6 +22,7 @@ import { validateSchema, } from '../eval'; import type { RuntimeManager } from '../manager/manager'; +import { AppProcessStatus } from '../manager/process-manager'; import { GenkitToolsError, type RuntimeInfo } from '../manager/types'; import { TraceDataSchema } from '../types'; import type { Action } from '../types/action'; @@ -310,6 +311,23 @@ export const TOOLS_SERVER_ROUTER = (manager: RuntimeManager) => getActiveRuntimes: t.procedure.query(() => { return manager.listRuntimes(); }), + + getAppProcessStatus: t.procedure.query((): AppProcessStatus => { + if (!manager.processManager) { + return { status: 'unconfigured' }; + } + return manager.processManager.status(); + }), + + restartAppProcess: t.procedure.query(async () => { + await manager.processManager?.restart(); + return true; + }), + + killAppProcess: t.procedure.query(async () => { + await manager.processManager?.kill(); + return true; + }), }); export type ToolsServerRouter = ReturnType; diff --git a/genkit-tools/common/tests/process-manager_test.ts b/genkit-tools/common/tests/process-manager_test.ts new file mode 100644 index 0000000000..6c113a69c7 --- /dev/null +++ b/genkit-tools/common/tests/process-manager_test.ts @@ -0,0 +1,81 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { describe, expect, it } from '@jest/globals'; +import { ProcessManager } from '../src/manager/process-manager'; + +describe('ProcessManager', () => { + it('should start and stop a process successfully', async () => { + const manager = new ProcessManager('node', [ + '-e', + 'console.log("test"); process.exit(0);', + ]); + expect(manager.status()).toEqual({ status: 'stopped' }); + await manager.start({ nonInteractive: true }); + expect(manager.status()).toEqual({ status: 'stopped' }); + }); + + it('should reject on non-zero exit code', async () => { + const manager = new ProcessManager('node', ['-e', 'process.exit(1);']); + await expect(manager.start({ nonInteractive: true })).rejects.toThrow( + 'app process exited with code 1' + ); + expect(manager.status()).toEqual({ status: 'stopped' }); + }); + + it('should be able to kill a process', async () => { + // A process that runs for a long time + const manager = new ProcessManager('node', [ + '-e', + 'setTimeout(() => {}, 10000)', + ]); + expect(manager.status()).toEqual({ status: 'stopped' }); + + const startPromise = manager.start({ nonInteractive: true }); + // Give it a moment to start + await new Promise((resolve) => setTimeout(resolve, 100)); + expect(manager.status()).toEqual({ status: 'running' }); + + await manager.kill(); + expect(manager.status()).toEqual({ status: 'stopped' }); + + // The start promise should resolve when the process is killed. + await startPromise; + }); + + it('should be able to restart a process', async () => { + const manager = new ProcessManager('node', [ + '-e', + 'console.log("test"); process.exit(0);', + ]); + expect(manager.status()).toEqual({ status: 'stopped' }); + + // Start, let it finish + await manager.start({ nonInteractive: true }); + expect(manager.status()).toEqual({ status: 'stopped' }); + + // Restart + await manager.restart({ nonInteractive: true }); + expect(manager.status()).toEqual({ status: 'running' }); + }); + + it('kill should be idempotent', async () => { + const manager = new ProcessManager('node', ['-e', '']); + await manager.kill(); + await manager.kill(); + expect(manager.status()).toEqual({ status: 'stopped' }); + }); +}); diff --git a/js/genkit/src/genkit.ts b/js/genkit/src/genkit.ts index 96b72b53f9..040d640113 100644 --- a/js/genkit/src/genkit.ts +++ b/js/genkit/src/genkit.ts @@ -1053,7 +1053,7 @@ export function genkit(options: GenkitOptions): Genkit { } const shutdown = async () => { - logger.info('Shutting down all Genkit servers...'); + logger.debug('Shutting down all Genkit servers...'); await ReflectionServer.stopAll(); process.exit(0); };