Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 fix: eliminate zombie jobs #96

Merged
merged 10 commits into from
Dec 16, 2024
40 changes: 21 additions & 19 deletions client/network_chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type ChainBaseClient struct {
baseURL string
httpClient *http.Client
baseURL string
chainEndpoint string
httpClient *http.Client
}

type ChainBaseDatasetListItem struct {
Expand All @@ -20,23 +21,23 @@ type ChainBaseDatasetListItem struct {
}

type ChainResponse struct {
Code int `json:"code"`
Message string `json:"message"`
GraphData []struct {
Chain struct {
ID string `json:"id"`
Name string `json:"name"`
DatabaseName string `json:"databaseName"`
DataDictionary map[string][]TableInfo `json:"dataDictionary"`
} `json:"chain"`
} `json:"graphData"`
TransactionLogs *[]TransactionLog `json:"transactionLogs,omitempty"`
Code int `json:"code"`
Message string `json:"message"`
GraphData []struct {
Chain struct {
ID string `json:"id"`
Name string `json:"name"`
DatabaseName string `json:"databaseName"`
DataDictionary map[string][]TableInfo `json:"dataDictionary"`
} `json:"chain"`
} `json:"graphData"`
TransactionLogs *[]TransactionLog `json:"transactionLogs,omitempty"`
}

type TransactionLog struct {
Timestamp string `json:"timestamp"`
Action string `json:"action"`
Details string `json:"details"`
Timestamp string `json:"timestamp"`
Action string `json:"action"`
Details string `json:"details"`
}

type TableInfo struct {
Expand All @@ -45,17 +46,18 @@ type TableInfo struct {
Description string `json:"description"`
}

func NewChainBaseClient(baseURL string) *ChainBaseClient {
func NewChainBaseClient(baseURL string, chainEndpoint string) *ChainBaseClient {
return &ChainBaseClient{
baseURL: baseURL,
baseURL: baseURL,
chainEndpoint: chainEndpoint,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
}
}

func (c *ChainBaseClient) GetChainBaseDatasetList() ([]*ChainBaseDatasetListItem, error) {
url := fmt.Sprintf("%s/api/v1/metadata/network_chains", c.baseURL)
url := fmt.Sprintf("%s%s", c.baseURL, c.chainEndpoint)

resp, err := c.httpClient.Get(url)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion client/network_chains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetChainBaseDatasetList(t *testing.T) {
}))
defer mockServer.Close()

client := NewChainBaseClient(mockServer.URL)
client := NewChainBaseClient(mockServer.URL, "/api/v1/metadata/network_chains")

chains, err := client.GetChainBaseDatasetList()

Expand Down
31 changes: 26 additions & 5 deletions commands/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package commands

import (
"fmt"
"log"
"manuscript-core/pkg"
"os"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -47,8 +49,9 @@ You'll be prompted to select:
}

var jobListCmd = &cobra.Command{
Use: "list",
Short: "List all manuscript jobs",
Use: "list [directory]",
Aliases: []string{"ls"},
Short: "List all manuscript jobs",
Long: `📋 View all running manuscript jobs

Each job shows:
Expand All @@ -60,10 +63,28 @@ Each job shows:
Status indicators:
🟢 Running - Job is active and processing data
🟡 Warning - Job needs attention
⚪️ Other - Various other states`,
Example: `>> manuscript-cli list`,
🔴 Failed - Job encountered an error
⚫ Stopped - Job was stopped

Usage:
- Run without arguments to check default directory
- Specify a directory path to check manuscripts in that location`,
Example: `>> manuscript-cli ls
>> manuscript-cli list /path/to/manuscripts`,
Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ListJobs()
var dir string
// if no args, use default manuscript directory
if len(args) == 0 {
config, err := pkg.LoadConfig(manuscriptConfig)
if err != nil {
log.Fatalf("Error: Failed to load config: %v", err)
}
dir = fmt.Sprintf("%s/%s", config.BaseDir, manuscriptBaseName)
} else {
dir = args[0] // use specified directory if user input
}
ListJobs(dir)
},
}

Expand Down
68 changes: 58 additions & 10 deletions commands/init_manuscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
)

const (
manuscriptBaseName = "manuscript"
manuscriptBaseDir = "$HOME"
manuscriptConfig = "$HOME/.manuscript_config.ini"
networkChainURL = "https://api.chainbase.com"
defaultDatabase = "zkevm"
defaultTable = "blocks"
defaultSink = "postgres"
graphQLImage = "repository.chainbase.com/manuscript-node/graphql-engine:latest"
graphQLARMImage = "repository.chainbase.com/manuscript-node/graphql-engine-arm64:latest"
manuscriptBaseName = "manuscript"
manuscriptBaseDir = "$HOME"
manuscriptConfig = "$HOME/.manuscript_config.ini"
networkChainURL = "https://api.chainbase.com"
networkChainEndpoint = "/api/v1/metadata/network_chains"
defaultDatabase = "zkevm"
defaultTable = "blocks"
defaultSink = "postgres"
graphQLImage = "repository.chainbase.com/manuscript-node/graphql-engine:latest"
graphQLARMImage = "repository.chainbase.com/manuscript-node/graphql-engine-arm64:latest"
)

func executeInitManuscript(ms pkg.Manuscript) {
Expand Down Expand Up @@ -91,6 +92,10 @@ func InitManuscript() {
fmt.Printf("\033[32m✓ Manuscript base directory set to: %s\033[0m\n\n", manuscriptDir)

manuscriptName := promptInput("🏂 2. Enter your manuscript name (default is demo)\u001B[0m: ", "demo")
if err := checkExistingManuscript(manuscriptName); err != nil {
logErrorAndReturn(fmt.Sprintf("Cannot create manuscript: %v", err), nil)
return
}
if checkDockerContainerExists(manuscriptName) {
logErrorAndReturn(fmt.Sprintf("Manuscript with name [ %s ] already exists. Please choose a different name.", manuscriptName), nil)
}
Expand Down Expand Up @@ -320,10 +325,53 @@ func checkDockerContainerExists(manuscriptName string) bool {
return false
}

func checkExistingManuscript(name string) error {
// Check if manuscript directory exists
msConfig, err := pkg.LoadConfig(manuscriptConfig)
if err != nil {
return fmt.Errorf("failed to load manuscript config: %w", err)
}

manuscriptPath := filepath.Join(msConfig.BaseDir, manuscriptBaseName, name)
if _, err := os.Stat(manuscriptPath); !os.IsNotExist(err) {
return fmt.Errorf("manuscript directory already exists at %s.", manuscriptPath)
}

// Check if manuscript containers are running
dockers, err := pkg.RunDockerPs()
if err != nil {
return fmt.Errorf("failed to check running containers: %w", err)
}

containerNames := []string{
fmt.Sprintf("%s-jobmanager-1", name),
fmt.Sprintf("%s-taskmanager-1", name),
fmt.Sprintf("%s-postgres-1", name),
fmt.Sprintf("%s-hasura-1", name),
}

for _, docker := range dockers {
for _, containerName := range containerNames {
if docker.Name == containerName {
return fmt.Errorf("manuscript containers for '%s' already exist. Please stop and remove them first", name)
}
}
}

// Check if manuscript is in config
for _, ms := range msConfig.Manuscripts {
if ms.Name == name {
return fmt.Errorf("manuscript '%s' already exists in configuration. \n Consider cleaning %s", name, manuscriptConfig)
}
}

return nil
}

func fetchChainBaseDatasets() ([]*client.ChainBaseDatasetListItem, error) {
var chains []*client.ChainBaseDatasetListItem
err := pkg.ExecuteStepWithLoading("Checking Datasets From Network", false, func() error {
c := client.NewChainBaseClient(networkChainURL)
c := client.NewChainBaseClient(networkChainURL, networkChainEndpoint)
var err error
chains, err = c.GetChainBaseDatasetList()
if err != nil {
Expand Down
145 changes: 113 additions & 32 deletions commands/jobs_manuscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"encoding/json"
"fmt"
"log"
"manuscript-core/client"
"manuscript-core/pkg"
"net/http"
"os"
"path/filepath"
"time"
)

Expand Down Expand Up @@ -40,48 +41,128 @@ func formatDurationToMinutes(durationMs int64) string {
return fmt.Sprintf("%d minutes", durationMinutes)
}

func ListJobs() {
func ListJobs(dir string) {
_ = pkg.ExecuteStepWithLoading("Checking jobs", false, func() error {
dockers, err := pkg.RunDockerPs()
// Step 1: Check for running Docker containers
dockers, err := getRunningContainers()
if err != nil {
log.Fatalf("Error: Failed to get docker ps: %v", err)
return err
}

// Always show if no containers are running
if len(dockers) == 0 {
fmt.Println("\r🟡 There are no jobs running...")
fmt.Println("\r📍 There are no jobs running...")
}

// Step 2: Get manuscripts based on source (config or directory)
manuscripts, err := getManuscripts(dir)
if err != nil {
return fmt.Errorf("failed to get manuscripts: %w", err)
}

if len(manuscripts) == 0 {
if dir != "" {
fmt.Printf("\r⚠️ No manuscript files found in %s\n", dir)
}
return nil
}

jobNumber := 0
manuscripts, err := pkg.LoadConfig(manuscriptConfig)
for _, m := range manuscripts.Manuscripts {
if m.Port != 0 {
c := client.NewFlinkUiClient(fmt.Sprintf("http://localhost:%d", m.Port))
jobs, err := c.GetJobsList()
// Step 3: Check and display state for each manuscript
displayManuscriptStates(manuscripts, dockers)
return nil
})
}

// getRunningContainers retrieves all running Docker containers
func getRunningContainers() ([]pkg.ContainerInfo, error) {
dockers, err := pkg.RunDockerPs()
if err != nil {
return nil, fmt.Errorf("failed to get docker processes: %w", err)
}
return dockers, nil
}

// getManuscripts retrieves manuscript configurations from either config file or directory
func getManuscripts(dir string) ([]pkg.Manuscript, error) {
if dir == "" {
return getManuscriptsFromConfig()
}
return getManuscriptsFromDirectory(dir)
}

// getManuscriptsFromConfig loads manuscripts from the config file
func getManuscriptsFromConfig() ([]pkg.Manuscript, error) {
config, err := pkg.LoadConfig(manuscriptConfig)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
}
return config.Manuscripts, nil
}

// getManuscriptsFromDirectory scans directory for manuscript.yaml files
func getManuscriptsFromDirectory(dir string) ([]pkg.Manuscript, error) {
// Check if directory exists
if _, err := os.Stat(dir); os.IsNotExist(err) {
return nil, fmt.Errorf("directory does not exist: %s", dir)
}

var manuscripts []pkg.Manuscript
// Read only the immediate subdirectories
entries, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("failed to read directory: %w", err)
}

for _, entry := range entries {
if entry.IsDir() {
// Check for manuscript.yaml in each immediate subdirectory
manuscriptPath := filepath.Join(dir, entry.Name(), "manuscript.yaml")
if _, err := os.Stat(manuscriptPath); err == nil {
manuscript, err := pkg.ParseYAML(manuscriptPath)
if err != nil {
fmt.Printf("\r🟡 Manuscript: \u001B[34m%s\u001B[0m | State: \033[33mInitializing...(may wait for 2 minutes)\033[0m\n", m.Name)
}
if len(jobs) == 0 && err == nil {
fmt.Printf("\r🟡 Manuscript: \u001B[34m%s\u001B[0m | State: \033[33mInitializing...\033[0m\n", m.Name)
}
for _, job := range jobs {
jobNumber++
startTime := formatTimestamp(job.StartTime)
duration := formatDurationToMinutes(job.Duration)

switch job.State {
case "RUNNING":
trackHasuraTable(m.Name)
fmt.Printf("\r🟢 %d: Manuscript: \033[32m%s\033[0m | State: \033[32m%s\033[0m | Start Time: %s | Duration: %v | GraphQL: http://127.0.0.1:%d\n", jobNumber, m.Name, job.State, startTime, duration, m.GraphQLPort)
case "CANCELED":
fmt.Printf("\r🟡 %d: Manuscript: %s | State: \033[33m%s\033[0m | Start Time: %s | Duration: %v\n", jobNumber, m.Name, job.State, startTime, duration)
default:
fmt.Printf("\r⚪️ %d: Manuscript: %s | State: %s | Start Time: %s | Duration: %v\n", jobNumber, m.Name, job.State, startTime, duration)
}
log.Printf("Warning: Failed to parse %s: %v", manuscriptPath, err)
continue
}
manuscripts = append(manuscripts, *manuscript)
}
}
return err
})
}

return manuscripts, nil
}

// displayManuscriptStates checks and displays the state of each manuscript
func displayManuscriptStates(manuscripts []pkg.Manuscript, dockers []pkg.ContainerInfo) {
for i, m := range manuscripts {
detector := pkg.NewStateDetector(&m, dockers)
state, err := detector.DetectState()
if err != nil {
log.Printf("Warning: Failed to detect state for %s: %v", m.Name, err)
state = pkg.StateUnknown
}

displayJobStatus(i+1, &m, state)
}
}

func displayJobStatus(jobNumber int, m *pkg.Manuscript, state pkg.ManuscriptState) {
switch state {
case pkg.StateRunning:
fmt.Printf("\r🟢 %d: Manuscript: \033[32m%s\033[0m | State: \033[32m%s\033[0m | GraphQL: http://127.0.0.1:%d\n",
jobNumber, m.Name, state, m.GraphQLPort)
case pkg.StateInitializing:
fmt.Printf("\r🟡 %d: Manuscript: \033[34m%s\033[0m | State: \033[33m%s\033[0m\n",
jobNumber, m.Name, state)
case pkg.StateFailed:
fmt.Printf("\r🔴 %d: Manuscript: %s | State: \033[31m%s\033[0m\n",
jobNumber, m.Name, state)
case pkg.StateStopped:
fmt.Printf("\r⚫ %d: Manuscript: %s | State: \033[90m%s\033[0m\n",
jobNumber, m.Name, state)
default:
fmt.Printf("\r⚪ %d: Manuscript: %s | State: %s\n",
jobNumber, m.Name, state)
}
}

func JobLogs(jobName string) {
Expand Down
Loading