Skip to content
13 changes: 11 additions & 2 deletions py/packages/genkit/src/genkit/ai/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Base/shared implementation for Genkit user-facing API."""

import asyncio
import os
import threading
from collections.abc import Coroutine
from http.server import HTTPServer
Expand All @@ -39,6 +40,9 @@

T = TypeVar('T')

_instance_count = -1
_instance_lock = threading.Lock()


class GenkitBase(GenkitRegistry):
"""Base class with shared infra for Genkit instances (sync and async)."""
Expand All @@ -58,6 +62,11 @@ def __init__(
server.
"""
super().__init__()
global _instance_count
global _instance_lock
with _instance_lock:
_instance_count += 1
self.id = f'{os.getpid()}-{_instance_count}'
self._initialize_server(reflection_server_spec)
self._initialize_registry(model, plugins)
define_generate_action(self.registry)
Expand Down Expand Up @@ -165,10 +174,10 @@ def _start_server(self, spec: ServerSpec, loop: asyncio.AbstractEventLoop) -> No
"""
httpd = HTTPServer(
(spec.host, spec.port),
make_reflection_server(registry=self.registry, loop=loop),
make_reflection_server(registry=self.registry, loop=loop, id=self.id),
)
# We need to write the runtime file closest to the point of starting up
# the server to avoid race conditions with the manager's runtime
# handler.
init_default_runtime(spec)
init_default_runtime(spec, self.id)
httpd.serve_forever()
11 changes: 4 additions & 7 deletions py/packages/genkit/src/genkit/ai/_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def url(self) -> str:
def create_runtime(
runtime_dir: str,
reflection_server_spec: ServerSpec,
id: str,
at_exit_fn: Callable[[Path], None] | None = None,
) -> Path:
"""Create a runtime configuration for use with the genkit CLI.
Expand All @@ -87,7 +88,7 @@ def create_runtime(
runtime_file_path = Path(os.path.join(runtime_dir, runtime_file_name))
metadata = json.dumps({
'reflectionApiSpecVersion': 1,
'id': f'{os.getpid()}',
'id': id,
'pid': os.getpid(),
'genkitVersion': 'py/' + DEFAULT_GENKIT_VERSION,
'reflectionServerUrl': reflection_server_spec.url,
Expand All @@ -104,11 +105,7 @@ def cleanup_runtime() -> None:
return runtime_file_path


def init_default_runtime(spec: ServerSpec) -> None:
def init_default_runtime(spec: ServerSpec, id: str) -> None:
"""Initialize the runtime for the Genkit instance."""
runtimes_dir = os.path.join(os.getcwd(), '.genkit/runtimes')
create_runtime(
runtime_dir=runtimes_dir,
reflection_server_spec=spec,
at_exit_fn=os.remove,
)
create_runtime(runtime_dir=runtimes_dir, reflection_server_spec=spec, id=id, at_exit_fn=os.remove)
13 changes: 11 additions & 2 deletions py/packages/genkit/src/genkit/core/reflection.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
def make_reflection_server(
registry: Registry,
loop: asyncio.AbstractEventLoop,
id: str,
encoding='utf-8',
quiet=True,
):
Expand Down Expand Up @@ -113,11 +114,19 @@ def do_GET(self) -> None: # noqa: N802
For the /api/actions endpoint, returns a JSON object mapping action
keys to their metadata, including input/output schemas.
"""
if urllib.parse.urlparse(self.path).path == '/api/__health':
parsed_url = urllib.parse.urlparse(self.path)
if parsed_url.path == '/api/__health':
query_params = urllib.parse.parse_qs(parsed_url.query)
expected_id = query_params.get('id', [None])[0]
if expected_id is not None and expected_id != id:
self.send_response(500)
self.end_headers()
return

self.send_response(200, 'OK')
self.end_headers()

elif self.path == '/api/actions':
elif parsed_url.path == '/api/actions':
self.send_response(200)
self.send_header('content-type', 'application/json')
self.end_headers()
Expand Down
6 changes: 3 additions & 3 deletions py/packages/genkit/tests/genkit/veneer/server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,19 @@ def test_create_runtime() -> None:
spec = ServerSpec(port=3100)

# Test runtime file creation
runtime_path = create_runtime(temp_dir, spec)
runtime_path = create_runtime(temp_dir, spec, '123')
assert runtime_path.exists()

# Verify file content
content = json.loads(runtime_path.read_text(encoding='utf-8'))
assert isinstance(content, dict)
assert 'id' in content
assert content['id'] == '123'
assert 'pid' in content
assert content['reflectionServerUrl'] == 'http://localhost:3100'
assert 'timestamp' in content

# Test directory creation
new_dir = os.path.join(temp_dir, 'new_dir')
runtime_path = create_runtime(new_dir, spec)
runtime_path = create_runtime(new_dir, spec, '123')
assert os.path.exists(new_dir)
assert runtime_path.exists()
Loading