Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX: Restore generate_gantt_chart functionality #3290

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .zenodo.json
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,11 @@
{
"name": "Tambini, Arielle"
},
{
"affiliation": "Child Mind Institute",
"name": "Clucas, Jon",
"orcid": "0000-0001-7590-5806"
},
{
"affiliation": "Weill Cornell Medicine",
"name": "Xie, Xihe",
Expand Down
42 changes: 42 additions & 0 deletions nipype/pipeline/plugins/tests/test_callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import nipype.interfaces.utility as niu
import nipype.pipeline.engine as pe

try:
import pandas

has_pandas = True
except ImportError:
has_pandas = False


def func():
    """Do nothing and return ``None``; a no-op payload for workflow nodes."""
    return None
Expand Down Expand Up @@ -61,3 +68,38 @@ def test_callback_exception(tmpdir, plugin, stop_on_first_crash):

sleep(0.5) # Wait for callback to be called (python 2.7)
assert so.statuses == [("f_node", "start"), ("f_node", "exception")]


@pytest.mark.parametrize("plugin", ["Linear", "MultiProc", "LegacyMultiProc"])
@pytest.mark.skipif(not has_pandas, reason="Test requires pandas")
def test_callback_gantt(tmpdir, plugin):
    """Run a one-node workflow with ``log_nodes_cb`` and render a Gantt chart.

    A file handler is attached to the ``"callback"`` logger so that whatever
    the status callback logs there lands in ``callback.log`` inside
    ``tmpdir``.  After the workflow finishes, ``generate_gantt_chart`` is
    called on that log and the test asserts that ``callback.log.html`` exists.

    Parameters
    ----------
    tmpdir : py.path.local
        pytest temporary-directory fixture; used as the workflow base
        directory and as the location of the log file.
    plugin : str
        Name of the execution plugin to run the workflow with.
    """
    import logging

    from os import path

    from nipype.utils.profiler import log_nodes_cb
    from nipype.utils.draw_gantt_chart import generate_gantt_chart

    log_filename = path.join(tmpdir.strpath, "callback.log")
    logger = logging.getLogger("callback")
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(log_filename)
    logger.addHandler(handler)

    # The Linear plugin takes no ``n_procs``; the chart is drawn for 1 core.
    n_procs = 1 if plugin == "Linear" else 8
    plugin_args = {"status_callback": log_nodes_cb}
    if plugin != "Linear":
        plugin_args["n_procs"] = n_procs

    try:
        # create workflow
        wf = pe.Workflow(name="test", base_dir=tmpdir.strpath)
        f_node = pe.Node(
            niu.Function(function=func, input_names=[], output_names=[]),
            name="f_node",
        )
        wf.add_nodes([f_node])
        wf.config["execution"] = {
            "crashdump_dir": wf.base_dir,
            "poll_sleep_duration": 2,
        }
        wf.run(plugin=plugin, plugin_args=plugin_args)
    finally:
        # The "callback" logger is process-wide: detach and close the handler
        # so parametrized runs do not leak file handles or keep writing to a
        # previous run's log file.
        logger.removeHandler(handler)
        handler.close()

    generate_gantt_chart(log_filename, n_procs)
    assert path.exists(log_filename + ".html")
38 changes: 31 additions & 7 deletions nipype/utils/draw_gantt_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import simplejson as json

from collections import OrderedDict
from warnings import warn

# Pandas
try:
Expand Down Expand Up @@ -69,9 +70,9 @@ def create_event_dict(start_time, nodes_list):
finish_delta = (node["finish"] - start_time).total_seconds()

# Populate dictionary
if events.get(start_delta) or events.get(finish_delta):
if events.get(start_delta):
err_msg = "Event logged twice or events started at exact same time!"
raise KeyError(err_msg)
warn(err_msg, category=Warning)
events[start_delta] = start_node
events[finish_delta] = finish_node

Expand Down Expand Up @@ -139,12 +140,18 @@ def calculate_resource_timeseries(events, resource):
# Iterate through the events
for _, event in sorted(events.items()):
if event["event"] == "start":
if resource in event and event[resource] != "Unknown":
all_res += float(event[resource])
if resource in event:
try:
all_res += float(event[resource])
except ValueError:
continue
current_time = event["start"]
elif event["event"] == "finish":
if resource in event and event[resource] != "Unknown":
all_res -= float(event[resource])
if resource in event:
try:
all_res -= float(event[resource])
except ValueError:
continue
current_time = event["finish"]
res[current_time] = all_res

Expand Down Expand Up @@ -292,7 +299,7 @@ def draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, co
"offset": offset,
"scale_duration": scale_duration,
"color": color,
"node_name": node["name"],
"node_name": node.get("name", node.get("id", "")),
"node_dur": node["duration"] / 60.0,
"node_start": node_start.strftime("%Y-%m-%d %H:%M:%S"),
"node_finish": node_finish.strftime("%Y-%m-%d %H:%M:%S"),
Expand Down Expand Up @@ -513,6 +520,23 @@ def generate_gantt_chart(
# Read in json-log to get list of node dicts
nodes_list = log_to_dict(logfile)

# Only include nodes with timing information, and convert timestamps
# from strings to datetimes
nodes_list = [
{
k: datetime.datetime.strptime(i[k], "%Y-%m-%dT%H:%M:%S.%f")
if k in {"start", "finish"}
else i[k]
for k in i
}
for i in nodes_list
if "start" in i and "finish" in i
]

for node in nodes_list:
if "duration" not in node:
node["duration"] = (node["finish"] - node["start"]).total_seconds()

# Create the header of the report with useful information
start_node = nodes_list[0]
last_node = nodes_list[-1]
Expand Down