diff --git a/DDFacet/Data/ClassVisServer.py b/DDFacet/Data/ClassVisServer.py index d7035403..ceb71fb5 100644 --- a/DDFacet/Data/ClassVisServer.py +++ b/DDFacet/Data/ClassVisServer.py @@ -593,7 +593,8 @@ def _handler_LoadVisChunk(self, dictname, iMS, iChunk): print(ModColor.Str("This chunk is all flagged or has zero weight."), file=log) return - if DATA["sort_index"] is not None: # and DATA["Weights"] is not 1: # OMS 2023/12 they're not "1" ever and this seems a bug + if DATA["sort_index"] is not None and isinstance(DATA["Weights"],np.ndarray): + # a value of '1' is used as a flag by GetVisWeights, but we can't compare with 1 because the weights could be and generally will be an array... DATA["Weights"] = DATA["Weights"][DATA["sort_index"]] self.computeBDAInBackground(dictname, ms, DATA, diff --git a/DDFacet/Other/AsyncProcessPool.py b/DDFacet/Other/AsyncProcessPool.py index 3876a39f..5bde4c41 100644 --- a/DDFacet/Other/AsyncProcessPool.py +++ b/DDFacet/Other/AsyncProcessPool.py @@ -166,6 +166,11 @@ def __init__ (self): self._events = {} self._results_map = {} self._job_counters = JobCounterPool() + # record these on first call + self._cpucount = psutil.cpu_count() + self._process = psutil.Process() + self.inherited_affinity = self._process.cpu_affinity() + self.available_cores = len(self.inherited_affinity) def __del__(self): self.shutdown() @@ -185,27 +190,43 @@ def init(self, ncpu=None, affinity=None, parent_affinity=0, num_io_processes=1, Returns: """ + # Take 0 to disable affinity + if affinity == 0: + print(ModColor.Str("Affinity 0 requested, interpreting as disable"),file=log) + affinity = None + self.affinity = affinity self.verbose = verbose self.pause_on_start = pause_on_start if isinstance(self.affinity, int): - self.cpustep = abs(self.affinity) or 1 - maxcpu = psutil.cpu_count() // self.cpustep - self.ncpu = ncpu or maxcpu + # check whether we are in an environment (e.g. Slurm) where affinity is + # already set. 
If we are, ignore arguments and DTRT + if self.available_cores < self._cpucount: + print(ModColor.Str("Warning: inherited affinity is for %d CPUs out of %d only" % (self.available_cores,self._cpucount)), file=log) + if ncpu and ncpu>self.available_cores: + raise RuntimeError("NCPU requested is %d but only %d threads are available" % (ncpu,self.available_cores)) + self.ncpu = ncpu or self.available_cores + maxcpu = self.ncpu + self.affinity = self.inherited_affinity[:self.ncpu] + parent_affinity = self.inherited_affinity[0] + else: + # Assume we have all of the machine available + self.cpustep = abs(self.affinity) or 1 + maxcpu = self._cpucount // self.cpustep + self.ncpu = ncpu or maxcpu self.parent_affinity = parent_affinity elif isinstance(self.affinity, list): if any(map(lambda x: x < 0, self.affinity)): raise RuntimeError("Affinities must be list of positive numbers") - if psutil.cpu_count() < max(self.affinity): - raise RuntimeError("There are %d virtual threads on this system. Some elements of the affinity map are " - "higher than this. Check parset." % psutil.cpu_count()) - self.ncpu = ncpu or len(self.affinity) - if self.ncpu != len(self.affinity): - print(ModColor.Str("Warning: NCPU does not match affinity list length. Falling back to " - "NCPU=%d" % len(self.affinity)), file=log) - self.ncpu = self.ncpu if self.ncpu == len(self.affinity) else len(self.affinity) + if set(self.affinity)<=set(self.inherited_affinity): + if self.ncpu != len(self.affinity): + print(ModColor.Str("Warning: NCPU does not match affinity list length. 
Falling back to " + "NCPU=%d" % len(self.affinity)), file=log) + self.ncpu=len(self.affinity) + else: + raise RuntimeError("Requested affinity %s is not a subset of available affinity %s" % (str(self.affinity),str(self.inherited_affinity))) maxcpu = max(self.affinity) + 1 # zero indexed list self.parent_affinity = parent_affinity elif isinstance(self.affinity, str) and str(self.affinity) == "enable_ht": @@ -260,11 +281,11 @@ def init(self, ncpu=None, affinity=None, parent_affinity=0, num_io_processes=1, self.parent_affinity = 0 # none unused (HT is probably disabled BIOS level) else: self.parent_affinity = unused[0] # grab the first unused vthread - elif isinstance(self.affinity, str) and str(self.affinity) == "disable": + elif self.affinity is None or isinstance(self.affinity, str) and str(self.affinity) == "disable": self.affinity = None self.parent_affinity = None self.cpustep = 1 - maxcpu = psutil.cpu_count() + maxcpu = len(self.inherited_affinity) self.ncpu = ncpu or maxcpu else: raise RuntimeError("Invalid option for Parallel.Affinity. 
Expected cpu step (int), list, " @@ -272,8 +293,10 @@ def init(self, ncpu=None, affinity=None, parent_affinity=0, num_io_processes=1, if self.parent_affinity is None: print("Parent and I/O affinities not specified, leaving unset", file=log) else: + if parent_affinity not in self.inherited_affinity: + raise RuntimeError("Parent affinity requested (%d) is not in available list of cores" % parent_affinity) print(ModColor.Str("Fixing parent process to vthread %d" % self.parent_affinity, col="green"), file=log) - psutil.Process().cpu_affinity(range(self.ncpu) if not self.parent_affinity else [self.parent_affinity]) + #self._process.cpu_affinity([self.parent_affinity]) # if NCPU is 0, set to number of CPUs on system if not self.ncpu: @@ -803,13 +826,13 @@ def shutdown(self): print("shutdown complete", file=log) @staticmethod - def _start_worker (object, proc_id, affinity, worker_queue, pause_on_start=False): + def _start_worker (self, proc_id, affinity, worker_queue, pause_on_start=False): """ - Helper method for worker process startup. ets up affinity, and calls _run_worker method on - object with the specified work queue. + Helper method for worker process startup. sets up affinity, and calls _run_worker method on + self with the specified work queue. 
Args: - object: + self: proc_id: affinity: work_queue: @@ -824,10 +847,14 @@ def _start_worker (object, proc_id, affinity, worker_queue, pause_on_start=False _pyArrays.pySetOMPDynamicNumThreads(1) AsyncProcessPool.proc_id = proc_id logger.subprocess_id = proc_id - if affinity: + if affinity and self.affinity: # shouldn't mess with affinity if it was disabled + if self.verbose: + print(ModColor.Str("setting worker pid %d affinity to %s"% (os.getpid(),str(affinity))), file=log) psutil.Process().cpu_affinity(affinity) - object._run_worker(worker_queue) - if object.verbose: + if self.verbose: + print(ModColor.Str("worker pid %d affinity is now %s"% (os.getpid(),str(psutil.Process().cpu_affinity()))), file=log) + self._run_worker(worker_queue) + if self.verbose: print(ModColor.Str("exiting worker pid %d"%os.getpid()), file=log) @@ -931,7 +958,7 @@ def _init_default(): global APP if APP is None: APP = AsyncProcessPool() - APP.init(psutil.cpu_count(), affinity=0, num_io_processes=1, verbose=0) + APP.init(len(APP.inherited_affinity), affinity=0, num_io_processes=1, verbose=0) _init_default() diff --git a/README.rst b/README.rst index e4e2a217..ecf2e31e 100644 --- a/README.rst +++ b/README.rst @@ -16,7 +16,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. (Users / Recommended - Docker based) Run via. Stimela >= 0.2.9 -========================================================== +=============================================================== We recommend running the imaging package through the Stimela framework , built on a widely supported containerization framework, called Docker. This package is on PiPY and and is purely python-based, requiring no dependencies other than Docker. It gives the user instantanious access to other commonly used packages such as Meqtrees, CASA, etc. 
@@ -107,12 +107,12 @@ in an empty new virtual environment.** deactivate (Users/Optional) FitsBeam, Montblanc, Moresane, Killms support -========================================================== +================================================================ Optional requirements like the FITS beam can be installed by specifying them in brackets. E.g. pip install "/src/DDFacet/[dft-support,moresane-support,testing-requirements,fits-beam-support,kms-support]" (Users/Troubleshooting) Configure max shared memory -========================================================== +================================================================ Running DDFacet on large images requires a lot of shared memory. Most systems limit the amount of shared memory to about 10%. To increase this limit add the following line to your ``/etc/default/tmpfs`` file:: SHM_SIZE=100% @@ -137,21 +137,20 @@ cmake.define = {ENABLE_NATIVE_TUNING = "ON", ENABLE_FAST_MATH = "ON", ENABLE_PYT (Developers/Recommended): setting up your dev environment ========================================================== -**NOTE:Setup your virtual environment just as specified in the user section above. Ensure you activate! - WARNING: you may encounter issues if you have previously installed numpy in the environment - build isolation - will fail if the numpy you have installed is older than the build system pulls during isolation. - You may need to use --no-build-isolation when installing or, even better, ensure that you install DDF and KillMS - in an empty new virtual environment. -** +**NOTE:Setup your virtual environment just as specified in the user section above. 
Ensure you activate!** +**WARNING: you may encounter issues if you have previously installed numpy in the environment - build isolation** +**will fail if the numpy you have installed is older than the build system pulls during isolation.** +**You may need to use --no-build-isolation when installing or, even better, ensure that you install DDF and KillMS** +**in an empty new virtual environment.** To setup your local development environment navigate clone DDFacet and run:: (ddfvenv) $ git clone https://github.com/cyriltasse/DDFacet (ddfvenv) $ pip install -e DDFacet/ -**IMPORTANT NOTE: You may need to remove the development version before running PIP when installing. If you -are switching between release and debug versions of the backend -- or recompiling in a different configuration -- -you should remove the DDFacet/DDFacet/cbuild directory and everything in it** +**IMPORTANT NOTE: You may need to remove the development version before running PIP when installing. If you** +**are switching between release and debug versions of the backend -- or recompiling in a different configuration --** +**you should remove the DDFacet/DDFacet/cbuild directory and everything in it** Note that Python3.8 support is deprecated and editable installation is only tested to work on Python 3.10. @@ -170,13 +169,12 @@ Note that Python3.8 support is deprecated and editable installation is only test **Important: if you ran ``git submodule update --init --recursive`` before you may need to remove the cached SkyModel before building the docker image with ``git rm --cached SkyModel``** (Developers/Debugging) Build a few libraries (by hand with custom flags) -========================================================== +========================================================================= You can build against custom versions of libraries such is libPython and custom numpy versions. To do this modify pyproject.toml.
Find and modify the following lines:: ``` cmake.build-type = "ReleaseWithDebugSymbols" # can be set to Debug e.g. -cmake.define = {ENABLE_NATIVE_TUNING = "OFF", ENABLE_FAST_MATH = "ON", ENABLE_PYTHON_2 = "OFF", ENABLE_PYTHON_3 = "ON"} # can be tuned to enable processor - # specific marching +cmake.define = {ENABLE_NATIVE_TUNING = "OFF", ENABLE_FAST_MATH = "ON", ENABLE_PYTHON_2 = "OFF", ENABLE_PYTHON_3 = "ON"} # can be tuned to enable processor specific marching ``` You can also specify path settings for other libraries if you have custom built, e.g. numpy through these ```cmake.define``` @@ -190,7 +188,7 @@ Add this to your ``.bashrc``:: export DDFACET_TEST_OUTPUT_DIR=[folder where you want the acceptance test output to be dumped] To test your branch against the master branch using Jenkins ---------------------------------------------------------- +------------------------------------------------------------ Most of the core use cases will in the nearby future have reference images and an automated acceptance test. Please **do not** commit against cyriltasse/master. 
The correct strategy is to branch/fork and do a pull request on Github diff --git a/pyproject.toml b/pyproject.toml index c363df2b..2fbb9d41 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,7 +51,7 @@ authors = [{ name = "Cyril Tasse", email = "cyril.tasse@obspm.fr" }] maintainers = [{ name = "Benjamin Hugo", email = "bhugo@sarao.ac.za" }] urls = { Homepage = "http://github.com/saopicc/DDFacet"} classifiers = [ - "Development Status :: 5 - Production / Stable", + "Development Status :: 5 - Production/Stable", "Environment :: Console", "Intended Audience :: Science/Research", "License :: OSI Approved :: GNU General Public License v2 (GPLv2)", @@ -80,7 +80,7 @@ classifiers = [ dft-support = ["montblanc >= 0.6.1, <= 0.7.3.1; python_version >= '3.8' and python_version < '3.11'"] moresane-support = ["pymoresane >= 0.3.0; python_version >= '3.8' and python_version < '3.11'"] fits-beam-support = ["meqtrees-cattery <= 1.7.9; python_version >= '3.8' and python_version < '3.11'"] -kms-support = ["bdsf > 1.8.15,<=1.10.1; python_version >= '3.8' and python_version < '3.11'"] +kms-support = ["bdsf > 1.8.15; python_version >= '3.8' and python_version < '3.11'"] alternate-data-backends = ["dask-ms[xarray]<=0.2.20; python_version >= '3.8' and python_version < '3.11'", "xarray<=2023.12.0; python_version >= '3.8' and python_version < '3.11'"] testing-requirements = ["nose >= 1.3.7; python_version >= '3' and python_version < '3.9'",