2 changes: 1 addition & 1 deletion .gitignore
@@ -210,4 +210,4 @@ model_display_names.json

 # DreamLayer logs and test logs
 logs
-test-logs
+test-logsdream_layer_backend/matrix_runner_state.json

💡 Verification agent

🧩 Analysis chain

Fix malformed .gitignore entry; split into two lines

The entry concatenated two paths, so neither is ignored. Restore test-logs and add the state file on its own line.

Apply:

- test-logsdream_layer_backend/matrix_runner_state.json
+ test-logs
+ dream_layer_backend/matrix_runner_state.json

Then ensure the state file is untracked:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "Context around the malformed line:"
nl -ba .gitignore | sed -n '205,220p'
echo -e "\nIs the state file tracked?"
git ls-files -v dream_layer_backend/matrix_runner_state.json || true
echo -e "\nIf listed, untrack it:"
git rm --cached -f dream_layer_backend/matrix_runner_state.json || true

Length of output: 229


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Matched lines in .gitignore:"
grep -n 'test-logsdream_layer_backend' .gitignore || true

echo -e "\nContext around the malformed entry:"
grep -C2 'test-logsdream_layer_backend' .gitignore || true

echo -e "\nTracking status of the state file:"
git ls-files -v dream_layer_backend/matrix_runner_state.json || echo "Not tracked"

Length of output: 618


Fix malformed .gitignore entry for test logs and state file

The entry at line 213 in .gitignore currently reads as a single concatenated path and therefore ignores neither target. Split it into two separate lines:

 .gitignore
@@ -213,1 +213,2 @@
-test-logsdream_layer_backend/matrix_runner_state.json
+test-logs
+dream_layer_backend/matrix_runner_state.json

Then untrack the state file (it is currently tracked — git ls-files -v shows H dream_layer_backend/matrix_runner_state.json):

git rm --cached -f dream_layer_backend/matrix_runner_state.json
🤖 Prompt for AI Agents
In the .gitignore file at line 213, the entry combines two paths into one,
causing neither to be ignored. Split this line into two separate lines: one for
test-logs and one for dream_layer_backend/matrix_runner_state.json. Then, to
untrack the currently tracked state file, run the command git rm --cached -f
dream_layer_backend/matrix_runner_state.json.

55 changes: 55 additions & 0 deletions dream_layer_backend/dream_layer.py
@@ -10,7 +10,9 @@
import json
import subprocess
from dream_layer_backend_utils.random_prompt_generator import fetch_positive_prompt, fetch_negative_prompt
from dream_layer_backend_utils.task_runner import MatrixRunner
from dream_layer_backend_utils.fetch_advanced_models import get_lora_models, get_settings, is_valid_directory, get_upscaler_models, get_controlnet_models

# Add ComfyUI directory to Python path
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
@@ -120,6 +122,10 @@ def import_comfyui_main():
}
})

# Persist state next to backend code so it survives restarts/page refresh
MATRIX_STATE_FILE = os.path.join(os.path.dirname(__file__), "matrix_runner_state.json")
runner = MatrixRunner(state_file=MATRIX_STATE_FILE)

COMFY_API_URL = "http://127.0.0.1:8188"

def get_available_models():
@@ -432,6 +438,55 @@ def send_to_img2img():
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500

@app.route('/api/matrix-runner/start', methods=['POST'])
def matrix_runner_start():
    try:
        data = request.json or {}
        # accept lists/ranges for seeds, steps, samplers, etc.
        # keep only list-valued keys for the sweep
        param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
        runner.generate(param_dict)  # deterministic expansion
        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
Comment on lines +445 to +449

suggestion: Only list-valued keys are used for job generation, which may ignore scalar parameters.

Scalar values from the API client are currently excluded from job generation. To ensure all parameters are considered, normalize scalars to single-item lists before processing.

Suggested change
        # accept lists/ranges for seeds, steps, samplers, etc.
        # keep only list-valued keys for the sweep
        param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
        runner.generate(param_dict)  # deterministic expansion
        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
        # accept lists/ranges for seeds, steps, samplers, etc.
        # normalize scalars to single-item lists for the sweep
        param_dict = {k: v if isinstance(v, list) else [v] for k, v in data.items()}
        runner.generate(param_dict)  # deterministic expansion
        return jsonify({"status":"success","total_jobs": len(runner.jobs)})

    except Exception as e:
        return jsonify({"status":"error","message": str(e)}), 500

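The normalization proposed above is a plain dict comprehension and can be tried in isolation (the payload here is illustrative, not an actual API request):

```python
# Normalize scalar values to single-item lists so every key joins the sweep.
data = {"seeds": [1, 2], "steps": 20, "sampler": "euler"}
param_dict = {k: v if isinstance(v, list) else [v] for k, v in data.items()}
print(param_dict)  # {'seeds': [1, 2], 'steps': [20], 'sampler': ['euler']}
```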
Comment on lines +441 to +452

🛠️ Refactor suggestion

Validate inputs and cap the sweep size to avoid OOM

Sanity-check list inputs and limit total jobs to a safe maximum; return 400 if exceeded.

 @app.route('/api/matrix-runner/start', methods=['POST'])
 def matrix_runner_start():
     try:
-        data = request.json or {}
+        data = request.json or {}
         # accept lists/ranges for seeds, steps, samplers, etc.
         # keep only list-valued keys for the sweep
-        param_dict = {k: v for k, v in data.items() if isinstance(v, list)}
-        runner.generate(param_dict)              # deterministic expansion
-        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
+        param_dict = {k: v for k, v in data.items() if isinstance(v, list) and len(v) > 0}
+        if not param_dict:
+            return jsonify({"status":"error","message":"Provide at least one non-empty list parameter"}), 400
+        from math import prod
+        max_jobs = int(os.environ.get("MATRIX_MAX_JOBS", "10000"))
+        total = prod(len(v) for v in param_dict.values())
+        if total > max_jobs:
+            return jsonify({"status":"error","message": f"Grid too large ({total} > {max_jobs})"}), 400
+        runner.generate(param_dict)  # deterministic expansion
+        return jsonify({"status":"success","total_jobs": len(runner.jobs)})
     except Exception as e:
         return jsonify({"status":"error","message": str(e)}), 500

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer.py around lines 441 to 452, the
matrix_runner_start endpoint currently accepts list inputs without validation or
limits, which risks excessive job creation and potential OOM errors. Add input
validation to ensure list parameters are sane, calculate the total number of
jobs generated, and if it exceeds a predefined safe maximum, return a 400
response with an appropriate error message. This prevents resource exhaustion by
capping the sweep size before calling runner.generate.

@app.route('/api/matrix-runner/pause', methods=['POST'])
def matrix_runner_pause():
    runner.pause()
    return jsonify({"status":"paused"})

@app.route('/api/matrix-runner/resume', methods=['POST'])
def matrix_runner_resume():
    runner.resume()
    return jsonify({"status":"resumed"})

@app.route('/api/matrix-runner/status', methods=['GET'])
def matrix_runner_status():
    jobs = runner.jobs
    return jsonify({
        "status":"ok",
        "total_jobs": len(jobs),
        "pending": sum(j["status"]=="pending" for j in jobs),
        "running": sum(j["status"]=="running" for j in jobs),
        "done": sum(j["status"]=="done" for j in jobs),
        "paused": runner.paused
    })

@app.route('/api/matrix-runner/next', methods=['POST'])
def matrix_runner_next():
    """Return the next job (and mark it running) so the executor can process it."""
    job = runner.next_job()
    if not job:
        return jsonify({"status":"empty"})
Comment on lines +478 to +480

issue (code-quality): We've found these issues:

    return jsonify({"status":"ok","job": job})

@app.route('/api/matrix-runner/complete', methods=['POST'])
def matrix_runner_complete():
    data = request.json or {}
    job_id = data.get("job_id")
    runner.complete_job(job_id)
    return jsonify({"status":"ok"})

Comment on lines +483 to +489

🛠️ Refactor suggestion

Validate job_id and return 404 when not found

Currently None (or an invalid ID) silently succeeds. Also align with the updated complete_job bool return.

 @app.route('/api/matrix-runner/complete', methods=['POST'])
 def matrix_runner_complete():
-    data = request.json or {}
-    job_id = data.get("job_id")
-    runner.complete_job(job_id)
-    return jsonify({"status":"ok"})
+    data = request.json or {}
+    job_id = data.get("job_id")
+    if job_id is None:
+        return jsonify({"status":"error","message":"job_id is required"}), 400
+    ok = runner.complete_job(job_id)
+    if not ok:
+        return jsonify({"status":"error","message":f"job_id {job_id} not found"}), 404
+    return jsonify({"status":"ok"})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@app.route('/api/matrix-runner/complete', methods=['POST'])
def matrix_runner_complete():
    data = request.json or {}
    job_id = data.get("job_id")
    runner.complete_job(job_id)
    return jsonify({"status":"ok"})
@app.route('/api/matrix-runner/complete', methods=['POST'])
def matrix_runner_complete():
    data = request.json or {}
    job_id = data.get("job_id")
    if job_id is None:
        return jsonify({"status":"error","message":"job_id is required"}), 400
    ok = runner.complete_job(job_id)
    if not ok:
        return jsonify({"status":"error","message":f"job_id {job_id} not found"}), 404
    return jsonify({"status":"ok"})
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer.py around lines 483 to 489, the
matrix_runner_complete function does not validate the job_id from the request
JSON and always returns a success status even if job_id is None or invalid.
Update the function to check if job_id is provided and call
runner.complete_job(job_id), which now returns a boolean indicating success. If
the job_id is missing or complete_job returns False, return a 404 response;
otherwise, return the success JSON response.

@app.route('/api/send-to-extras', methods=['POST', 'OPTIONS'])
def send_to_extras():
"""Send image to extras tab"""
89 changes: 89 additions & 0 deletions dream_layer_backend/dream_layer_backend_utils/task_runner.py
@@ -0,0 +1,89 @@
"""
# dream_layer_backend/task_runner.py
# Author: Natalia Rodriguez Figueroa
# Email: [email protected]
# Task 2: Implement a matrix/grid image generation workflow.
"""

import os
import json
import random
import logging
import itertools
from flask import Flask, request, jsonify
from flask_cors import CORS
from PIL import Image
import numpy as np

Comment on lines +9 to +18

🛠️ Refactor suggestion

⚠️ Potential issue

Remove unused and duplicate imports; keep this module framework-agnostic

Multiple imports are unused and some are duplicated, tripping Ruff (F401/F811) and coupling this utility to Flask/PIL/NumPy unnecessarily.

-import os
-import json
-import random
-import logging
-import itertools
-from flask import Flask, request, jsonify
-from flask_cors import CORS
-from PIL import Image
-import numpy as np
+import os
+import json
+import logging
+import itertools
@@
-import itertools
-import json
-import os

Optionally, either use the logger or remove both import logging and the logger = logging.getLogger(__name__) line if logging is not needed.

Also applies to: 21-24

🧰 Tools
🪛 Ruff (0.12.2)

11-11: random imported but unused

Remove unused import: random

(F401)


14-14: flask.Flask imported but unused

Remove unused import

(F401)


14-14: flask.request imported but unused

Remove unused import

(F401)


14-14: flask.jsonify imported but unused

Remove unused import

(F401)


15-15: flask_cors.CORS imported but unused

Remove unused import: flask_cors.CORS

(F401)


16-16: PIL.Image imported but unused

Remove unused import: PIL.Image

(F401)


17-17: numpy imported but unused

Remove unused import: numpy

(F401)

🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 9
to 18 and 21 to 24, remove all unused imports including flask, flask_cors, PIL,
numpy, and any duplicates to keep the module framework-agnostic and avoid Ruff
warnings. Also, if logging is not used, remove the import logging statement and
any logger initialization. Only keep imports that are actually used in the code.

logger = logging.getLogger(__name__)

import itertools
import json
import os

class MatrixRunner:
    def __init__(self, state_file="matrix_jobs.json"):
        self.state_file = state_file
        self.jobs = []
        self.index = 0
        self.paused = False
        self.load()

    # --- Core Job Management ---
Comment on lines +25 to +33

🛠️ Refactor suggestion

⚠️ Potential issue

Add thread-safety and atomic persistence to prevent race conditions and file corruption

Concurrent /next and /complete calls can return duplicate jobs or corrupt the JSON through partial writes. Guard shared state with a lock and write the file atomically.

@@
-class MatrixRunner:
+import threading
+
+class MatrixRunner:
     def __init__(self, state_file="matrix_jobs.json"):
         self.state_file = state_file
         self.jobs = []
         self.index = 0
         self.paused = False
-        self.load()
+        self._lock = threading.RLock()
+        self.load()
@@
-    def generate(self, param_dict):
+    def generate(self, param_dict):
         """Create all jobs from parameter ranges and reset state"""
-        keys = list(param_dict.keys())
-        combos = list(itertools.product(*param_dict.values()))
-        self.jobs = [
-            {"id": i, **dict(zip(keys, combo)), "status": "pending"}
-            for i, combo in enumerate(combos)
-        ]
-        self.index = 0
-        self.paused = False
-        self.save()
+        keys = list(param_dict.keys())
+        combos = list(itertools.product(*param_dict.values()))
+        with self._lock:
+            self.jobs = [
+                {"id": i, **dict(zip(keys, combo)), "status": "pending"}
+                for i, combo in enumerate(combos)
+            ]
+            self.index = 0
+            self.paused = False
+            self.save()
@@
-    def next_job(self):
+    def next_job(self):
         """Get the next pending job and mark it as running"""
-        if self.paused:
-            return None
-        while self.index < len(self.jobs):
-            job = self.jobs[self.index]
-            self.index += 1
-            if job["status"] == "pending":
-                job["status"] = "running"
-                self.save()
-                return job
+        with self._lock:
+            if self.paused:
+                return None
+            while self.index < len(self.jobs):
+                job = self.jobs[self.index]
+                self.index += 1
+                if job.get("status") == "pending":
+                    job["status"] = "running"
+                    self.save()
+                    return job
         return None  # No more jobs
@@
-    def complete_job(self, job_id):
+    def complete_job(self, job_id) -> bool:
         """Mark job as done"""
-        for job in self.jobs:
-            if job["id"] == job_id:
-                job["status"] = "done"
-                break
-        self.save()
+        found = False
+        with self._lock:
+            for job in self.jobs:
+                if job.get("id") == job_id:
+                    job["status"] = "done"
+                    found = True
+                    break
+            self.save()
+        return found
@@
-    def pause(self):
-        self.paused = True
-        self.save()
+    def pause(self):
+        with self._lock:
+            self.paused = True
+            self.save()
@@
-    def resume(self):
-        self.paused = False
-        self.save()
+    def resume(self):
+        with self._lock:
+            self.paused = False
+            self.save()
@@
-    def save(self):
-        with open(self.state_file, "w") as f:
-            json.dump(
-                {"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
-            )
+    def save(self):
+        """Atomic write to avoid partial/corrupt files."""
+        state = {"jobs": self.jobs, "index": self.index, "paused": self.paused}
+        tmp = f"{self.state_file}.tmp"
+        with open(tmp, "w") as f:
+            json.dump(state, f, indent=2)
+        os.replace(tmp, self.state_file)
@@
-    def load(self):
-        if os.path.exists(self.state_file):
-            with open(self.state_file, "r") as f:
-                state = json.load(f)
-            self.jobs = state.get("jobs", [])
-            self.index = state.get("index", 0)
-            self.paused = state.get("paused", False)
+    def load(self):
+        if os.path.exists(self.state_file):
+            try:
+                with open(self.state_file, "r") as f:
+                    state = json.load(f)
+                self.jobs = state.get("jobs", [])
+                self.index = state.get("index", 0)
+                self.paused = state.get("paused", False)
+            except Exception as e:
+                logging.getLogger(__name__).warning("Failed to load state (%s); starting fresh", e)
+                self.jobs, self.index, self.paused = [], 0, False

Optional: convert any lingering "running" jobs back to "pending" on load to recover from worker crashes.

Also applies to: 46-57, 59-66, 68-75, 77-82, 83-89

🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 25
to 33 and also lines 46-57, 59-66, 68-75, 77-82, and 83-89, the current
implementation lacks thread-safety and atomic file writes, which can cause race
conditions and JSON corruption during concurrent job state updates. To fix this,
add a threading lock to guard all accesses and modifications to the shared job
state and ensure that writes to the state_file are done atomically, for example
by writing to a temporary file and then renaming it. Additionally, modify the
load method to reset any jobs marked as "running" back to "pending" to recover
from worker crashes.
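The optional crash-recovery step can be sketched as a standalone helper; load_state is hypothetical and simply mirrors the state layout shown in this diff:

```python
import json
import os

def load_state(state_file):
    """Load persisted jobs; demote any 'running' jobs back to 'pending'
    so work interrupted by a worker crash is retried on the next sweep."""
    if not os.path.exists(state_file):
        return {"jobs": [], "index": 0, "paused": False}
    with open(state_file, "r") as f:
        state = json.load(f)
    for job in state.get("jobs", []):
        if job.get("status") == "running":
            job["status"] = "pending"
    return state
```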

    def generate(self, param_dict):
        """Create all jobs from parameter ranges and reset state"""
        keys = list(param_dict.keys())
        combos = list(itertools.product(*param_dict.values()))
        self.jobs = [
            {"id": i, **dict(zip(keys, combo)), "status": "pending"}
            for i, combo in enumerate(combos)
        ]
        self.index = 0
        self.paused = False
        self.save()

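The expansion in generate is plain itertools.product; run in isolation with the same parameter grid as the committed state file, it yields the same six jobs:

```python
import itertools

params = {"seeds": [1, 2, 3], "steps": [10, 20], "Samplers": ["euler"]}
keys = list(params.keys())
jobs = [
    {"id": i, **dict(zip(keys, combo)), "status": "pending"}
    for i, combo in enumerate(itertools.product(*params.values()))
]
print(len(jobs))  # 3 * 2 * 1 = 6 jobs
print(jobs[0])    # {'id': 0, 'seeds': 1, 'steps': 10, 'Samplers': 'euler', 'status': 'pending'}
```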
Comment on lines +34 to +45

🛠️ Refactor suggestion

Guard against explosive cartesian products

A broad grid can easily generate millions of jobs, exhausting memory.

Add a cap (env-configurable) and reject requests exceeding it:

  • Pre-compute size via product of lengths, bail out if > MAX_JOBS.
  • Or stream generation instead of materializing the full list.
🤖 Prompt for AI Agents
In dream_layer_backend/dream_layer_backend_utils/task_runner.py around lines 34
to 45, the generate method creates all job combinations at once, which can cause
memory exhaustion for large parameter grids. To fix this, add an
environment-configurable MAX_JOBS limit, pre-compute the total number of
combinations by multiplying the lengths of each parameter list, and if this
exceeds MAX_JOBS, raise an exception or reject the request before generating
jobs. This prevents creating an excessively large list and guards against memory
issues.
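One minimal sketch of that guard, assuming a MATRIX_MAX_JOBS environment variable with a 10000 default (both names come from the earlier endpoint suggestion, not from existing code):

```python
import os
from math import prod

def check_grid_size(param_dict, default_cap=10000):
    """Reject a sweep whose cartesian product would exceed the job cap."""
    max_jobs = int(os.environ.get("MATRIX_MAX_JOBS", str(default_cap)))
    total = prod(len(v) for v in param_dict.values())
    if total > max_jobs:
        raise ValueError(f"Grid too large: {total} jobs exceeds cap of {max_jobs}")
    return total

print(check_grid_size({"seeds": [1, 2, 3], "steps": [10, 20]}))  # 6 (with MATRIX_MAX_JOBS unset)
```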

    def next_job(self):
        """Get the next pending job and mark it as running"""
        if self.paused:
            return None
        while self.index < len(self.jobs):
            job = self.jobs[self.index]
            self.index += 1
            if job["status"] == "pending":
                job["status"] = "running"
                self.save()
                return job
        return None  # No more jobs

    def complete_job(self, job_id):
        """Mark job as done"""
        for job in self.jobs:
            if job["id"] == job_id:
                job["status"] = "done"
                break
        self.save()
Comment on lines +59 to +65

suggestion: complete_job does not handle the case where job_id is not found.

Currently, if job_id is missing, no feedback is given. Adding a warning or exception would improve error visibility.

Suggested change
    def complete_job(self, job_id):
        """Mark job as done"""
        for job in self.jobs:
            if job["id"] == job_id:
                job["status"] = "done"
                break
        self.save()
    def complete_job(self, job_id):
        """Mark job as done. Raises ValueError if job_id not found."""
        found = False
        for job in self.jobs:
            if job["id"] == job_id:
                job["status"] = "done"
                found = True
                break
        if not found:
            raise ValueError(f"Job with id {job_id} not found.")
        self.save()


    # --- Pause / Resume ---
    def pause(self):
        self.paused = True
        self.save()

    def resume(self):
        self.paused = False
        self.save()

    # --- Persistence ---
    def save(self):

suggestion: save does not handle file write errors.

Wrap the file write in a try/except block and log errors to prevent process crashes.

Suggested implementation:

    def save(self):
        try:
            with open(self.state_file, "w") as f:
                json.dump(
                    {"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
                )
        except Exception as e:
            import logging
            logging.error(f"Failed to save state to {self.state_file}: {e}")

If logging is already imported at the top of the file, you can remove the import logging line inside the except block.

        with open(self.state_file, "w") as f:
            json.dump(
                {"jobs": self.jobs, "index": self.index, "paused": self.paused}, f, indent=2
            )

    def load(self):
        if os.path.exists(self.state_file):
            with open(self.state_file, "r") as f:
                state = json.load(f)
            self.jobs = state.get("jobs", [])
            self.index = state.get("index", 0)
            self.paused = state.get("paused", False)
48 changes: 48 additions & 0 deletions dream_layer_backend/matrix_runner_state.json
@@ -0,0 +1,48 @@
{
  "jobs": [
    {
      "id": 0,
      "seeds": 1,
      "steps": 10,
      "Samplers": "euler",
      "status": "done"
    },
    {
      "id": 1,
      "seeds": 1,
      "steps": 20,
      "Samplers": "euler",
      "status": "running"
    },
    {
      "id": 2,
      "seeds": 2,
      "steps": 10,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 3,
      "seeds": 2,
      "steps": 20,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 4,
      "seeds": 3,
      "steps": 10,
      "Samplers": "euler",
      "status": "pending"
    },
    {
      "id": 5,
      "seeds": 3,
      "steps": 20,
      "Samplers": "euler",
      "status": "pending"
    }
  ],
  "index": 2,
  "paused": false
}
Comment on lines +1 to +48

⚠️ Potential issue

Do not commit runtime state; remove file and rely on persistence via .gitignore

This JSON is mutable, environment-specific state. Keep it out of VCS to avoid drift, merge conflicts, and accidental resets. Also, key naming is inconsistent (Samplers vs likely samplers).

Recommended actions:

  • Delete from repo and ignore it (see .gitignore fix above).
  • Regenerate at runtime; initialize empty state when absent.
  • Normalize keys to snake_case if you need a seed file in examples.
🤖 Prompt for AI Agents
In dream_layer_backend/matrix_runner_state.json lines 1 to 48, this file
contains mutable runtime state that should not be committed to version control
to prevent conflicts and accidental resets. Remove this file from the repository
and add it to .gitignore to avoid tracking it. Ensure the application
regenerates this state at runtime and initializes it as empty if the file is
missing. Also, if a seed file is needed for examples, normalize all keys to
snake_case, changing "Samplers" to "samplers" for consistency.