PDG: The job's first 'control' block and task are removed, as they are not needed.
A new Afanasy job is now created when onSchedule receives the first work item.

Job, block, and task creation are split into separate functions.

References #514.
timurhai committed Jul 8, 2021
1 parent f7c1b2f commit 24cc075
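In outline, the helpers this commit introduces compose as sketched below on the first scheduled work item. This is a simplified reading of the diff that follows (error paths and the block/task bookkeeping are omitted), not code taken verbatim from the commit.

# Sketch only: how the new helpers fit together on the first onSchedule() call.
task = self._constructTask(work_item)    # af.Task with command and PDG_* environment
block = self._constructBlock(work_item)  # af.Block named after the work item's TOP node
block.tasks.append(task)

job = self._constructJob()               # af.Job with the scheduler's job parameters
job.blocks.append(block)
if not self._sendJob(job):               # stores the new job id in self.job_id
    return pdg.scheduleResult.CookFailed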
Showing 1 changed file with 84 additions and 67 deletions.
plugins/houdini/pdg/types/afanasyscheduler.py
@@ -44,6 +44,56 @@ def _initData(self):
         self.job_tasks_id_name = {}
 
 
+    def _constructJob(self):
+        job = af.Job(self['job_name'].evaluateString())
+
+        job.setBranch(self['job_branch'].evaluateString())
+        job.setDependMask(self['depend_mask'].evaluateString())
+        job.setDependMaskGlobal(self['depend_mask_global'].evaluateString())
+        job.setPriority(self['priority'].evaluateInt())
+        job.setMaxRunningTasks(self['max_runtasks'].evaluateInt())
+        job.setMaxRunTasksPerHost(self['maxperhost'].evaluateInt())
+        job.setHostsMask(self['hosts_mask'].evaluateString())
+        job.setHostsMaskExclude(self['hosts_mask_exclude'].evaluateString())
+
+        return job
+
+
+    def _constructBlock(self, work_item):
+        block = af.Block(work_item.node.name, 'hbatch_mantra')
+
+        return block
+
+
+    def _constructTask(self, work_item):
+        task = af.Task(work_item.name)
+        task.setCommand(self.expandCommandTokens(work_item.command, work_item))
+
+        # Set Environment Task Variables
+        task.setEnv('PDG_RESULT_SERVER', str(self.workItemResultServerAddr()))
+        task.setEnv('PDG_ITEM_NAME', str(work_item.name))
+        task.setEnv('PDG_DIR', str(self.workingDir(False)))
+        task.setEnv('PDG_TEMP', str(self.tempDir(False)))
+        task.setEnv('PDG_SHARED_TEMP', str(self.tempDir(False)))
+        task.setEnv('PDG_INDEX', str(work_item.index))
+        task.setEnv('PDG_INDEX4', "{:04d}".format(work_item.index))
+        task.setEnv('PDG_SCRIPTDIR', str(self.scriptDir(False)))
+
+        return task
+
+
+    def _sendJob(self, job):
+        try:
+            self.job_id = job.send()
+            self.job_id = self.job_id[1]['id']
+        except:
+            traceback.print_exc()
+            sys.stderr.flush()
+            self.job_id = None
+            return False
+        return True
+
+
+    def _deleteJob(self):
+        if self.job_id is not None:
+            af.Cmd().deleteJobById(self.job_id)
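Note the reply shape _sendJob() assumes for job.send(): an indexable value whose second element is a dictionary carrying the new job's id. The illustration below is inferred from that lookup, not from documented Afanasy API, and the concrete values are hypothetical.

# Assumed, for illustration: job.send() returns something like (<status>, {'id': ...}).
answer = job.send()
job_id = answer[1]['id']  # the id the Afanasy server assigned to the new job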
@@ -94,39 +144,6 @@ def onStartCook(self, static, cook_set):
         if not self.isCallbackServerRunning():
             self.startCallbackServer()
 
-        # Create Job
-        job = af.Job(self['job_name'].evaluateString())
-
-        # Job Parameters
-        job.setBranch(self['job_branch'].evaluateString())
-        job.setDependMask(self['depend_mask'].evaluateString())
-        job.setDependMaskGlobal(self['depend_mask_global'].evaluateString())
-        job.setPriority(self['priority'].evaluateInt())
-        job.setMaxRunningTasks(self['max_runtasks'].evaluateInt())
-        job.setMaxRunTasksPerHost(self['maxperhost'].evaluateInt())
-        job.setHostsMask(self['hosts_mask'].evaluateString())
-        job.setHostsMaskExclude(self['hosts_mask_exclude'].evaluateString())
-
-        # Create a block
-        block = af.Block('control', 'hbatch')
-        block.setCapacity(self['capacity'].evaluateInt())
-        block.setMaxRunningTasks(0)
-
-        # Control task, not supported, yet, empty for now
-        task = af.Task('control')
-        #task.setCommand('')
-        block.tasks.append(task)
-
-        job.blocks.append(block)
-
-        try:
-            self.job_id = job.send()
-            self.job_id = self.job_id[1]['id']
-        except Exception,err:
-            traceback.print_exc()
-            sys.stderr.flush()
-            raise RuntimeError('Error creating PDG job.')
-
         return True


@@ -162,47 +179,46 @@ def onSchedule(self, work_item):
 
         logger.debug('onSchedule input: {} {}'.format(work_item.node.name, work_item.name))
 
-        # Find a node block or create a new one
-        block = None
-        block_id = 0
-        if work_item.node.name in self.job_block_name_id:
-            block_id = self.job_block_name_id[work_item.node.name]
-        else:
-            block = af.Block(work_item.node.name, 'hbatch_mantra')
-
         # Ensure directories exist and serialize the work item
         self.createJobDirsAndSerializeWorkItems(work_item)
 
-        # Create Task:
-        task = af.Task(work_item.name)
-        task.setCommand(self.expandCommandTokens(work_item.command, work_item))
+        # Create a task:
+        task = self._constructTask(work_item)
 
-        # Set Environment Task Variables
-        task.setEnv('PDG_RESULT_SERVER', str(self.workItemResultServerAddr()))
-        task.setEnv('PDG_ITEM_NAME', str(work_item.name))
-        task.setEnv('PDG_DIR', str(self.workingDir(False)))
-        task.setEnv('PDG_TEMP', str(self.tempDir(False)))
-        task.setEnv('PDG_SHARED_TEMP', str(self.tempDir(False)))
-        task.setEnv('PDG_INDEX', str(work_item.index))
-        task.setEnv('PDG_INDEX4', "{:04d}".format(work_item.index))
-        task.setEnv('PDG_SCRIPTDIR', str(self.scriptDir(False)))
-
-        # Append Task to the job:
-        cmd = af.Cmd()
-        if block is not None:
+        # Check that a block for the current TOP node exists:
+        if not work_item.node.name in self.job_block_name_id:
+            # We need to append a new block to the job,
+            # as this is the first work item of the current TOP node.
+            block = self._constructBlock(work_item)
             block.tasks.append(task)
-            struct = cmd.appendBlocks(self.job_id, [block])
-            if not 'block_ids' in struct['object']:
-                print("Error appending block:")
-                print(struct)
-                return pdg.scheduleResult.Failed
-            block_id = struct['object']['block_ids'][0]
+            block_id = 0
+
+            if self.job_id is None:
+                # This is probably the first onSchedule call;
+                # the job has not been created yet.
+                job = self._constructJob()
+                job.blocks.append(block)
+                if not self._sendJob(job):
+                    return pdg.scheduleResult.CookFailed
+
+            else:
+                # Append a new block to the job
+                struct = af.Cmd().appendBlocks(self.job_id, [block])
+                if not 'block_ids' in struct['object']:
+                    print("Error appending block:")
+                    print(struct)
+                    return pdg.scheduleResult.Failed
+
+                block_id = struct['object']['block_ids'][0]
+
             self.job_block_name_id[work_item.node.name] = block_id
             self.job_tasks_id_name[block_id] = {}
             self.job_tasks_id_name[block_id][0] = work_item.name
 
         else:
-            struct = cmd.appendTasks(self.job_id, block_id, [task])
+            # The current TOP node's block exists; we just need to append the task
+            block_id = self.job_block_name_id[work_item.node.name]
+            struct = af.Cmd().appendTasks(self.job_id, block_id, [task])
             if not 'task_ids' in struct['object']:
                 print("Error appending task:")
                 print(struct)
@@ -240,11 +256,12 @@ def onTick(self):
 
         # check that the job was created
         if self.job_id is None:
-            return tickResult.SchedulerCancelCook
+            # onTick can be called before onSchedule!
+            # Before onSchedule the job has not been created/sent.
+            return pdg.tickResult.SchedulerReady
 
         # get job progress
-        cmd = af.Cmd()
-        job_progress = cmd.getJobProgress(self.job_id)
+        job_progress = af.Cmd().getJobProgress(self.job_id)
         if job_progress is None:
             print('Error getting job progress.')
             return pdg.tickResult.SchedulerBusy
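For reference, the two bookkeeping dictionaries maintained in onSchedule() relate names and ids as sketched below; the TOP node and work item names are hypothetical examples, not values from the commit.

# Hypothetical contents after the first work item of one TOP node is scheduled:
job_block_name_id = {'ropfetch1': 0}          # TOP node name -> Afanasy block id
job_tasks_id_name = {0: {0: 'ropfetch1_0'}}   # block id -> {task id: work item name}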
