class OctonionEmbedding(nn.Module):
    """
    Generates 8-dimensional Octonion positional embeddings.

    The model dimension is split into 8 contiguous slices, one per Octonion
    component (real, i, j, k, l, il, jl, kl). Column ``j`` of slice ``k``
    holds ``sin(position * div_term[j] + k * pi / 4)``, i.e. the standard
    sinusoidal basis with a per-component phase offset.

    Args:
        dim_model: Total embedding / model dimension. It does not have to be
            divisible by 8: the first seven components each get
            ``dim_model // 8`` columns and the last component receives all
            remaining columns.
        max_len: Maximum sequence length (number of rows precomputed).
    """

    # Number of Octonion components the model dimension is split into.
    _NUM_COMPONENTS = 8

    def __init__(self, dim_model: int, max_len: int = 5000):
        super().__init__()
        self.dim_model = dim_model
        self.max_len = max_len

        n_comp = self._NUM_COMPONENTS
        component_width = dim_model // n_comp
        # The last component absorbs the remainder, so it is the widest slice
        # and determines how many distinct frequencies are required.
        last_width = dim_model - (n_comp - 1) * component_width

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # [T, 1]

        # One frequency per column of the widest slice. The exponent index
        # advances in steps of 8, so for sizes divisible by 8 this is exactly
        # ``arange(0, dim_model, 8)``. Sizing it by ``last_width`` (rather
        # than by ``ceil(dim_model / 8)``) is what keeps the final slice
        # assignment shape-compatible when ``dim_model % 8 >= 2``.
        div_term = torch.exp(
            torch.arange(0, n_comp * last_width, n_comp, dtype=torch.float)
            * -(math.log(10000.0) / dim_model)
        )  # [last_width]

        pe = torch.zeros(max_len, dim_model)
        for k in range(n_comp):
            start = k * component_width
            end = dim_model if k == n_comp - 1 else start + component_width
            phase_offset = k * math.pi / 4
            pe[:, start:end] = torch.sin(
                position * div_term[: end - start] + phase_offset
            )

        # Buffer (not a parameter): follows .to()/.cuda() and state_dict,
        # but is never updated by the optimiser.
        self.register_buffer("pe", pe.unsqueeze(0))  # [1, max_len, dim_model]

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Return Octonion position embeddings for the input sequence.

        Args:
            x: ``[batch, seq_len, dim_model]`` token representations. Only
                ``x.size(1)`` and ``x.device`` are read.

        Returns:
            ``[1, seq_len, dim_model]`` slice of the precomputed table on the
            same device as ``x``. Because the table is sliced, a ``seq_len``
            larger than ``max_len`` yields only ``max_len`` rows.
        """
        seq_len = x.size(1)
        pe = self.pe[:, :seq_len, :]  # type: ignore[index]
        if pe.device != x.device:
            pe = pe.to(x.device)
        return pe  # type: ignore[return-value]


def octonion_distance(
    oct_a: torch.Tensor,
    oct_b: torch.Tensor,
    eps: float = 1e-9,
) -> torch.Tensor:
    """
    Compute the norm of the difference between two embedding vectors.

    This replaces the Euclidean ``dist(p_i, p_j)`` in the gravitational force
    formula::

        F_ij = G * m_i * m_j / (dist(p_i, p_j)^2 + eps)

    The distance is ``sqrt(sum((oct_a - oct_b)^2) + eps)`` over the last
    dimension; ``eps`` keeps the square root (and its gradient) finite when
    the two vectors coincide.

    Args:
        oct_a: First set of Octonion vectors ``[..., dim]``.
        oct_b: Second set of Octonion vectors ``[..., dim]``
            (broadcast-compatible with ``oct_a``).
        eps: Stabiliser added under the square root. Defaults to ``1e-9``.

    Returns:
        Distance tensor of shape ``[..., 1]`` (the reduced dim is kept).
    """
    diff = oct_a - oct_b
    norm_sq = torch.sum(diff ** 2, dim=-1, keepdim=True)
    return torch.sqrt(norm_sq + eps)


class GravitationalOctonionPosition(nn.Module):
    """
    Computes pairwise Octonion distances between all token positions.

    Wraps :class:`OctonionEmbedding` and :func:`octonion_distance` into a
    single module whose output is a ``[batch, seq_len, seq_len]`` distance
    matrix suitable for use in gravitational force calculations.

    Args:
        dim_model: Model dimension (passed to :class:`OctonionEmbedding`).
        max_len: Maximum sequence length.
    """

    def __init__(self, dim_model: int, max_len: int = 5000):
        super().__init__()
        self.embedding = OctonionEmbedding(dim_model=dim_model, max_len=max_len)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Compute the pairwise Octonion distance matrix for all token positions.

        The distances depend only on position, not on the content of ``x``;
        ``x`` supplies the batch size, sequence length and device.

        Args:
            x: ``[batch, seq_len, dim_model]`` token representations.

        Returns:
            ``[batch, seq_len, seq_len]`` non-negative distance matrix. The
            batch dimension is an ``expand`` view of a single matrix, so the
            result shares storage across the batch.
        """
        pos = self.embedding(x)  # [1, seq_len, dim_model]

        # Broadcasting the two views materialises a
        # [1, seq_len, seq_len, dim_model] difference tensor inside
        # octonion_distance before the last dim is reduced.
        pos_i = pos.unsqueeze(2)  # [1, seq_len, 1, dim_model]
        pos_j = pos.unsqueeze(1)  # [1, 1, seq_len, dim_model]

        dist = octonion_distance(pos_i, pos_j).squeeze(-1)  # [1, seq_len, seq_len]

        # Same matrix for every batch element.
        return dist.expand(x.size(0), -1, -1)  # [batch, seq_len, seq_len]
# ---------------------------------------------------------------------------
# Phase configuration map
# ---------------------------------------------------------------------------

# Per-phase attention parameters: gravitational constant ``G``, distance
# curvature, and the upper bound applied to forces before the softmax.
PHASE_CONFIG: Dict[str, Dict[str, float]] = {
    "solid":       {"G": 0.5,  "curvature": 0.0,  "hawking_clamp": 50.0},
    "fluid":       {"G": 1.0,  "curvature": 0.15, "hawking_clamp": 50.0},
    "gas":         {"G": 0.1,  "curvature": 0.8,  "hawking_clamp": 50.0},
    "singularity": {"G": 50.0, "curvature": -0.1, "hawking_clamp": 50.0},
}

VALID_PHASES = frozenset(PHASE_CONFIG.keys())


def _phase_params(phase: str) -> Dict[str, float]:
    """Return the ``PHASE_CONFIG`` entry for ``phase`` or raise ``ValueError``."""
    if phase not in VALID_PHASES:
        raise ValueError(
            f"Unknown phase '{phase}'. Valid phases: {sorted(VALID_PHASES)}"
        )
    return PHASE_CONFIG[phase]


# ---------------------------------------------------------------------------
# Polymorphic Attention Orchestrator
# ---------------------------------------------------------------------------

class PolymorphicAttentionOrchestrator(nn.Module):
    """
    Phase-shifting gravitational attention module.

    Replaces standard dot-product attention with a gravitational force
    calculation. The active phase is set by calling :meth:`morph` before
    the forward pass, or overridden for a single call via the ``phase``
    argument of :meth:`forward`.

    Args:
        dim_model: Model (embedding) dimension.
        num_heads: Number of attention heads. Must divide ``dim_model``.
        max_len: Maximum sequence length for Octonion position embeddings.
        event_horizon: Small constant added to the squared distance to
            prevent division by zero in the force denominator.
        initial_phase: Starting cognitive phase (``"fluid"`` by default).

    Raises:
        ValueError: If ``dim_model`` is not divisible by ``num_heads`` or
            ``initial_phase`` is not a key of ``PHASE_CONFIG``.
    """

    def __init__(
        self,
        dim_model: int,
        num_heads: int = 4,
        max_len: int = 5000,
        event_horizon: float = 1e-6,
        initial_phase: str = "fluid",
    ):
        super().__init__()

        if dim_model % num_heads != 0:
            raise ValueError(
                f"dim_model ({dim_model}) must be divisible by num_heads ({num_heads})"
            )
        cfg = _phase_params(initial_phase)

        self.dim_model = dim_model
        self.num_heads = num_heads
        self.head_dim = dim_model // num_heads
        self.event_horizon = event_horizon

        # Current phase parameters (plain Python attributes, updated by morph())
        self.current_phase: str = initial_phase
        self.G: float = cfg["G"]
        self.curvature: float = cfg["curvature"]
        self.hawking_clamp: float = cfg["hawking_clamp"]

        # Learnable per-head mass projections: head_dim features -> 1 scalar
        self.mass_projs = nn.ModuleList([
            nn.Linear(self.head_dim, 1, bias=False)
            for _ in range(num_heads)
        ])

        # Shared value projection (sliced per head) and output recombination
        self.v_proj = nn.Linear(dim_model, dim_model, bias=False)
        self.out_proj = nn.Linear(dim_model, dim_model, bias=False)

        # Octonion positional distance module
        self.oct_pos = GravitationalOctonionPosition(dim_model=dim_model, max_len=max_len)

    # ------------------------------------------------------------------
    # Phase management
    # ------------------------------------------------------------------

    def morph(self, phase: str) -> None:
        """
        Reconfigure attention parameters for the given cognitive phase.

        Args:
            phase: One of ``"solid"``, ``"fluid"``, ``"gas"``,
                ``"singularity"``.

        Raises:
            ValueError: If ``phase`` is not a recognised phase name. The
                current phase is left untouched in that case.
        """
        cfg = _phase_params(phase)
        self.current_phase = phase
        self.G = cfg["G"]
        self.curvature = cfg["curvature"]
        self.hawking_clamp = cfg["hawking_clamp"]

    # ------------------------------------------------------------------
    # Forward pass
    # ------------------------------------------------------------------

    def forward(
        self,
        x: torch.Tensor,
        phase: Optional[str] = None,
    ) -> Tuple[torch.Tensor, Dict[str, Any]]:
        """
        Compute phase-shifted gravitational attention.

        Args:
            x: ``[batch, seq_len, dim_model]`` input representations.
            phase: Optional phase override for this forward pass only.
                Does **not** permanently change :attr:`current_phase`.

        Returns:
            A tuple ``(output, diagnostics)`` where:

            - ``output``: ``[batch, seq_len, dim_model]``
            - ``diagnostics``: dict with keys ``max_force``,
              ``mean_force`` (both measured after the Hawking clamp, over
              all heads), ``phase``, ``G``, ``curvature``.

        Raises:
            ValueError: If ``phase`` is given but unknown, or ``x`` is not
                a 3-D tensor.
        """
        if phase is None:
            G = self.G
            curvature = self.curvature
            hawking_clamp = self.hawking_clamp
            active_phase = self.current_phase
        else:
            cfg = _phase_params(phase)
            G = cfg["G"]
            curvature = cfg["curvature"]
            hawking_clamp = cfg["hawking_clamp"]
            active_phase = phase

        if x.dim() != 3:
            raise ValueError(
                f"expected x of shape [batch, seq_len, dim_model], got {tuple(x.shape)}"
            )

        # Octonion pairwise distance matrix: [batch, seq_len, seq_len]
        dist_matrix = self.oct_pos(x)

        # Curvature modulation; the clamp guards against the factor
        # (1 + curvature * sin(d)) turning a distance negative.
        if curvature != 0.0:
            dist_matrix = dist_matrix * (1.0 + curvature * torch.sin(dist_matrix))
            dist_matrix = dist_matrix.clamp(min=0.0)

        # dist^2 + event_horizon, shared by every head
        dist_sq = dist_matrix ** 2 + self.event_horizon  # [batch, seq_len, seq_len]

        # NOTE(review): the self-distance on the diagonal is close to zero,
        # so diagonal forces are divided by roughly event_horizon and will
        # usually hit hawking_clamp - confirm this self-attention bias is
        # intended.

        v = self.v_proj(x)  # [batch, seq_len, dim_model]
        head_outputs = []
        head_max = []   # 0-d tensors, one per head
        head_mean = []  # 0-d tensors, one per head

        for h, mass_proj in enumerate(self.mass_projs):
            lo, hi = h * self.head_dim, (h + 1) * self.head_dim
            x_h = x[..., lo:hi]  # [b, T, hd]

            # Learnable masses, squashed into (0, 1) by the sigmoid
            masses = torch.sigmoid(mass_proj(x_h))  # [b, T, 1]

            # F_ij = G * m_i * m_j / (dist^2 + eps)
            mass_i = masses                      # [b, T, 1]
            mass_j = masses.transpose(-2, -1)    # [b, 1, T]
            force = G * (mass_i * mass_j) / dist_sq  # [b, T, T]

            # Hawking clamp (Singularity safety valve)
            force = torch.clamp(force, max=hawking_clamp)

            # Reduce diagnostics per head on a detached tensor instead of
            # keeping a stacked copy of every [b, T, T] force matrix.
            detached = force.detach()
            head_max.append(detached.amax())
            head_mean.append(detached.mean())

            attn = F.softmax(force, dim=-1)  # [b, T, T]

            # Each head attends over its own slice of the value projection
            v_h = v[..., lo:hi]     # [b, T, hd]
            head_outputs.append(attn @ v_h)  # [b, T, hd]

        # Recombine heads
        combined = torch.cat(head_outputs, dim=-1)  # [b, T, dim_model]
        output = self.out_proj(combined)

        # All heads have the same number of force entries, so the mean of
        # the per-head means equals the mean over every entry.
        diagnostics: Dict[str, Any] = {
            "max_force": torch.stack(head_max).max().item(),
            "mean_force": torch.stack(head_mean).mean().item(),
            "phase": active_phase,
            "G": G,
            "curvature": curvature,
        }

        return output, diagnostics
test_device_consistency(self): + emb = OctonionEmbedding(dim_model=32, max_len=64) + x = torch.randn(1, 8, 32) + pe = emb(x) + assert pe.device == x.device + + def test_no_nan(self): + emb = OctonionEmbedding(dim_model=64, max_len=128) + x = torch.randn(2, 16, 64) + pe = emb(x) + assert not torch.isnan(pe).any() + + def test_dim_not_divisible_by_8(self): + # dim_model=33 is not divisible by 8 but should still work + emb = OctonionEmbedding(dim_model=33, max_len=16) + x = torch.randn(1, 4, 33) + pe = emb(x) + assert pe.shape == (1, 4, 33) + + +class TestOctonionDistance: + def test_zero_distance_for_identical_vectors(self): + v = torch.randn(3, 8) + dist = octonion_distance(v, v) + # Should be near zero (only epsilon keeps it > 0) + assert (dist < 1e-3).all() + + def test_non_negative(self): + a = torch.randn(4, 8) + b = torch.randn(4, 8) + dist = octonion_distance(a, b) + assert (dist >= 0).all() + + def test_output_shape_keepdim(self): + a = torch.randn(2, 5, 1, 16) + b = torch.randn(2, 1, 5, 16) + dist = octonion_distance(a, b) + assert dist.shape == (2, 5, 5, 1) + + +class TestGravitationalOctonionPosition: + def test_output_shape(self): + gop = GravitationalOctonionPosition(dim_model=64, max_len=128) + x = torch.randn(2, 10, 64) + dist = gop(x) + assert dist.shape == (2, 10, 10) + + def test_non_negative(self): + gop = GravitationalOctonionPosition(dim_model=32, max_len=64) + x = torch.randn(1, 6, 32) + dist = gop(x) + assert (dist >= 0).all() + + def test_no_nan(self): + gop = GravitationalOctonionPosition(dim_model=64, max_len=128) + x = torch.randn(2, 8, 64) + dist = gop(x) + assert not torch.isnan(dist).any() + + +# =========================================================================== +# polymorphic_attention_orchestrator +# =========================================================================== + +class TestPolymorphicAttentionOrchestrator: + def _make_orch(self, dim=64, heads=4): + return PolymorphicAttentionOrchestrator(dim_model=dim, 
num_heads=heads, max_len=128) + + def test_output_shape(self): + orch = self._make_orch() + x = torch.randn(2, 8, 64) + out, diag = orch(x) + assert out.shape == (2, 8, 64) + + def test_diagnostics_keys(self): + orch = self._make_orch() + x = torch.randn(1, 6, 64) + _, diag = orch(x) + assert "max_force" in diag + assert "mean_force" in diag + assert "phase" in diag + assert "G" in diag + assert "curvature" in diag + + def test_morph_changes_phase(self): + orch = self._make_orch() + orch.morph("singularity") + assert orch.current_phase == "singularity" + assert orch.G == PHASE_CONFIG["singularity"]["G"] + + def test_all_phases_run(self): + orch = self._make_orch() + x = torch.randn(1, 4, 64) + for phase in ["solid", "fluid", "gas", "singularity"]: + out, diag = orch(x, phase=phase) + assert out.shape == (1, 4, 64) + assert diag["phase"] == phase + + def test_phase_override_does_not_mutate_current_phase(self): + orch = self._make_orch() + orch.morph("solid") + x = torch.randn(1, 4, 64) + orch(x, phase="gas") + # current_phase should still be "solid" + assert orch.current_phase == "solid" + + def test_invalid_phase_raises(self): + orch = self._make_orch() + with pytest.raises(ValueError): + orch.morph("plasma") + + def test_dim_not_divisible_raises(self): + with pytest.raises(ValueError): + PolymorphicAttentionOrchestrator(dim_model=65, num_heads=4) + + def test_no_nan_output(self): + orch = self._make_orch() + x = torch.randn(2, 8, 64) + out, _ = orch(x, phase="singularity") + assert not torch.isnan(out).any() + + def test_gradient_flow(self): + orch = self._make_orch() + x = torch.randn(1, 4, 64, requires_grad=True) + out, _ = orch(x) + out.sum().backward() + assert x.grad is not None + + def test_phase_config_completeness(self): + for phase in ["solid", "fluid", "gas", "singularity"]: + assert phase in PHASE_CONFIG + cfg = PHASE_CONFIG[phase] + assert "G" in cfg + assert "curvature" in cfg + assert "hawking_clamp" in cfg + + +# 
=========================================================================== +# training_containment +# =========================================================================== + +class TestMorphicContainmentProtocol: + def _make_protocol(self, ledger=None): + model = nn.Linear(16, 16) + config = MorphicContainmentConfig( + max_grad_norm=1.0, + max_attention_force=10.0, + bekenstein_lambda=1e-4, + min_stability=0.2, + ) + return MorphicContainmentProtocol(model=model, ledger=ledger, config=config) + + def _make_loss_with_grad(self, model): + """Helper: produce a backward-ed loss so grads exist.""" + x = torch.randn(1, 16) + loss = model(x).sum() + loss.backward() + return loss.detach() + + def test_step_returns_true_on_normal_run(self): + proto = self._make_protocol() + loss = self._make_loss_with_grad(proto.model) + result = proto.step(loss, {"max_force": 1.0, "phase": "fluid", "stability": 0.9}) + assert result is True + + def test_step_returns_false_on_low_stability(self): + proto = self._make_protocol() + loss = self._make_loss_with_grad(proto.model) + result = proto.step(loss, {"max_force": 1.0, "phase": "fluid", "stability": 0.1}) + assert result is False + + def test_hawking_radiation_dampens_gradients(self): + proto = self._make_protocol() + loss = self._make_loss_with_grad(proto.model) + # Record grad norm before + before = sum(p.grad.norm().item() for p in proto.model.parameters() if p.grad is not None) + # Trigger Hawking radiation (force >> max_attention_force) + proto.step(loss, {"max_force": 1000.0, "phase": "singularity", "stability": 0.9}) + after = sum(p.grad.norm().item() for p in proto.model.parameters() if p.grad is not None) + assert after <= before + 1e-6 # gradients were scaled down (then clipped) + + def test_bekenstein_penalty_shape(self): + proto = self._make_protocol() + attn = torch.softmax(torch.randn(2, 8, 8), dim=-1) + penalty = proto.apply_bekenstein_penalty(attn) + assert penalty.shape == () + assert penalty.item() >= 0 + + def 
test_bekenstein_penalty_differentiable(self): + proto = self._make_protocol() + attn = torch.softmax(torch.randn(2, 8, 8), dim=-1).requires_grad_(True) + penalty = proto.apply_bekenstein_penalty(attn) + penalty.backward() + assert attn.grad is not None + + def test_ledger_logging(self): + ledger = Ledger(agent_id="test_containment") + proto = self._make_protocol(ledger=ledger) + loss = self._make_loss_with_grad(proto.model) + proto.step(loss, {"max_force": 1000.0, "phase": "singularity", "stability": 0.9}) + events = [e.event for e in ledger.entries()] + assert "containment_event" in events + + def test_default_config_used_when_none(self): + model = nn.Linear(4, 4) + proto = MorphicContainmentProtocol(model=model) + assert proto.config is not None + assert proto.config.max_grad_norm > 0 + + def test_step_no_diagnostics(self): + proto = self._make_protocol() + loss = self._make_loss_with_grad(proto.model) + # Should not raise even without diagnostics + result = proto.step(loss) + assert isinstance(result, bool) + + +# =========================================================================== +# MorphicVictorAgent +# =========================================================================== + +class TestMorphicVictorAgent: + def _make_agent(self, initial_phase="fluid"): + lgt = LightweightGravitationalTransformer( + vocab_size=100, dim_model=64, num_layers=2, num_heads=4 + ) + orch = PolymorphicAttentionOrchestrator(dim_model=64, num_heads=4, max_len=128) + return MorphicVictorAgent( + model=lgt, + orchestrator=orch, + agent_id="test_morphic_agent", + initial_phase=initial_phase, + ) + + def test_initial_phase(self): + agent = self._make_agent("solid") + assert agent.current_phase == "solid" + assert agent.orchestrator.current_phase == "solid" + + def test_process_morphic_output_shape(self): + agent = self._make_agent() + x = torch.randint(0, 100, (1, 8)) + result = agent.process_morphic(x) + assert result["output"].shape == (1, 8, 100) + assert "phase" in result + 
+ def test_determine_phase_low_stability(self): + agent = self._make_agent() + phase = agent.determine_phase(stability=0.2) + assert phase == "gas" + + def test_determine_phase_high_stability(self): + agent = self._make_agent() + phase = agent.determine_phase(stability=0.95) + assert phase == "solid" + + def test_determine_phase_singularity(self): + agent = self._make_agent() + phase = agent.determine_phase(stability=0.85, task_complexity=1.0) + assert phase == "singularity" + + def test_phase_shift_logged_to_ledger(self): + agent = self._make_agent(initial_phase="solid") + # Drive stability low by feeding high-force diagnostics through + # the MirrorLayer's public callback interface + for _ in range(25): + agent.mirror_layer(0, {"mean_force": 1000.0}) + x = torch.randint(0, 100, (1, 4)) + agent.process_morphic(x) + events = [e.event for e in agent.ledger.entries()] + assert "phase_shift" in events + + def test_apply_phase_updates_orchestrator(self): + agent = self._make_agent("fluid") + agent.apply_phase("singularity") + assert agent.orchestrator.current_phase == "singularity" + assert agent.orchestrator.G == PHASE_CONFIG["singularity"]["G"] + + def test_process_morphic_returns_stability(self): + agent = self._make_agent() + x = torch.randint(0, 100, (1, 6)) + result = agent.process_morphic(x) + assert "stability" in result + assert 0.0 <= result["stability"] <= 1.0 + + def test_no_nan_output(self): + agent = self._make_agent() + x = torch.randint(0, 100, (1, 8)) + result = agent.process_morphic(x) + assert not torch.isnan(result["output"]).any() diff --git a/training_containment.py b/training_containment.py new file mode 100644 index 0000000..ee35076 --- /dev/null +++ b/training_containment.py @@ -0,0 +1,198 @@ +""" +Morphic Containment Protocol +Physics-aware stabiliser for high-gravity (Singularity) training phases. 
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

@dataclass
class MorphicContainmentConfig:
    """
    Configuration for the :class:`MorphicContainmentProtocol`.

    Attributes:
        max_grad_norm: Maximum L2-norm of gradients; triggers global clipping.
        max_attention_force: Cap for the maximum gravitational force recorded
            by the forward pass. Exceeding this triggers Hawking-Radiation
            gradient damping.
        bekenstein_lambda: Regularisation weight for the Bekenstein Entropy
            penalty. A higher value penalises information collapse more
            aggressively.
        min_stability: Emergency shutdown threshold. When the stability
            score falls below this value, :meth:`MorphicContainmentProtocol.step`
            returns ``False`` to signal the caller to halt or reset.
        entropy_target: Target mean Shannon entropy for the attention
            distribution. NOTE: :meth:`MorphicContainmentProtocol.apply_bekenstein_penalty`
            does not read this field; its penalty depends only on
            ``bekenstein_lambda`` and the measured entropy.
    """
    max_grad_norm: float = 1.0
    max_attention_force: float = 100.0
    bekenstein_lambda: float = 1e-4
    min_stability: float = 0.2
    entropy_target: float = 1.0


# ---------------------------------------------------------------------------
# Protocol
# ---------------------------------------------------------------------------

class MorphicContainmentProtocol:
    """
    Ensures the Polymorphic Attention Orchestrator does not implode during
    high-gravity (Singularity) phases.

    Args:
        model: The model whose parameter gradients are damped and clipped.
        ledger: Optional object exposing ``log(event, payload)``; containment
            events are forwarded to it when present.
        config: A :class:`MorphicContainmentConfig` instance. A default
            configuration is created when omitted.
    """

    def __init__(
        self,
        model: nn.Module,
        ledger: Optional[Any] = None,  # victorcos_module.Ledger
        config: Optional[MorphicContainmentConfig] = None,
    ):
        self.model = model
        self.ledger = ledger
        self.config = config if config is not None else MorphicContainmentConfig()

    # ------------------------------------------------------------------
    # Bekenstein entropy penalty
    # ------------------------------------------------------------------

    def apply_bekenstein_penalty(self, attention_weights: torch.Tensor) -> torch.Tensor:
        """
        Compute the Bekenstein Entropy penalty for a given attention distribution.

        Low Shannon entropy means each query attends to very few tokens
        (*information collapse*); the penalty grows as the mean entropy
        approaches zero::

            lambda * (1 / (H + eps))^2

        where ``H`` is the Shannon entropy over the last dimension, averaged
        over all leading dimensions, ``lambda`` is
        ``config.bekenstein_lambda`` and ``eps = 1e-6`` prevents
        division by zero.

        Args:
            attention_weights: Attention distribution tensor of shape
                ``[..., seq_len]`` (last dimension is expected to sum to 1,
                i.e. after softmax).

        Returns:
            Scalar penalty tensor (differentiable).
        """
        # Clamp before the log so exact zeros contribute 0 * log(1e-9) = 0
        # instead of 0 * -inf = NaN.
        log_weights = torch.log(attention_weights.clamp(min=1e-9))
        entropy = -torch.sum(attention_weights * log_weights, dim=-1)  # [...]
        mean_entropy = entropy.mean()
        return self.config.bekenstein_lambda * (1.0 / (mean_entropy + 1e-6)) ** 2

    # ------------------------------------------------------------------
    # Per-step containment hook
    # ------------------------------------------------------------------

    def step(
        self,
        loss: torch.Tensor,
        model_diagnostics: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Apply all containment checks for the current training step.

        Call this **after** ``loss.backward()`` and **before**
        ``optimizer.step()``.

        Args:
            loss: The scalar training loss (used only for Ledger logging).
            model_diagnostics: Optional dict from the forward pass. The keys
                ``"max_force"`` (default ``0.0``), ``"phase"`` (default
                ``"fluid"``) and ``"stability"`` (default ``1.0``) are read;
                missing keys fall back to those defaults.

        Returns:
            ``True`` if training should continue; ``False`` if the caller
            should halt or reset (stability below ``min_stability``).
            Damping and clipping are applied in either case.
        """
        diagnostics = model_diagnostics or {}
        cfg = self.config

        # 1. Hawking Radiation: scale every gradient by the ratio by which
        #    the recorded force overshoots the configured cap.
        max_f = float(diagnostics.get("max_force", 0.0))
        phase = str(diagnostics.get("phase", "fluid"))

        if max_f > cfg.max_attention_force:
            damping_factor = cfg.max_attention_force / max_f
            # In-place update outside autograd tracking (replaces the
            # discouraged ``.grad.data`` access).
            with torch.no_grad():
                for p in self.model.parameters():
                    if p.grad is not None:
                        p.grad.mul_(damping_factor)

            self._log("containment_event", {
                "type": "hawking_radiation",
                "phase": phase,
                "max_force": max_f,
                "damping_factor": damping_factor,
                "loss": loss.item(),
            })

        # 2. Global gradient clipping (runs after any damping)
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), cfg.max_grad_norm)

        # 3. Stability check
        stability = float(diagnostics.get("stability", 1.0))
        if stability < cfg.min_stability:
            self._log("containment_breach", {
                "type": "stability_collapse",
                "stability": stability,
                "threshold": cfg.min_stability,
                "phase": phase,
                "loss": loss.item(),
            })
            return False  # Signal caller to halt / reset

        return True

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _log(self, event: str, payload: Dict[str, Any]) -> None:
        """Forward a log entry to the attached Ledger (if any)."""
        if self.ledger is not None:
            self.ledger.log(event, payload)
containment_native=True, + description=( + "Active Intelligence with Phase-Shifting Gravitational Attention. " + "Autonomously shifts cognitive phases (Solid/Fluid/Gas/Singularity) " + "based on internal stability scores and task complexity." + ), +) +class MorphicVictorAgent(LGTVictorOSModule): + """ + An autonomous VictorOS agent that shifts its gravitational attention + phase based on internal stability scores. + + The agent wraps a :class:`~polymorphic_attention_orchestrator.PolymorphicAttentionOrchestrator` + alongside the standard LGT model. On each :meth:`process_morphic` call + it: + + 1. Reads the current stability score from the Mirror Layer. + 2. Determines the appropriate cognitive phase via :meth:`determine_phase`. + 3. If the phase has changed, calls :meth:`apply_phase` and logs the + transition to the VictorOS Ledger. + 4. Runs the standard LGT forward pass with full Mirror Layer + introspection. + + Args: + model: A pre-constructed ``LightweightGravitationalTransformer``. + orchestrator: A :class:`~polymorphic_attention_orchestrator.PolymorphicAttentionOrchestrator` + instance that performs morphic gravitational attention. + agent_id: Unique identifier used in Ledger entries. + persist_path: Optional path to flush Ledger entries to disk. + max_force_threshold: Mirror Layer containment threshold. + initial_phase: Starting cognitive phase (``"fluid"`` by default). 
+ """ + + # Phase → (G, curvature) mapping kept here for model-level updates + _PHASE_MAP: Dict[str, Dict[str, float]] = { + "solid": {"G": 0.5, "curvature": 0.0}, + "fluid": {"G": 1.0, "curvature": 0.15}, + "gas": {"G": 0.1, "curvature": 0.8}, + "singularity": {"G": 50.0, "curvature": -0.1}, + } + + def __init__( + self, + model: nn.Module, + orchestrator: "Any", # PolymorphicAttentionOrchestrator + agent_id: str = "morphic_victor_agent", + persist_path: Optional[str] = None, + max_force_threshold: float = 40.0, + initial_phase: str = "fluid", + ): + super().__init__( + model=model, + agent_id=agent_id, + persist_path=persist_path, + max_force_threshold=max_force_threshold, + ) + self.orchestrator = orchestrator + self.current_phase: str = initial_phase + # Sync orchestrator to the initial phase + self.orchestrator.morph(initial_phase) + + # ------------------------------------------------------------------ + # Phase management + # ------------------------------------------------------------------ + + def determine_phase( + self, + stability: float, + task_complexity: Optional[float] = None, + ) -> str: + """ + Heuristic for selecting the appropriate cognitive phase. + + Rules: + - ``stability < 0.4`` → **Gas** (diffuse exploration to recover). + - ``stability > 0.9`` → **Solid** (precise, low-entropy reasoning). + - Otherwise → **Fluid** (balanced default). + - Checked first: ``task_complexity >= 1.0`` with ``stability > 0.7`` + selects the **Singularity** phase for extreme focus tasks. + + Args: + stability: Current rolling stability score from the Mirror Layer + (range ``[0, 1]``). + task_complexity: Optional scalar hint from the caller. When + ``>= 1.0`` and ``stability > 0.7``, returns ``"singularity"``. + + Returns: + Phase name string. 
+ """ + if task_complexity is not None and task_complexity >= 1.0 and stability > 0.7: + return "singularity" + if stability < 0.4: + return "gas" + if stability > 0.9: + return "solid" + return "fluid" + + def apply_phase(self, phase: str) -> None: + """ + Apply ``phase`` to both the orchestrator and the underlying LGT model. + + Updates every attention head's G value and the model's positional + embedding curvature to match the phase configuration. + + Args: + phase: Target cognitive phase name. + """ + cfg = self._PHASE_MAP.get(phase, self._PHASE_MAP["fluid"]) + self.orchestrator.morph(phase) + + # Propagate physics constants into the underlying LGT model blocks + for block in getattr(self.model, "blocks", []): + attn = getattr(block, "attn", None) + if attn is not None: + for head in getattr(attn, "heads", []): + if hasattr(head, "G") and isinstance(head.G, nn.Parameter): + with torch.no_grad(): + head.G.fill_(cfg["G"]) + pos_emb = getattr(self.model, "pos_embedding", None) + if pos_emb is not None and hasattr(pos_emb, "curvature"): + with torch.no_grad(): + curvature_param = pos_emb.curvature + if isinstance(curvature_param, nn.Parameter): + curvature_param.fill_(cfg["curvature"]) + + # ------------------------------------------------------------------ + # Morphic processing + # ------------------------------------------------------------------ + + def process_morphic( + self, + x: torch.Tensor, + task_complexity: Optional[float] = None, + ) -> Dict[str, Any]: + """ + Run a full morphic inference pass with automatic phase shifting. + + Workflow: + + 1. Read current stability from the Mirror Layer. + 2. Determine the desired phase via :meth:`determine_phase`. + 3. If phase changed: call :meth:`apply_phase` and log the transition. + 4. Run the standard LGT inference pass via :meth:`process`. + + Args: + x: Input token-index tensor ``[batch, seq_len]``. + task_complexity: Optional complexity hint forwarded to + :meth:`determine_phase`. 
+ + Returns: + The dict returned by :meth:`process`, augmented with a + ``"phase"`` key containing the active phase name. + """ + stability = self.mirror_layer.stability_score() + new_phase = self.determine_phase(stability, task_complexity) + + if new_phase != self.current_phase: + old_phase = self.current_phase + self.apply_phase(new_phase) + self.current_phase = new_phase + self.ledger.log("phase_shift", { + "from": old_phase, + "to": new_phase, + "stability": stability, + "task_complexity": task_complexity, + }) + + result = self.process(x) + result["phase"] = self.current_phase + return result