Skip to content

Commit 0ad9d7d

Browse files
authored
refactor(cli): refactored runtime process manager, added process control APIs (#3743)
1 parent ed71be8 commit 0ad9d7d

File tree

8 files changed

+297
-53
lines changed

8 files changed

+297
-53
lines changed

genkit-tools/cli/src/commands/start.ts

Lines changed: 17 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
import type { RuntimeManager } from '@genkit-ai/tools-common/manager';
1818
import { startServer } from '@genkit-ai/tools-common/server';
1919
import { findProjectRoot, logger } from '@genkit-ai/tools-common/utils';
20-
import { spawn } from 'child_process';
2120
import { Command } from 'commander';
2221
import getPort, { makeRange } from 'get-port';
2322
import open from 'open';
24-
import { startManager } from '../utils/manager-utils';
23+
import { startDevProcessManager, startManager } from '../utils/manager-utils';
2524

2625
interface RunOptions {
2726
noui?: boolean;
@@ -44,10 +43,20 @@ export const start = new Command('start')
4443
);
4544
}
4645
// Always start the manager.
47-
let managerPromise: Promise<RuntimeManager> = startManager(
48-
projectRoot,
49-
true
50-
);
46+
let manager: RuntimeManager;
47+
let processPromise: Promise<void> | undefined;
48+
if (start.args.length > 0) {
49+
const result = await startDevProcessManager(
50+
projectRoot,
51+
start.args[0],
52+
start.args.slice(1)
53+
);
54+
manager = result.manager;
55+
processPromise = result.processPromise;
56+
} else {
57+
manager = await startManager(projectRoot, true);
58+
processPromise = new Promise(() => {});
59+
}
5160
if (!options.noui) {
5261
let port: number;
5362
if (options.port) {
@@ -59,51 +68,10 @@ export const start = new Command('start')
5968
} else {
6069
port = await getPort({ port: makeRange(4000, 4099) });
6170
}
62-
managerPromise = managerPromise.then((manager) => {
63-
startServer(manager, port);
64-
return manager;
65-
});
71+
startServer(manager, port);
6672
if (options.open) {
6773
open(`http://localhost:${port}`);
6874
}
6975
}
70-
await managerPromise.then((manager: RuntimeManager) => {
71-
const telemetryServerUrl = manager?.telemetryServerUrl;
72-
return startRuntime(telemetryServerUrl);
73-
});
76+
await processPromise;
7477
});
75-
76-
async function startRuntime(telemetryServerUrl?: string) {
77-
if (start.args.length > 0) {
78-
return new Promise((urlResolver, reject) => {
79-
const appProcess = spawn(start.args[0], start.args.slice(1), {
80-
env: {
81-
...process.env,
82-
GENKIT_TELEMETRY_SERVER: telemetryServerUrl,
83-
GENKIT_ENV: 'dev',
84-
},
85-
shell: process.platform === 'win32',
86-
});
87-
88-
const originalStdIn = process.stdin;
89-
appProcess.stderr?.pipe(process.stderr);
90-
appProcess.stdout?.pipe(process.stdout);
91-
process.stdin?.pipe(appProcess.stdin);
92-
93-
appProcess.on('error', (error): void => {
94-
logger.error(`Error in app process: ${error}`);
95-
reject(error);
96-
process.exitCode = 1;
97-
});
98-
appProcess.on('exit', (code) => {
99-
process.stdin?.pipe(originalStdIn);
100-
if (code === 0) {
101-
urlResolver(undefined);
102-
} else {
103-
reject(new Error(`app process exited with code ${code}`));
104-
}
105-
});
106-
});
107-
}
108-
return new Promise(() => {}); // no runtime, return a hanging promise.
109-
}

genkit-tools/cli/src/utils/manager-utils.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
} from '@genkit-ai/telemetry-server';
2121
import type { Status } from '@genkit-ai/tools-common';
2222
import {
23+
ProcessManager,
2324
RuntimeManager,
2425
type GenkitToolsError,
2526
} from '@genkit-ai/tools-common/manager';
@@ -65,6 +66,26 @@ export async function startManager(
6566
return manager;
6667
}
6768

69+
export async function startDevProcessManager(
70+
projectRoot: string,
71+
command: string,
72+
args: string[]
73+
): Promise<{ manager: RuntimeManager; processPromise: Promise<void> }> {
74+
const telemetryServerUrl = await resolveTelemetryServer(projectRoot);
75+
const processManager = new ProcessManager(command, args, {
76+
GENKIT_TELEMETRY_SERVER: telemetryServerUrl,
77+
GENKIT_ENV: 'dev',
78+
});
79+
const manager = await RuntimeManager.create({
80+
telemetryServerUrl,
81+
manageHealth: true,
82+
projectRoot,
83+
processManager,
84+
});
85+
const processPromise = processManager.start();
86+
return { manager, processPromise };
87+
}
88+
6889
/**
6990
* Runs the given function with a runtime manager.
7091
*/

genkit-tools/common/src/manager/index.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,10 @@
1515
*/
1616

1717
export { RuntimeManager } from './manager';
18+
export {
19+
AppProcessStatus,
20+
ProcessManager,
21+
ProcessManagerStartOptions,
22+
ProcessStatus,
23+
} from './process-manager';
1824
export * from './types';

genkit-tools/common/src/manager/manager.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import {
3939
retriable,
4040
type DevToolsInfo,
4141
} from '../utils/utils';
42+
import { ProcessManager } from './process-manager';
4243
import {
4344
GenkitToolsError,
4445
RuntimeEvent,
@@ -57,9 +58,12 @@ interface RuntimeManagerOptions {
5758
manageHealth?: boolean;
5859
/** Project root dir. If not provided will be inferred from CWD. */
5960
projectRoot: string;
61+
/** An optional process manager for the main application process. */
62+
processManager?: ProcessManager;
6063
}
6164

6265
export class RuntimeManager {
66+
readonly processManager?: ProcessManager;
6367
private filenameToRuntimeMap: Record<string, RuntimeInfo> = {};
6468
private filenameToDevUiMap: Record<string, DevToolsInfo> = {};
6569
private idToFileMap: Record<string, string> = {};
@@ -68,8 +72,11 @@ export class RuntimeManager {
6872
private constructor(
6973
readonly telemetryServerUrl: string | undefined,
7074
private manageHealth: boolean,
71-
readonly projectRoot: string
72-
) {}
75+
readonly projectRoot: string,
76+
processManager?: ProcessManager
77+
) {
78+
this.processManager = processManager;
79+
}
7380

7481
/**
7582
* Creates a new runtime manager.
@@ -78,7 +85,8 @@ export class RuntimeManager {
7885
const manager = new RuntimeManager(
7986
options.telemetryServerUrl,
8087
options.manageHealth ?? true,
81-
options.projectRoot
88+
options.projectRoot,
89+
options.processManager
8290
);
8391
await manager.setupRuntimesWatcher();
8492
await manager.setupDevUiWatcher();
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ChildProcess, spawn } from 'child_process';
18+
import terminate from 'terminate';
19+
import { logger } from '../utils';
20+
21+
export type ProcessStatus = 'running' | 'stopped' | 'unconfigured';
22+
23+
export interface AppProcessStatus {
24+
status: ProcessStatus;
25+
}
26+
27+
export interface ProcessManagerStartOptions {
28+
nonInteractive?: boolean;
29+
}
30+
31+
/**
32+
* Manages a child process.
33+
*/
34+
export class ProcessManager {
35+
private appProcess?: ChildProcess;
36+
private originalStdIn?: NodeJS.ReadStream;
37+
private _status: ProcessStatus = 'stopped';
38+
private manualRestart = false;
39+
40+
constructor(
41+
private readonly command: string,
42+
private readonly args: string[],
43+
private readonly env: NodeJS.ProcessEnv = {}
44+
) {}
45+
46+
/**
47+
* Starts the process.
48+
*/
49+
start(options?: ProcessManagerStartOptions): Promise<void> {
50+
return new Promise((resolve, reject) => {
51+
this._status = 'running';
52+
this.appProcess = spawn(this.command, this.args, {
53+
env: {
54+
...process.env,
55+
...this.env,
56+
},
57+
shell: process.platform === 'win32',
58+
});
59+
60+
if (!options?.nonInteractive) {
61+
this.originalStdIn = process.stdin;
62+
this.appProcess.stderr?.pipe(process.stderr);
63+
this.appProcess.stdout?.pipe(process.stdout);
64+
process.stdin?.pipe(this.appProcess.stdin!);
65+
}
66+
67+
this.appProcess.on('error', (error): void => {
68+
logger.error(`Error in app process: ${error}`);
69+
this.cleanup();
70+
reject(error);
71+
});
72+
73+
this.appProcess.on('exit', (code, signal) => {
74+
this.cleanup();
75+
if (this.manualRestart) {
76+
this.manualRestart = false;
77+
return;
78+
}
79+
// If the process was killed by a signal, it's not an error in this context.
80+
if (code === 0 || signal) {
81+
resolve();
82+
} else {
83+
reject(new Error(`app process exited with code ${code}`));
84+
}
85+
});
86+
});
87+
}
88+
89+
/**
90+
* Kills the currently-running process and starts a new one.
91+
*/
92+
async restart(options?: ProcessManagerStartOptions): Promise<void> {
93+
this.manualRestart = true;
94+
await this.kill();
95+
this.start(options).catch(() => {});
96+
}
97+
98+
/**
99+
* Kills the currently-running process.
100+
*/
101+
kill(): Promise<void> {
102+
return new Promise((resolve) => {
103+
if (!this.appProcess || !this.appProcess.pid || this.appProcess.killed) {
104+
this._status = 'stopped';
105+
resolve();
106+
return;
107+
}
108+
109+
// The 'exit' listener is set up in start() and will handle cleanup.
110+
this.appProcess.on('exit', () => {
111+
resolve();
112+
});
113+
114+
terminate(this.appProcess.pid, 'SIGTERM', (err) => {
115+
if (err) {
116+
// This can happen if the process is already gone, which is fine.
117+
logger.debug(`Error during process termination: ${err.message}`);
118+
}
119+
resolve();
120+
});
121+
});
122+
}
123+
124+
status(): AppProcessStatus {
125+
return {
126+
status: this._status,
127+
};
128+
}
129+
130+
private cleanup() {
131+
if (this.originalStdIn) {
132+
process.stdin.unpipe(this.appProcess?.stdin!);
133+
this.originalStdIn = undefined;
134+
}
135+
if (this.appProcess) {
136+
this.appProcess.stdout?.unpipe(process.stdout);
137+
this.appProcess.stderr?.unpipe(process.stderr);
138+
}
139+
this.appProcess = undefined;
140+
this._status = 'stopped';
141+
}
142+
}

genkit-tools/common/src/server/router.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
validateSchema,
2323
} from '../eval';
2424
import type { RuntimeManager } from '../manager/manager';
25+
import { AppProcessStatus } from '../manager/process-manager';
2526
import { GenkitToolsError, type RuntimeInfo } from '../manager/types';
2627
import { TraceDataSchema } from '../types';
2728
import type { Action } from '../types/action';
@@ -310,6 +311,23 @@ export const TOOLS_SERVER_ROUTER = (manager: RuntimeManager) =>
310311
getActiveRuntimes: t.procedure.query(() => {
311312
return manager.listRuntimes();
312313
}),
314+
315+
getAppProcessStatus: t.procedure.query((): AppProcessStatus => {
316+
if (!manager.processManager) {
317+
return { status: 'unconfigured' };
318+
}
319+
return manager.processManager.status();
320+
}),
321+
322+
restartAppProcess: t.procedure.query(async () => {
323+
await manager.processManager?.restart();
324+
return true;
325+
}),
326+
327+
killAppProcess: t.procedure.query(async () => {
328+
await manager.processManager?.kill();
329+
return true;
330+
}),
313331
});
314332

315333
export type ToolsServerRouter = ReturnType<typeof TOOLS_SERVER_ROUTER>;

0 commit comments

Comments
 (0)