Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion DDFacet/Data/ClassVisServer.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,8 @@ def _handler_LoadVisChunk(self, dictname, iMS, iChunk):
print(ModColor.Str("This chunk is all flagged or has zero weight."), file=log)
return

if DATA["sort_index"] is not None: # and DATA["Weights"] is not 1: # OMS 2023/12 they're not "1" ever and this seems a bug
if DATA["sort_index"] is not None and isinstance(DATA["Weights"],np.ndarray):
# a value of '1' is used as a flag by GetVisWeights, but we can't compare with 1 because the weights could be and generally will be an array...
DATA["Weights"] = DATA["Weights"][DATA["sort_index"]]

self.computeBDAInBackground(dictname, ms, DATA,
Expand Down
71 changes: 49 additions & 22 deletions DDFacet/Other/AsyncProcessPool.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ def __init__ (self):
self._events = {}
self._results_map = {}
self._job_counters = JobCounterPool()
# record these on first call
self._cpucount = psutil.cpu_count()
self._process = psutil.Process()
self.inherited_affinity = self._process.cpu_affinity()
self.available_cores = len(self.inherited_affinity)

def __del__(self):
self.shutdown()
Expand All @@ -185,27 +190,43 @@ def init(self, ncpu=None, affinity=None, parent_affinity=0, num_io_processes=1,
Returns:

"""
# Take 0 to disable affinity
if affinity == 0:
print(ModColor.Str("Affinity 0 requested, interpreting as disable"),file=log)
affinity = None

self.affinity = affinity
self.verbose = verbose

self.pause_on_start = pause_on_start

if isinstance(self.affinity, int):
self.cpustep = abs(self.affinity) or 1
maxcpu = psutil.cpu_count() // self.cpustep
self.ncpu = ncpu or maxcpu
# check whether we are in an environment (e.g. Slurm) where affinity is
# already set. If we are, ignore arguments and DTRT
if self.available_cores < self._cpucount:
print(ModColor.Str("Warning: inherited affinity is for %d CPUs out of %d only" % (self.available_cores,self._cpucount)), file=log)
if ncpu and ncpu>self.available_cores:
raise RuntimeError("NCPU requested is %d but only %d threads are available" % (ncpu,self.available_cores))
self.ncpu = ncpu or self.available_cores
maxcpu = self.ncpu
self.affinity = self.inherited_affinity[:self.ncpu]
parent_affinity = self.inherited_affinity[0]
else:
# Assume we have all of the machine available
self.cpustep = abs(self.affinity) or 1
maxcpu = self._cpucount // self.cpustep
self.ncpu = ncpu or maxcpu
self.parent_affinity = parent_affinity
elif isinstance(self.affinity, list):
if any(map(lambda x: x < 0, self.affinity)):
raise RuntimeError("Affinities must be list of positive numbers")
if psutil.cpu_count() < max(self.affinity):
raise RuntimeError("There are %d virtual threads on this system. Some elements of the affinity map are "
"higher than this. Check parset." % psutil.cpu_count())
self.ncpu = ncpu or len(self.affinity)
if self.ncpu != len(self.affinity):
print(ModColor.Str("Warning: NCPU does not match affinity list length. Falling back to "
"NCPU=%d" % len(self.affinity)), file=log)
self.ncpu = self.ncpu if self.ncpu == len(self.affinity) else len(self.affinity)
if set(self.affinity)<=set(self.inherited_affinity):
if self.ncpu != len(self.affinity):
print(ModColor.Str("Warning: NCPU does not match affinity list length. Falling back to "
"NCPU=%d" % len(self.affinity)), file=log)
self.ncpu=len(self.affinity)
else:
raise RuntimeError("Requested affinity %s is not a subset of available affinity %s" % (str(self.affinity),str(self.inherited_affinity)))
maxcpu = max(self.affinity) + 1 # zero indexed list
self.parent_affinity = parent_affinity
elif isinstance(self.affinity, str) and str(self.affinity) == "enable_ht":
Expand Down Expand Up @@ -260,20 +281,22 @@ def init(self, ncpu=None, affinity=None, parent_affinity=0, num_io_processes=1,
self.parent_affinity = 0 # none unused (HT is probably disabled BIOS level)
else:
self.parent_affinity = unused[0] # grab the first unused vthread
elif isinstance(self.affinity, str) and str(self.affinity) == "disable":
elif self.affinity is None or isinstance(self.affinity, str) and str(self.affinity) == "disable":
self.affinity = None
self.parent_affinity = None
self.cpustep = 1
maxcpu = psutil.cpu_count()
maxcpu = len(self.inherited_affinity)
self.ncpu = ncpu or maxcpu
else:
raise RuntimeError("Invalid option for Parallel.Affinity. Expected cpu step (int), list, "
"'enable_ht', 'disable_ht', 'disable'")
if self.parent_affinity is None:
print("Parent and I/O affinities not specified, leaving unset", file=log)
else:
if parent_affinity not in self.inherited_affinity:
raise RuntimeError("Parent affinity requested (%d) is not in available list of cores" % parent_affinity)
print(ModColor.Str("Fixing parent process to vthread %d" % self.parent_affinity, col="green"), file=log)
psutil.Process().cpu_affinity(range(self.ncpu) if not self.parent_affinity else [self.parent_affinity])
#self._process.cpu_affinity([self.parent_affinity])

# if NCPU is 0, set to number of CPUs on system
if not self.ncpu:
Expand Down Expand Up @@ -803,13 +826,13 @@ def shutdown(self):
print("shutdown complete", file=log)

@staticmethod
def _start_worker (object, proc_id, affinity, worker_queue, pause_on_start=False):
def _start_worker (self, proc_id, affinity, worker_queue, pause_on_start=False):
"""
Helper method for worker process startup. ets up affinity, and calls _run_worker method on
object with the specified work queue.
Helper method for worker process startup. sets up affinity, and calls _run_worker method on
self with the specified work queue.

Args:
object:
self:
proc_id:
affinity:
work_queue:
Expand All @@ -824,10 +847,14 @@ def _start_worker (object, proc_id, affinity, worker_queue, pause_on_start=False
_pyArrays.pySetOMPDynamicNumThreads(1)
AsyncProcessPool.proc_id = proc_id
logger.subprocess_id = proc_id
if affinity:
if affinity and self.affinity: # shouldn't mess with affinity if it was disabled
if self.verbose:
print(ModColor.Str("setting worker pid %d affinity to %s"% (os.getpid(),str(affinity))), file=log)
psutil.Process().cpu_affinity(affinity)
object._run_worker(worker_queue)
if object.verbose:
if self.verbose:
print(ModColor.Str("worker pid %d affinity is now %s"% (os.getpid(),str(psutil.Process().cpu_affinity()))), file=log)
self._run_worker(worker_queue)
if self.verbose:
print(ModColor.Str("exiting worker pid %d"%os.getpid()), file=log)


Expand Down Expand Up @@ -931,7 +958,7 @@ def _init_default():
global APP
if APP is None:
APP = AsyncProcessPool()
APP.init(psutil.cpu_count(), affinity=0, num_io_processes=1, verbose=0)
APP.init(len(APP.inherited_affinity), affinity=0, num_io_processes=1, verbose=0)

_init_default()

Expand Down
30 changes: 14 additions & 16 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

(Users / Recommended - Docker based) Run via Stimela >= 0.2.9
==========================================================
===============================================================
We recommend running the imaging package through the Stimela framework <https://github.com/SpheMakh/Stimela>, built on a
widely supported containerization framework, called Docker. This package is on PyPI and is purely python-based, requiring no dependencies other than Docker. It gives the user instantaneous access to other commonly used packages such as Meqtrees, CASA, etc.

Expand Down Expand Up @@ -107,12 +107,12 @@ in an empty new virtual environment.**
deactivate

(Users/Optional) FitsBeam, Montblanc, Moresane, Killms support
==========================================================
================================================================
Optional requirements like the FITS beam can be installed by specifying them in brackets.
E.g. pip install "/src/DDFacet/[dft-support,moresane-support,testing-requirements,fits-beam-support,kms-support]"

(Users/Troubleshooting) Configure max shared memory
==========================================================
================================================================
Running DDFacet on large images requires a lot of shared memory. Most systems limit the amount of shared memory to about 10%. To increase this limit add the following line to your ``/etc/default/tmpfs`` file::

SHM_SIZE=100%
Expand All @@ -137,21 +137,20 @@ cmake.define = {ENABLE_NATIVE_TUNING = "ON", ENABLE_FAST_MATH = "ON", ENABLE_PYT

(Developers/Recommended): setting up your dev environment
==========================================================
**NOTE:Setup your virtual environment just as specified in the user section above. Ensure you activate!
WARNING: you may encounter issues if you have previously installed numpy in the environment - build isolation
will fail if the numpy you have installed is older than the build system pulls during isolation.
You may need to use --no-build-isolation when installing or, even better, ensure that you install DDF and KillMS
in an empty new virtual environment.
**
**NOTE:Setup your virtual environment just as specified in the user section above. Ensure you activate!**
**WARNING: you may encounter issues if you have previously installed numpy in the environment - build isolation**
**will fail if the numpy you have installed is older than the build system pulls during isolation.**
**You may need to use --no-build-isolation when installing or, even better, ensure that you install DDF and KillMS**
**in an empty new virtual environment.**

To set up your local development environment, clone DDFacet and run::

(ddfvenv) $ git clone https://github.com/cyriltasse/DDFacet
(ddfvenv) $ pip install -e DDFacet/

**IMPORTANT NOTE: You may need to remove the development version before running PIP when installing. If you
are switching between release and debug versions of the backend -- or recompiling in a different configuration --
you should remove the DDFacet/DDFacet/cbuild directory and everything in it**
**IMPORTANT NOTE: You may need to remove the development version before running PIP when installing. If you**
**are switching between release and debug versions of the backend -- or recompiling in a different configuration --**
**you should remove the DDFacet/DDFacet/cbuild directory and everything in it**

Note that Python3.8 support is deprecated and editable installation is only tested to work on Python 3.10.

Expand All @@ -170,13 +169,12 @@ Note that Python3.8 support is deprecated and editable installation is only test
**Important: if you ran ``git submodule update --init --recursive`` before you may need to remove the cached SkyModel before building the docker image with ``git rm --cached SkyModel``**

(Developers/Debugging) Build a few libraries (by hand with custom flags)
==========================================================
=========================================================================
You can build against custom versions of libraries such is libPython and custom numpy versions.
To do this modify pyproject.toml. Find and modify the following lines::
```
cmake.build-type = "ReleaseWithDebugSymbols" # can be set to Debug e.g.
cmake.define = {ENABLE_NATIVE_TUNING = "OFF", ENABLE_FAST_MATH = "ON", ENABLE_PYTHON_2 = "OFF", ENABLE_PYTHON_3 = "ON"} # can be tuned to enable processor
# specific marching
cmake.define = {ENABLE_NATIVE_TUNING = "OFF", ENABLE_FAST_MATH = "ON", ENABLE_PYTHON_2 = "OFF", ENABLE_PYTHON_3 = "ON"} # can be tuned to enable processor-specific -march tuning
```
You can also specify path settings for other libraries if you have custom built, e.g. numpy through these ```cmake.define```

Expand All @@ -190,7 +188,7 @@ Add this to your ``.bashrc``::
export DDFACET_TEST_OUTPUT_DIR=[folder where you want the acceptance test output to be dumped]

To test your branch against the master branch using Jenkins
---------------------------------------------------------
------------------------------------------------------------
Most of the core use cases will in the near future have reference images and an automated acceptance test.

Please **do not** commit against cyriltasse/master. The correct strategy is to branch/fork and do a pull request on Github
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ authors = [{ name = "Cyril Tasse", email = "[email protected]" }]
maintainers = [{ name = "Benjamin Hugo", email = "[email protected]" }]
urls = { Homepage = "http://github.com/saopicc/DDFacet"}
classifiers = [
"Development Status :: 5 - Production / Stable",
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: GNU General Public License v2 (GPLv2)",
Expand Down Expand Up @@ -80,7 +80,7 @@ classifiers = [
dft-support = ["montblanc >= 0.6.1, <= 0.7.3.1; python_version >= '3.8' and python_version < '3.11'"]
moresane-support = ["pymoresane >= 0.3.0; python_version >= '3.8' and python_version < '3.11'"]
fits-beam-support = ["meqtrees-cattery <= 1.7.9; python_version >= '3.8' and python_version < '3.11'"]
kms-support = ["bdsf > 1.8.15,<=1.10.1; python_version >= '3.8' and python_version < '3.11'"]
kms-support = ["bdsf > 1.8.15; python_version >= '3.8' and python_version < '3.11'"]
alternate-data-backends = ["dask-ms[xarray]<=0.2.20; python_version >= '3.8' and python_version < '3.11'",
"xarray<=2023.12.0; python_version >= '3.8' and python_version < '3.11'"]
testing-requirements = ["nose >= 1.3.7; python_version >= '3' and python_version < '3.9'",
Expand Down