Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 57 additions & 17 deletions trustgraph-flow/trustgraph/bootstrap/bootstrapper/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,58 @@ async def _run_spec(self, spec, config):
# Main loop.
# ------------------------------------------------------------------

async def _run_pre_service(self):
"""Run pre-service initialisers before opening pub/sub clients.

These bring up infrastructure that other services depend on
(e.g. Pulsar tenant/namespaces). They use out-of-band APIs
(HTTP admin), not pub/sub, so they don't need a config client.
They run without flag tracking — they must be idempotent.
"""
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
if not pre_specs:
return

for spec in pre_specs:
child_logger = logger.getChild(spec.name)
child_ctx = InitContext(
logger=child_logger,
config=None,
make_flow_client=self._make_flow_client,
make_iam_client=self._make_iam_client,
)
child_logger.info(f"Running pre-service initialiser")
try:
await spec.instance.run(child_ctx, None, spec.flag)
child_logger.info(f"Pre-service initialiser completed")
except Exception as e:
child_logger.error(
f"Pre-service initialiser failed: "
f"{type(e).__name__}: {e}",
exc_info=True,
)
raise

async def start(self):
# Run pre-service initialisers before opening any pub/sub
# connections. They bring up infrastructure (Pulsar
# namespaces, etc.) that super().start() depends on.
while self.running:
try:
await self._run_pre_service()
break
except Exception as e:
logger.info(
f"Pre-service initialisation failed "
f"({type(e).__name__}: {e}); retry in {GATE_BACKOFF}s"
)
await asyncio.sleep(GATE_BACKOFF)

await super().start()

async def run(self):

logger.info(
Expand All @@ -347,38 +399,26 @@ async def run(self):
continue

try:
# Phase 1: pre-service initialisers run unconditionally.
pre_specs = [
s for s in self.specs
if not s.instance.wait_for_services
]
pre_results = {}
for spec in pre_specs:
pre_results[spec.name] = await self._run_spec(
spec, config,
)

# Phase 2: gate.
# Phase 1: gate.
gate_ok = await self._gate_ready(config)

# Phase 3: post-service initialisers, if gate passed.
post_results = {}
# Phase 2: post-service initialisers, if gate passed.
results = {}
if gate_ok:
post_specs = [
s for s in self.specs
if s.instance.wait_for_services
]
for spec in post_specs:
post_results[spec.name] = await self._run_spec(
results[spec.name] = await self._run_spec(
spec, config,
)

# Cadence selection.
if not gate_ok:
sleep_for = GATE_BACKOFF
else:
all_results = {**pre_results, **post_results}
if any(r != "skip" for r in all_results.values()):
if any(r != "skip" for r in results.values()):
sleep_for = INIT_RETRY
else:
sleep_for = STEADY_INTERVAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ def _create_namespace(self, namespace, config):
def _reconcile_sync(self, logger):
if not self._tenant_exists():
clusters = self._get_clusters()
if not clusters:
raise RuntimeError(
"Pulsar cluster list is empty — broker not ready yet"
)
logger.info(
f"Creating tenant {self.tenant!r} with clusters {clusters}"
)
Expand Down
27 changes: 9 additions & 18 deletions trustgraph-flow/trustgraph/iam/service/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ def _row_to_api_key_record(self, row):

async def auto_bootstrap_if_token_mode(self):
"""Called from the service processor at startup. In
``token`` mode, if tables are empty, seeds the default
workspace / admin / signing key using the operator-provided
``token`` mode, if tables are empty, seeds the admin user,
API key, and signing key using the operator-provided
bootstrap token. The admin's API key plaintext is *the*
``bootstrap_token`` — the operator already knows it, nothing
needs to be returned or logged.
Expand All @@ -408,7 +408,7 @@ async def auto_bootstrap_if_token_mode(self):
if self.bootstrap_mode != "token":
return

if await self.table_store.any_workspace_exists():
if await self.table_store.any_signing_key_exists():
logger.info(
"IAM: token mode, tables already populated; skipping "
"auto-bootstrap"
Expand All @@ -423,22 +423,13 @@ async def auto_bootstrap_if_token_mode(self):

async def _seed_tables(self, api_key_plaintext):
"""Shared seeding logic used by token-mode auto-bootstrap and
bootstrap-mode handle_bootstrap. Creates the default
workspace, admin user, admin API key (using the given
plaintext), and an initial signing key. Returns the admin
bootstrap-mode handle_bootstrap. Creates the admin user,
admin API key (using the given plaintext), and an initial
signing key. The workspace is created separately by the
bootstrapper's WorkspaceInit initialiser. Returns the admin
user id."""
now = _now_dt()

await self.table_store.put_workspace(
id=DEFAULT_WORKSPACE,
name="Default",
enabled=True,
created=now,
)

if self._on_workspace_created:
await self._on_workspace_created(DEFAULT_WORKSPACE)

admin_user_id = str(uuid.uuid4())
admin_password = secrets.token_urlsafe(32)
await self.table_store.put_user(
Expand Down Expand Up @@ -491,7 +482,7 @@ async def handle_bootstrap(self, v):
if self.bootstrap_mode != "bootstrap":
return _err("auth-failed", "auth failure")

if await self.table_store.any_workspace_exists():
if await self.table_store.any_signing_key_exists():
return _err("auth-failed", "auth failure")

plaintext = _generate_api_key()
Expand Down Expand Up @@ -531,7 +522,7 @@ async def handle_bootstrap_status(self, v):
instead of forcing callers to probe the masked-failure path."""
available = (
self.bootstrap_mode == "bootstrap"
and not await self.table_store.any_workspace_exists()
and not await self.table_store.any_signing_key_exists()
)
return IamResponse(bootstrap_available=available)

Expand Down
4 changes: 4 additions & 0 deletions trustgraph-flow/trustgraph/tables/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,7 @@ async def update_workspace(self, id, name, enabled):
async def any_workspace_exists(self):
rows = await self.list_workspaces()
return bool(rows)

async def any_signing_key_exists(self):
rows = await self.list_signing_keys()
return bool(rows)
Loading