Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/views/onboarding/step-complete.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ function getOnboardingIssues(): OnboardingIssueDef[] {
"",
"**Steps:**",
"1. Go to **Settings** in the sidebar",
"2. Under **Repositories**, add a GitHub repo URL",
"2. Under **Repositories**, add a Git repository URL",
"3. The agent daemon will sync the repo locally",
"",
"Once connected, your agents can clone, branch, and push code as part of any task.",
Expand Down
4 changes: 2 additions & 2 deletions packages/views/settings/components/repositories-tab.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export function RepositoriesTab() {
<Card>
<CardContent className="space-y-3">
<p className="text-xs text-muted-foreground">
GitHub repositories associated with this workspace. Agents use these to clone and work on code.
Git repositories associated with this workspace. Agents use these to clone and work on code.
</p>

{repos.map((repo, index) => (
Expand All @@ -78,7 +78,7 @@ export function RepositoriesTab() {
value={repo.url}
onChange={(e) => handleRepoChange(index, "url", e.target.value)}
disabled={!canManageWorkspace}
placeholder="https://github.com/org/repo"
placeholder="https://git.example.com/org/repo.git"
className="text-sm"
/>
<Input
Expand Down
1 change: 1 addition & 0 deletions server/cmd/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Post("/register", h.DaemonRegister)
r.Post("/deregister", h.DaemonDeregister)
r.Post("/heartbeat", h.DaemonHeartbeat)
r.Get("/workspaces/{workspaceId}/repos", h.GetDaemonWorkspaceRepos)

r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
Expand Down
19 changes: 17 additions & 2 deletions server/internal/daemon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,9 @@ func (c *Client) Deregister(ctx context.Context, runtimeIDs []string) error {

// RegisterResponse holds the server's response to a daemon registration.
type RegisterResponse struct {
Runtimes []Runtime `json:"runtimes"`
Repos []RepoData `json:"repos"`
Runtimes []Runtime `json:"runtimes"`
Repos []RepoData `json:"repos"`
ReposVersion string `json:"repos_version"`
}

func (c *Client) Register(ctx context.Context, req map[string]any) (*RegisterResponse, error) {
Expand All @@ -233,6 +234,20 @@ func (c *Client) Register(ctx context.Context, req map[string]any) (*RegisterRes
return &resp, nil
}

type WorkspaceReposResponse struct {
WorkspaceID string `json:"workspace_id"`
Repos []RepoData `json:"repos"`
ReposVersion string `json:"repos_version"`
}

func (c *Client) GetWorkspaceRepos(ctx context.Context, workspaceID string) (*WorkspaceReposResponse, error) {
var resp WorkspaceReposResponse
if err := c.getJSON(ctx, fmt.Sprintf("/api/daemon/workspaces/%s/repos", workspaceID), &resp); err != nil {
return nil, err
}
return &resp, nil
}

func (c *Client) postJSON(ctx context.Context, path string, reqBody any, respBody any) error {
var body io.Reader
if reqBody != nil {
Expand Down
152 changes: 141 additions & 11 deletions server/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package daemon

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
Expand All @@ -18,10 +19,19 @@ import (
"github.com/multica-ai/multica/server/pkg/agent"
)

// ErrRepoNotConfigured is returned by ensureRepoReady when the requested repo
// URL is not present in the workspace's repo configuration after a fresh
// server refresh.
var ErrRepoNotConfigured = errors.New("repo is not configured for this workspace")

// workspaceState tracks registered runtimes for a single workspace.
type workspaceState struct {
workspaceID string
runtimeIDs []string
workspaceID string
runtimeIDs []string
reposVersion string
allowedRepoURLs map[string]struct{}
lastRepoSyncErr string
repoRefreshMu sync.Mutex
}

// Daemon is the local agent runtime that polls for and executes tasks.
Expand All @@ -34,7 +44,7 @@ type Daemon struct {
mu sync.Mutex
workspaces map[string]*workspaceState
runtimeIndex map[string]Runtime // runtimeID -> Runtime for provider lookups
reloading sync.Mutex // prevents concurrent reloadWorkspaces
reloading sync.Mutex // prevents concurrent workspace syncs

cancelFunc context.CancelFunc // set by Run(); called by triggerRestart
restartBinary string // non-empty after a successful update; path to the new binary
Expand Down Expand Up @@ -145,7 +155,6 @@ func (d *Daemon) resolveAuth() error {
return nil
}


// allRuntimeIDs returns all runtime IDs across all watched workspaces.
func (d *Daemon) allRuntimeIDs() []string {
d.mu.Lock()
Expand Down Expand Up @@ -224,6 +233,131 @@ func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID s
return resp, nil
}

func newWorkspaceState(workspaceID string, runtimeIDs []string, reposVersion string, repos []RepoData) *workspaceState {
return &workspaceState{
workspaceID: workspaceID,
runtimeIDs: runtimeIDs,
reposVersion: reposVersion,
allowedRepoURLs: repoAllowlist(repos),
}
}

func repoAllowlist(repos []RepoData) map[string]struct{} {
allowed := make(map[string]struct{}, len(repos))
for _, repo := range repos {
if repo.URL == "" {
continue
}
allowed[repo.URL] = struct{}{}
}
return allowed
}

func (d *Daemon) setWorkspaceRepoSyncError(workspaceID, syncErr string) {
d.mu.Lock()
defer d.mu.Unlock()
if ws, ok := d.workspaces[workspaceID]; ok {
ws.lastRepoSyncErr = syncErr
}
}

func (d *Daemon) workspaceRepoAllowed(workspaceID, repoURL string) bool {
d.mu.Lock()
defer d.mu.Unlock()
ws, ok := d.workspaces[workspaceID]
if !ok {
return false
}
_, allowed := ws.allowedRepoURLs[repoURL]
return allowed
}

func (d *Daemon) workspaceLastRepoSyncErr(workspaceID string) string {
d.mu.Lock()
defer d.mu.Unlock()
ws, ok := d.workspaces[workspaceID]
if !ok {
return ""
}
return ws.lastRepoSyncErr
}

func (d *Daemon) syncWorkspaceRepos(workspaceID string, repos []RepoData) {
if d.repoCache == nil {
return
}
if err := d.repoCache.Sync(workspaceID, repoDataToInfo(repos)); err != nil {
d.setWorkspaceRepoSyncError(workspaceID, err.Error())
d.logger.Warn("repo cache sync failed", "workspace_id", workspaceID, "error", err)
return
}
d.setWorkspaceRepoSyncError(workspaceID, "")
}

func (d *Daemon) refreshWorkspaceRepos(ctx context.Context, workspaceID string) (*WorkspaceReposResponse, error) {
refreshCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

resp, err := d.client.GetWorkspaceRepos(refreshCtx, workspaceID)
if err != nil {
return nil, err
}

d.mu.Lock()
if ws, ok := d.workspaces[workspaceID]; ok {
ws.reposVersion = resp.ReposVersion
ws.allowedRepoURLs = repoAllowlist(resp.Repos)
}
d.mu.Unlock()

return resp, nil
}

func (d *Daemon) ensureRepoReady(ctx context.Context, workspaceID, repoURL string) error {
if d.repoCache == nil {
return fmt.Errorf("repo cache not initialized")
}

d.mu.Lock()
ws, ok := d.workspaces[workspaceID]
d.mu.Unlock()
if !ok {
return fmt.Errorf("workspace is not watched by this daemon: %s", workspaceID)
}

if d.workspaceRepoAllowed(workspaceID, repoURL) && d.repoCache.Lookup(workspaceID, repoURL) != "" {
return nil
}

ws.repoRefreshMu.Lock()
defer ws.repoRefreshMu.Unlock()

if d.workspaceRepoAllowed(workspaceID, repoURL) && d.repoCache.Lookup(workspaceID, repoURL) != "" {
return nil
}

resp, err := d.refreshWorkspaceRepos(ctx, workspaceID)
if err != nil {
return fmt.Errorf("refresh workspace repos: %w", err)
}

if !d.workspaceRepoAllowed(workspaceID, repoURL) {
return ErrRepoNotConfigured
}

d.syncWorkspaceRepos(workspaceID, resp.Repos)

if d.repoCache.Lookup(workspaceID, repoURL) != "" {
return nil
}

if syncErr := d.workspaceLastRepoSyncErr(workspaceID); syncErr != "" {
return fmt.Errorf("repo is configured but not synced: %s", syncErr)
}

return fmt.Errorf("repo is configured but not synced")
}

// workspaceSyncLoop periodically fetches the user's workspaces from the API
// and registers runtimes for any new ones.
func (d *Daemon) workspaceSyncLoop(ctx context.Context) {
Expand Down Expand Up @@ -285,21 +419,17 @@ func (d *Daemon) syncWorkspacesFromAPI(ctx context.Context) error {
d.logger.Info("registered runtime", "workspace_id", id, "runtime_id", rt.ID, "provider", rt.Provider)
}
d.mu.Lock()
d.workspaces[id] = &workspaceState{workspaceID: id, runtimeIDs: runtimeIDs}
d.workspaces[id] = newWorkspaceState(id, runtimeIDs, resp.ReposVersion, resp.Repos)
for _, rt := range resp.Runtimes {
d.runtimeIndex[rt.ID] = rt
}
d.mu.Unlock()

if d.repoCache != nil && len(resp.Repos) > 0 {
go func(wsID string, repos []RepoData) {
if err := d.repoCache.Sync(wsID, repoDataToInfo(repos)); err != nil {
d.logger.Warn("repo cache sync failed", "workspace_id", wsID, "error", err)
}
}(id, resp.Repos)
go d.syncWorkspaceRepos(id, resp.Repos)
}

d.logger.Info("watching workspace", "workspace_id", id, "name", name, "runtimes", len(resp.Runtimes))
d.logger.Info("watching workspace", "workspace_id", id, "name", name, "runtimes", len(resp.Runtimes), "repos", len(resp.Repos))
registered++
}

Expand Down
Loading
Loading