Skip to content

Commit

Permalink
afwatch: job tasks window supports appending blocks&tasks
Browse files Browse the repository at this point in the history
References: #514.
  • Loading branch information
timurhai committed Aug 6, 2021
1 parent 75359a8 commit 8d0942b
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 34 deletions.
3 changes: 3 additions & 0 deletions afanasy/src/server/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,9 @@ bool Block::appendTasks(Action & i_action, const JSON & i_operation)
// Set new tasks ready
m_job->checkStatesOnAppend();

// Emit an event for monitors (afwatch ListTasks)
i_action.monitors->addBlock(af::Msg::TBlocks, m_data);

// Return new tasks ids:
i_action.answer = "{\"task_ids\":[";
for (int i = old_tasks_num; i < m_data->getTasksNum(); i++)
Expand Down
9 changes: 7 additions & 2 deletions afanasy/src/server/jobaf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1418,8 +1418,9 @@ af::Msg * JobAf::writeBlocks( std::vector<int32_t> i_block_ids, std::vector<std:
{
af::MCAfNodes mcblocks;
for( int b = 0; b < i_block_ids.size(); b++)
mcblocks.addNode( m_blocks_data[i_block_ids[b]]);
return new af::Msg( af::BlockData::DataModeFromString( i_modes[0]), &mcblocks);
mcblocks.addNode(m_blocks_data[i_block_ids[b]]);

return new af::Msg(af::BlockData::DataModeFromString(i_modes[0]), &mcblocks);
}

std::ostringstream str;
Expand Down Expand Up @@ -1696,6 +1697,9 @@ void JobAf::appendBlocks(Action & i_action, const JSON & i_operation)
m_blocks[b]->storeTasks();
m_blocks[b]->setUser( m_user);

// Emit an event for monitors (afwatch ListTasks)
i_action.monitors->addBlock(af::Msg::TBlocks, m_blocks[b]->m_data);

if (b != old_blocks_num)
i_action.answer += ",";
i_action.answer += af::itos(b);
Expand All @@ -1705,5 +1709,6 @@ void JobAf::appendBlocks(Action & i_action, const JSON & i_operation)
checkDepends();
checkStatesOnAppend();

// Emit an event for monitors (afwatch ListJobs)
i_action.monitors->addJobEvent(af::Monitor::EVT_jobs_change, getId(), getUid());
}
2 changes: 1 addition & 1 deletion afanasy/src/server/threadprocessjson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ af::Msg * threadProcessJSON( ThreadArgs * i_args, af::Msg * i_msg)

af::Msg * o_msg_response = NULL;

JSON & getObj = document["get"];
const JSON & getObj = document["get"];
if( getObj.IsObject())
{
std::string type, mode;
Expand Down
110 changes: 79 additions & 31 deletions afanasy/src/watch/listtasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,36 +80,61 @@ void ListTasks::construct(af::Job * i_job)
return;
}

m_tasks.resize(i_job->getBlocksNum());
int row = 0;

for (int b = 0; b < i_job->getBlocksNum(); b++)
{
const af::BlockData * block = i_job->getBlock(b);
ItemJobBlock * item_block = new ItemJobBlock(block, this);
m_blocks.append(item_block);

int tasks_num = block->getTasksNum();
item_block->tasksHidded = ((i_job->getBlocksNum() > 1) && (tasks_num > 1));
m_model->addItem(item_block);
appendBlock(i_job->getBlock(b));

row++;

for (int t = 0; t < tasks_num; t++)
// If the job has several blocks, we hide tasks if block more than one task
if (m_blocks.size() > 1)
{
int row = 0;
for (int b = 0; b < m_blocks.size(); b++)
{
ItemJobTask * item_task = new ItemJobTask(this, item_block, t, block);
m_model->addItem(item_task);
if(item_block->tasksHidded)
m_view->setRowHidden(row , true);

m_blocks[b]->tasksHidded = m_tasks[b].size() > 1;
row++;
m_tasks[b].append(item_task);

for (int t = 0; t < m_tasks[b].size(); t++)
{
if (m_blocks[b]->tasksHidded)
m_view->setRowHidden(row, true);
row++;
}
}
}

m_view->viewport()->show();
}

void ListTasks::appendBlock(af::BlockData * i_block)
{
ItemJobBlock * item_block = new ItemJobBlock(i_block, this);
m_blocks.append(item_block);
m_model->addItem(item_block);

appendTasks(i_block, item_block);
}

void ListTasks::appendTasks(af::BlockData * i_block, ItemJobBlock * i_item_block)
{
int start_task_num = 0;
if (m_tasks.size() <= i_block->getBlockNum())
{
// A block was just created and has no task items
m_tasks.resize(i_block->getBlockNum() + 1);
}
else
{
// We appending tasks to an existing block item
start_task_num = m_tasks[i_block->getBlockNum()].size();
}

for (int t = start_task_num; t < i_block->getTasksNum(); t++)
{
ItemJobTask * item_task = new ItemJobTask(this, i_item_block, t, i_block);
m_model->addItem(item_task);
m_tasks[i_block->getBlockNum()].append(item_task);
}
}

ListTasks::~ListTasks()
{
for (int i = 0; i < m_wndtasks.size(); i++)
Expand Down Expand Up @@ -318,7 +343,7 @@ void ListTasks::generateMenu(QMenu &o_menu, Item * i_item)

bool ListTasks::v_caseMessage(af::Msg * msg)
{
switch( msg->type())
switch (msg->type())
{
case af::Msg::TJob:
{
Expand Down Expand Up @@ -379,12 +404,37 @@ bool ListTasks::v_caseMessage(af::Msg * msg)
{
af::BlockData * block = static_cast<af::BlockData*>(mcblocks.getNode(b));
if (block->getJobId() != m_job_id)
{
AF_ERR << "ListTasks::v_caseMessage: block->getJobId() != m_job_id: "
<< block->getJobId() << "!=" << m_job_id;
continue;
}

int blocknum = block->getBlockNum();
if (blocknum >= m_blocks.size())
continue;
{
if (msg->type() == af::Msg::TBlocks)
{
// A new block(s) was appended to the job
appendBlock(block);
}
else
{
AF_ERR << "ListTasks::v_caseMessage: blocknum >= m_blocks.size(): "
<< blocknum << ">=" << m_blocks.size();
}
}
else
{
m_blocks[blocknum]->update(block, msg->type());

m_blocks[blocknum]->update(block, msg->type());
if ((msg->type() == af::Msg::TBlocks) &&
(block->getTasksNum() > m_tasks[blocknum].size()))
{
// New tasks were appended to block
appendTasks(block, m_blocks[blocknum]);
}
}

if (msg->type() == af::Msg::TBlocks)
m_model->emit_dataChanged();
Expand All @@ -407,7 +457,7 @@ bool ListTasks::v_caseMessage(af::Msg * msg)

bool ListTasks::v_processEvents( const af::MonitorEvents & i_me)
{
bool founded = false;
bool found = false;

if( i_me.m_tp.size())
{
Expand All @@ -418,15 +468,14 @@ bool ListTasks::v_processEvents( const af::MonitorEvents & i_me)

updateTasks( i_me.m_tp[j].blocks, i_me.m_tp[j].tasks, i_me.m_tp[j].tp);

founded = true;
found = true;

break;
}
}

if( i_me.m_bids.size())
{
std::vector<int32_t> job_ids;
std::vector<int32_t> block_ids;
std::vector<std::string> modes;

Expand All @@ -435,17 +484,16 @@ bool ListTasks::v_processEvents( const af::MonitorEvents & i_me)
if( i_me.m_bids[j].job_id != m_job_id )
continue;

job_ids.push_back( m_job_id);
block_ids.push_back( i_me.m_bids[j].block_num);

modes.push_back( af::BlockData::DataModeFromMsgType( i_me.m_bids[j].mode));
}

if( block_ids.size())
{
Watch::get( "jobs", job_ids, modes, block_ids);
Watch::get("jobs", std::vector<int>(1, m_job_id), modes, block_ids);

founded = true;
found = true;
}
}

Expand All @@ -455,14 +503,14 @@ bool ListTasks::v_processEvents( const af::MonitorEvents & i_me)
{
if( i_me.m_events[af::Monitor::EVT_jobs_del][i] == m_job_id )
{
founded = true;
found = true;
displayWarning( "The job does not exist any more.");
m_parentWindow->close();
break;
}
}

return founded;
return found;
}

int ListTasks::getRow(int i_block, int i_task)
Expand Down
2 changes: 2 additions & 0 deletions afanasy/src/watch/listtasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ private slots:

private:
void construct(af::Job * i_job);
void appendBlock(af::BlockData * i_block);
void appendTasks(af::BlockData * i_block, ItemJobBlock * i_item_block);

bool updateProgress(const af::JobProgress * i_job_progress);
bool updateTasks(
Expand Down

0 comments on commit 8d0942b

Please sign in to comment.