PDG: Adjustment parameters added.
Report work item fail on error.
Block on failed work items.
Use IP address as work item result server.
Tick Period and Max Items Per Tick.

References: #514.
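
The four adjustments correspond to new parameters on the scheduler HDA. A minimal sketch of how they might be read inside a scheduler method (use_ip_address and report_fail_on_error appear in the diff below with exactly this pattern; the two tick parameter names are hypothetical, since that part of the change lives in the binary HDA):

    use_ip = self['use_ip_address'].evaluateInt()                 # shown in the diff below
    report = self['report_fail_on_error'].evaluateInt()           # shown in the diff below
    tick_period = self['afanasy_tick_period'].evaluateFloat()     # hypothetical parm name
    max_items = self['afanasy_max_items_per_tick'].evaluateInt()  # hypothetical parm name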
timurhai committed Jul 28, 2021
1 parent 7903f48 commit 0bbb2ae
Showing 2 changed files with 24 additions and 19 deletions.
Binary file modified plugins/houdini/otls/afanasyscheduler.hda
Binary file not shown.
43 changes: 24 additions & 19 deletions plugins/houdini/pdg/types/afanasyscheduler.py
@@ -3,7 +3,6 @@
import pdg; pdg.TypeRegistry.types().registeredType(pdg.registeredType.Scheduler, "afanasyscheduler").reload()
"""
import json
import logging
import os
import socket
import sys
@@ -18,8 +17,6 @@
import cgruconfig
import af

logging.basicConfig(level = logging.DEBUG)
logger = logging.getLogger(__name__)

class AfanasyScheduler(CallbackServerMixin, PyScheduler):
"""
@@ -52,6 +49,10 @@ def _initData(self):
self._local_addr = self._getLocalAddr()


def _log(self, i_msg):
print(self.topNode().name() + ': ' + str(i_msg))


def _getWorkItemServiceAndParser(self, work_item):
# Set the default service values
service = 'hbatch'
Expand Down Expand Up @@ -166,9 +167,11 @@ def _getLocalAddr(self):

def workItemResultServerAddr(self):
# By default it uses the local host name.
addr = self._workItemResultServerAddr
# On a farm it is better to use a direct IP address.
addr, port = self._workItemResultServerAddr.split(':')
addr = ':'.join([self._local_addr, port])
if self['use_ip_address'].evaluateInt():
addr, port = self._workItemResultServerAddr.split(':')
addr = ':'.join([self._local_addr, port])
return addr

def applicationBin(self, i_app, i_work_item):
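
The body of _getLocalAddr sits outside the shown hunks. A plausible sketch of how a direct IP address is usually obtained (an assumption, not the commit's code; this file does import socket):

    def _getLocalAddr(self):
        # Connect a UDP socket to a public address: no packets are actually
        # sent, but the OS selects the outgoing interface, whose IP we read.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.connect(('8.8.8.8', 53))
            return s.getsockname()[0]
        except socket.error:
            return socket.gethostbyname(socket.gethostname())
        finally:
            s.close()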
@@ -186,15 +189,15 @@ def onStart(self):
"""
[virtual] Called by PDG when scheduler is first created.
"""
logger.debug("onStart")
self._log('onStart')
return True


def onStop(self):
"""
[virtual] Called by PDG when scheduler is cleaned up.
"""
logger.debug("onStop")
self._log('onStop')
self.stopCallbackServer()
self._deleteJob()
return True
@@ -206,7 +209,7 @@ def onStartCook(self, static, cook_set):
[virtual] Cook start callback. Starts the job for the cook session.
"""
logger.debug("onStartCook")
self._log('onStartCook')
self._deleteJob()

pdg_workingdir = self["pdg_workingdir"].evaluateString()
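
For context, an Afanasy job is normally assembled with CGRU's af Python API along these lines (the standard CGRU example; the actual job setup in this scheduler is outside the shown hunks, so the names are illustrative):

    job = af.Job('PDG cook session')        # illustrative job name
    block = af.Block('geometry', 'hbatch')  # block name and service
    task = af.Task('work_item_name')        # one task per work item
    task.setCommand('hython ...')           # placeholder command
    block.tasks.append(task)
    job.blocks.append(block)
    job.send()                              # submit to the Afanasy server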
@@ -230,7 +233,7 @@ def onStopCook(self, cancel):
- Strange: depending on the cancel parameter, this function is called in two completely different cases.
- It would be cleaner to create 2 different functions: onStopCook and onCancelCook.
"""
logger.debug("onStopCook: cancel = " + str(cancel))
self._log('onStopCook: cancel = ' + str(cancel))
self.stopCallbackServer()

if cancel:
@@ -248,7 +251,7 @@ def onSchedule(self, work_item):
if len(work_item.command) == 0:
return pdg.scheduleResult.CookSucceeded

logger.debug('onSchedule input: {} {}'.format(work_item.node.name, work_item.name))
self._log('onSchedule input: {} - {}'.format(work_item.node.name, work_item.name))

# Ensure directories exist and serialize the work item
self.createJobDirsAndSerializeWorkItems(work_item)
@@ -276,8 +279,7 @@ def onSchedule(self, work_item):
# Append a new block to the job
struct = af.Cmd().appendBlocks(self.job_id, [block])
if not 'block_ids' in struct['object']:
print("Error appending block:")
print(struct)
self._log('Error appending block:\n' + str(struct))
return pdg.scheduleResult.Failed

block_id = struct['object']['block_ids'][0]
@@ -291,8 +293,7 @@
block_id = self.job_block_name_id[work_item.node.name]
struct = af.Cmd().appendTasks(self.job_id, block_id, [task])
if not 'task_ids' in struct['object']:
print("Error appending task:")
print(struct)
self._log('Error appending task:\n' + str(struct))
return pdg.scheduleResult.Failed

task_id = struct['object']['task_ids'][0]
@@ -311,7 +312,7 @@ def onScheduleStatic(self, dependencies, dependents, ready_items):
- Not Supported
"""
logger.debug('onScheduleStatic:')
self._log('onScheduleStatic:')
print('Counts:')
print('len(dependencies) = %d' % len(dependencies))
print('len(dependents) = %d' % len(dependents))
@@ -334,7 +335,7 @@ def onTick(self):
# get job progress
job_progress = af.Cmd().getJobProgress(self.job_id)
if job_progress is None:
print('Error getting job progress.')
self._log('Error getting job progress.')
return pdg.tickResult.SchedulerBusy

job_progress = job_progress['progress']
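
The walk over tasks in the next hunk implies a nested per-block, per-task structure; a hedged reading of it (inferred from the state checks below, not from Afanasy documentation):

    for block_id, tasks in enumerate(job_progress):
        for task_id, task in enumerate(tasks):
            state = task['state']  # flag string, e.g. contains 'RUN', 'ERR', 'SKP', 'DON'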
@@ -351,8 +352,12 @@
self.onWorkItemStartCook(work_item_name, -1)

elif state.find('ERR') != -1:
self.onWorkItemFailed(work_item_name, -1)
ids_to_del[block_id].append(task_id)
if self['report_fail_on_error'].evaluateInt():
self.onWorkItemFailed(work_item_name, -1)
# If the graph is set up to block on failures, then
# we continue to track the task
if not self.isWaitForFailures:
ids_to_del[block_id].append(task_id)

elif state.find('SKP') != -1:
self.onWorkItemCanceled(work_item_name, -1)
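
Taken together, the two failure controls decouple reporting from tracking; a summary of the branch above (annotation, not code from the commit):

    # report_fail_on_error = 1  -> PDG is notified via onWorkItemFailed
    # report_fail_on_error = 0  -> the ERR state is not reported to PDG
    # isWaitForFailures = False -> the task is dropped from tracking (ids_to_del)
    # isWaitForFailures = True  -> the task stays tracked, blocking on the failure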
@@ -383,7 +388,7 @@ def submitAsJob(self, graph_file, node_path):
graph_file Path to a .hip file containing the TOP Network, relative to $PDG_DIR.
node_path Op path to the TOP Network
"""
logger.debug("submitAsJob({},{})".format(graph_file, node_path))
self._log("submitAsJob({},{})".format(graph_file, node_path))

# Construct a command for hython + the topcook script
cmd = 'hython'
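
The rest of the command is cut off by the diff view; a hedged sketch of how such a hython + topcook invocation is typically completed (the script path and flags follow SideFX's shipped topcook.py but should be treated as assumptions here):

    cmd = 'hython'
    cmd += ' "$HHP/pdgjob/topcook.py"'    # assumed location of the PDG cook script
    cmd += ' --hip "%s"' % graph_file     # .hip file containing the TOP network
    cmd += ' --toppath "%s"' % node_path  # op path to the TOP network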
