diff --git a/.github/workflows/unit_test.yml b/.github/workflows/unit_test.yml
index 6713c55..7147d90 100644
--- a/.github/workflows/unit_test.yml
+++ b/.github/workflows/unit_test.yml
@@ -11,7 +11,7 @@ on:
 jobs:
   build:
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-22.04
     steps:
       # Checks-out repository under $GITHUB_WORKSPACE
       - uses: actions/checkout@v4
@@ -49,8 +49,8 @@ jobs:
       - name: Run userpersistence tests
        run: |
-          python -m unittest tests/test_userpersistence.py
+          python -m unittest tests.test_userpersistence
      - name: Run kernel tests
        run: |
-          python -m unittest tests/test_kernel.py
+          python -m unittest tests.test_kernel
diff --git a/README.md b/README.md
index 6c7c05f..a5c0ec7 100644
--- a/README.md
+++ b/README.md
@@ -152,6 +152,17 @@ MODE=[disk,memory]
 ```
 When using persistence in `disk` mode, user can also define directory to which serializer output will be saved with `SCOREP_KERNEL_PERSISTENCE_DIR` environment variable.
+```
+%env SCOREP_KERNEL_PERSISTENCE_DIR=path/to/dir
+```
+To see a detailed report of the marshalling steps, set the `JUMPER_MARSHALLING_DETAILED_REPORT` environment variable.
+```
+%env JUMPER_MARSHALLING_DETAILED_REPORT=1
+```
+You can disable the visual animations shown during long-running tasks by setting the `JUMPER_DISABLE_PROCESSING_ANIMATIONS` environment variable.
+```
+%env JUMPER_DISABLE_PROCESSING_ANIMATIONS=1
+```
 
 `%%execute_with_scorep`
 
@@ -235,6 +246,11 @@ Similar yields for cloudpickle. Use the `%%marshalling_settings` magic command t
 When dealing with big data structures, there might be a big runtime overhead at the beginning and the end of a Score-P cell. This is due to additional data saving and loading processes for persistency in the background. However this does not affect the actual user code and the Score-P measurements.
 
+## Logging Configuration
+To adjust logging and obtain more detailed output about the behavior of the JUmPER kernel, refer to the `src/logging_config.py` file.
+
+This file contains configuration options for controlling the verbosity, format, and destination of log messages. You can customize it to suit your debugging or monitoring needs.
+
 # Future Work
 
 The kernel is still under development. The following is on the agenda:
 
diff --git a/examples/ExampleBasic.ipynb b/examples/ExampleBasic.ipynb
index a1eef4d..617ae8c 100644
--- a/examples/ExampleBasic.ipynb
+++ b/examples/ExampleBasic.ipynb
@@ -466,19 +466,75 @@
   {
    "cell_type": "markdown",
    "metadata": {},
-   "source": []
+   "source": [
+    "### Large array processing with Score-P\n",
+    "This example illustrates the steps involved in the marshalling process when a cell instrumented with Score-P is executed with a large data payload as input.\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import time\n",
+    "import numpy as np\n",
+    "\n",
+    "def generate_array_with_size(size_mb, dtype=np.float32):\n",
+    "    size_bytes = size_mb * 1024 * 1024\n",
+    "    element_size = np.dtype(dtype).itemsize\n",
+    "    num_elements = size_bytes // element_size\n",
+    "    array = np.zeros(num_elements, dtype=dtype)\n",
+    "    return array\n",
+    "\n",
+    "big_array = generate_array_with_size(size_mb=1000)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Enable the detailed marshalling report for each step."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%env JUMPER_MARSHALLING_DETAILED_REPORT=1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Run cell with Score-P" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%execute_with_scorep\n", + "big_array\n", + "time.sleep(4)" + ] } ], "metadata": { "kernelspec": { - "display_name": "scorep-python", + "display_name": "JUmPER", "language": "python", - "name": "scorep-python" + "name": "jumper" }, "language_info": { "file_extension": ".py", "mimetype": "text/plain", - "name": "Any text" + "name": "python" } }, "nbformat": 4, diff --git a/src/jumper/kernel.py b/src/jumper/kernel.py index 9bf5999..ee59bdd 100644 --- a/src/jumper/kernel.py +++ b/src/jumper/kernel.py @@ -2,10 +2,13 @@ import json import os import re +import selectors import subprocess import sys +import threading import time import shutil +import logging.config from enum import Enum from textwrap import dedent @@ -14,15 +17,17 @@ from ipykernel.ipkernel import IPythonKernel from itables import show from jumper.userpersistence import PersHelper, scorep_script_name -from jumper.userpersistence import magics_cleanup +from jumper.userpersistence import magics_cleanup, create_busy_spinner import importlib from jumper.perfdatahandler import PerformanceDataHandler import jumper.visualization as perfvis +from jumper.kernel_messages import KernelErrorCode, KERNEL_ERROR_MESSAGES # import jumper.multinode_monitor.slurm_monitor as slurm_monitor +from .logging_config import LOGGING + PYTHON_EXECUTABLE = sys.executable -READ_CHUNK_SIZE = 8 userpersistence_token = "jumper.userpersistence" jupyter_dump = "jupyter_dump.pkl" subprocess_dump = "subprocess_dump.pkl" @@ -103,6 +108,9 @@ def __init__(self, **kwargs): except ModuleNotFoundError: self.scorep_python_available_ = False + logging.config.dictConfig(LOGGING) + self.log = logging.getLogger('kernel') + def cell_output(self, string, stream="stdout"): """ Display string as cell output. @@ -121,17 +129,12 @@ def standard_reply(self): def scorep_not_available(self): if not self.scorep_available_: - self.cell_output("Score-P not available, cell ignored.", "stderr") + self.log_error(KernelErrorCode.SCOREP_NOT_AVAILABLE) return self.standard_reply() if not self.scorep_python_available_: - self.cell_output( - "Score-P Python not available, cell ignored. " - "Consider installing it via `pip install scorep`", - "stderr", - ) + self.log_error(KernelErrorCode.SCOREP_PYTHON_NOT_AVAILABLE) return self.standard_reply() - else: - return None + return None def marshaller_settings(self, code): """ @@ -683,16 +686,16 @@ async def scorep_execute( """ Execute given code with Score-P Python bindings instrumentation. 
""" + self.log.info("Executing Score-P instrumented code...") + self.pershelper.set_dump_report_level() # Set up files/pipes for persistence communication if not self.pershelper.preprocess(): self.pershelper.postprocess() - self.cell_output( - "KernelError: Failed to set up the persistence communication " - "files/pipes.", - "stderr", - ) + self.log_error(KernelErrorCode.PERSISTENCE_SETUP_FAIL) return self.standard_reply() + self.log.debug("Persistence communication set up successfully.") + # Prepare code for the Score-P instrumented execution as subprocess # Transmit user persistence and updated sys.path from Jupyter # notebook to subprocess After running the code, transmit subprocess @@ -701,11 +704,14 @@ async def scorep_execute( os.open(scorep_script_name, os.O_WRONLY | os.O_CREAT), "w" ) as file: file.write(self.pershelper.subprocess_wrapper(code)) + self.log.debug(f"Code written to temporary script: {scorep_script_name}") + # For disk mode use implicit synchronization between kernel and # subprocess: await jupyter_dump, subprocess.wait(), # await jupyter_update Ghost cell - dump current Jupyter session for # subprocess Run in a "silent" way to not increase cells counter if self.pershelper.mode == "disk": + self.log.debug("Executing Jupyter dump for disk mode.") reply_status_dump = await super().do_execute( self.pershelper.jupyter_dump(), silent, @@ -716,18 +722,20 @@ async def scorep_execute( ) if reply_status_dump["status"] != "ok": - self.ghost_cell_error( - reply_status_dump, - "KernelError: Failed to pickle notebook's persistence.", - ) + self.log_error(KernelErrorCode.PERSISTENCE_DUMP_FAIL, direction="Jupyter -> Score-P") + self.pershelper.postprocess() return reply_status_dump # Launch subprocess with Jupyter notebook environment + self.log.debug("Preparing subprocess execution.") + cmd = ( [PYTHON_EXECUTABLE, "-m", "scorep"] + self.scorep_binding_args + [scorep_script_name] ) + self.log.debug(f"Subprocess command: {' '.join(cmd)}") + scorep_env = { key: os.environ[key] for key in os.environ @@ -749,13 +757,15 @@ async def scorep_execute( minute = dt.strftime("%M") proc = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=proc_env + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=proc_env ) + self.log.debug(f"Subprocess started with PID {proc.pid}") self.perfdata_handler.start_perfmonitor(proc.pid) # For memory mode jupyter_dump and jupyter_update must be awaited # concurrently to the running subprocess if self.pershelper.mode == "memory": + self.log.debug("Executing Jupyter dump for memory mode.") reply_status_dump = await super().do_execute( self.pershelper.jupyter_dump(), silent, @@ -765,44 +775,26 @@ async def scorep_execute( cell_id=cell_id, ) if reply_status_dump["status"] != "ok": - self.ghost_cell_error( - reply_status_dump, - "KernelError: Failed to pickle notebook's persistence.", - ) + self.log_error(KernelErrorCode.PERSISTENCE_DUMP_FAIL, direction="Jupyter -> Score-P") + self.pershelper.postprocess() return reply_status_dump - # Redirect process stderr to stdout and observe the latter - # Observing two stream with two threads causes interference in - # cell_output in Jupyter notebook - # stdout is read in chunks, which are split into lines using - # \r or \n as delimiter - # Last element in the list might be "incomplete line", - # not ending with \n or \r, it is saved - # and merged with the first line in the next chunk - incomplete_line = "" - endline_pattern = re.compile(r"(.*?[\r\n]|.+$)") # Empty cell output, required for 
interactive output # e.g. tqdm for-loop progress bar self.cell_output("\0") + stdout_lock = threading.Lock() + process_busy_spinner = create_busy_spinner(stdout_lock) + process_busy_spinner.start('Process is running...') + multicellmode_timestamps = [] - while True: - chunk = b"" + proc.stdout.read(READ_CHUNK_SIZE) - if chunk == b"": - break - chunk = chunk.decode(sys.getdefaultencoding(), errors="ignore") - lines = endline_pattern.findall(chunk) - if len(lines) > 0: - lines[0] = incomplete_line + lines[0] - if lines[-1][-1] not in ["\n", "\r"]: - incomplete_line = lines.pop(-1) - else: - incomplete_line = "" - for line in lines: - if "MCM_TS" in line: - multicellmode_timestamps.append(line) - continue - self.cell_output(line) + + try: + multicellmode_timestamps = self.read_scorep_process_pipe(proc, stdout_lock) + process_busy_spinner.stop('Done.') + except KeyboardInterrupt: + process_busy_spinner.stop('Kernel interrupted.') + # for multiple nodes, we have to add more lists here, one list per node # this is required to be in line with the performance data aggregation @@ -857,21 +849,19 @@ async def scorep_execute( self.perfdata_handler.end_perfmonitor() ) - # In disk mode, subprocess already terminated - # after dumping persistence to file - if self.pershelper.mode == "disk": - if proc.returncode: - self.pershelper.postprocess() - self.cell_output( - "KernelError: Cell execution failed, cell persistence " - "was not recorded.", - "stderr", - ) - return self.standard_reply() + # Check if the score-p process is running. + # This prevents jupyter_update() from getting stuck while reading non-existent temporary files + # if something goes wrong during process execution. + if proc.poll(): + self.log_error(KernelErrorCode.SCOREP_SUBPROCESS_FAIL) + self.pershelper.postprocess() + return self.standard_reply() # os_environ_.clear() # sys_path_.clear() + # In disk mode, subprocess already terminated + # after dumping persistence to file # Ghost cell - load subprocess persistence back to Jupyter notebook # Run in a "silent" way to not increase cells counter reply_status_update = await super().do_execute( @@ -883,28 +873,22 @@ async def scorep_execute( cell_id=cell_id, ) if reply_status_update["status"] != "ok": - self.ghost_cell_error( - reply_status_update, - "KernelError: Failed to load cell's persistence to the " - "notebook.", - ) + self.log_error(KernelErrorCode.PERSISTENCE_LOAD_FAIL, direction=f"Score-P -> Jupyter") + self.pershelper.postprocess() return reply_status_update # In memory mode, subprocess terminates once jupyter_update is # executed and pipe is closed if self.pershelper.mode == "memory": - if proc.returncode: + if proc.poll(): self.pershelper.postprocess() - self.cell_output( - "KernelError: Cell execution failed, cell persistence " - "was not recorded.", - "stderr", - ) + self.log_error(KernelErrorCode.PERSISTENCE_LOAD_FAIL, direction="Score-P -> Jupyter") return self.standard_reply() # Determine directory to which trace files were saved by Score-P scorep_folder = "" if "SCOREP_EXPERIMENT_DIRECTORY" in os.environ: + self.log.debug(f'{os.environ["SCOREP_EXPERIMENT_DIRECTORY"]=}') scorep_folder = os.environ["SCOREP_EXPERIMENT_DIRECTORY"] self.cell_output( f"Instrumentation results can be found in {scorep_folder}" @@ -942,6 +926,7 @@ async def scorep_execute( ) self.pershelper.postprocess() + if performance_data_nodes: self.report_perfdata(performance_data_nodes, duration) self.perfdata_handler.append_code( @@ -949,6 +934,61 @@ async def scorep_execute( ) return 
self.standard_reply()
 
+
+    def read_scorep_process_pipe(self, proc: subprocess.Popen[bytes], stdout_lock: threading.Lock) -> list:
+        """
+        This function reads stdout and stderr of the subprocess running with Score-P instrumentation independently.
+        It logs all stderr output, collects lines containing
+        the marker "MCM_TS" (used to identify multi-cell mode timestamps) into a list, and sends the remaining
+        stdout lines to the Jupyter cell output.
+
+        Simultaneous access to stdout is synchronized via a lock to prevent overlapping with another thread
+        performing the long-running process animation.
+
+        Args:
+            proc (subprocess.Popen[bytes]): The subprocess whose output is being read.
+            stdout_lock (threading.Lock): Lock to avoid overlapping output.
+
+        Returns:
+            list: A list of decoded strings containing "MCM_TS" timestamps.
+        """
+        multicellmode_timestamps = []
+        sel = selectors.DefaultSelector()
+
+        sel.register(proc.stdout, selectors.EVENT_READ)
+        sel.register(proc.stderr, selectors.EVENT_READ)
+
+        line_width = 50
+        clear_line = "\r" + " " * line_width + "\r"
+
+        while True:
+            # Select between stdout and stderr
+            for key, val in sel.select():
+                line = key.fileobj.readline()
+                if not line:
+                    sel.unregister(key.fileobj)
+                    continue
+
+                decoded_line = line.decode(sys.getdefaultencoding(), errors='ignore')
+
+                if key.fileobj is proc.stderr:
+                    with stdout_lock:
+                        self.log.warning(f'{decoded_line.strip()}')
+                elif 'MCM_TS' in decoded_line:
+                    multicellmode_timestamps.append(decoded_line)
+                else:
+                    with stdout_lock:
+                        sys.stdout.write(clear_line)
+                        sys.stdout.flush()
+                        self.cell_output(decoded_line)
+
+            # If both stdout and stderr are exhausted -> exit the loop
+            if not sel.get_map():
+                break
+
+        return multicellmode_timestamps
+
+
     async def do_execute(
         self,
         code,
@@ -1281,6 +1321,31 @@ def do_shutdown(self, restart):
         self.pershelper.postprocess()
         return super().do_shutdown(restart)
 
+    def log_error(self, code: KernelErrorCode, **kwargs):
+        """
+        Logs a kernel error using a predefined error code and an extensible message template.
+
+        Parameters:
+            code (KernelErrorCode): error code to select the message template from `KERNEL_ERROR_MESSAGES`.
+            **kwargs: contextual fields for the error message template (e.g., active_kernel="jupyter").
+
+        In addition to the dynamic arguments, the formatter always injects:
+        - mode (str): PersHelper() mode (e.g. "memory")
+        - marshaller (str): PersHelper() marshaller (e.g. "dill")
+        """
+        mode = self.pershelper.mode
+        marshaller = self.pershelper.marshaller
+
+        template = KERNEL_ERROR_MESSAGES.get(code, "Unknown error. Mode: {mode}, Marshaller: {marshaller}")
+        message = template.format(
+            mode=mode,
+            marshaller=marshaller,
+            **kwargs
+        )
+
+        self.log.error(message)
+        self.cell_output("KernelError: " + message, "stderr")
+
 
 if __name__ == "__main__":
     from ipykernel.kernelapp import IPKernelApp
diff --git a/src/jumper/kernel_messages.py b/src/jumper/kernel_messages.py
new file mode 100644
index 0000000..b6fb6c5
--- /dev/null
+++ b/src/jumper/kernel_messages.py
@@ -0,0 +1,35 @@
+from enum import Enum, auto
+
+
+class KernelErrorCode(Enum):
+    PERSISTENCE_SETUP_FAIL = auto()
+    PERSISTENCE_DUMP_FAIL = auto()
+    PERSISTENCE_LOAD_FAIL = auto()
+    INSTRUMENTATION_PATH_UNKNOWN = auto()
+    SCOREP_SUBPROCESS_FAIL = auto()
+    SCOREP_NOT_AVAILABLE = auto()
+    SCOREP_PYTHON_NOT_AVAILABLE = auto()
+
+
+KERNEL_ERROR_MESSAGES = {
+    KernelErrorCode.SCOREP_NOT_AVAILABLE: (
+        "Score-P not available, cell execution skipped. "
+        "Hint: Make sure Score-P is installed and available in your PATH."
+ ), + KernelErrorCode.SCOREP_PYTHON_NOT_AVAILABLE: ( + "Score-P Python bindings not available, cell execution skipped. " + "Hint: Try installing it with `pip install scorep`." + ), + KernelErrorCode.PERSISTENCE_SETUP_FAIL: ( + "Failed to set up persistence communication files/pipes " + ), + KernelErrorCode.PERSISTENCE_DUMP_FAIL: ( + "[mode: {mode}] Failed to serialize notebook persistence ({direction}, marshaller: {marshaller})." + ), + KernelErrorCode.PERSISTENCE_LOAD_FAIL: ( + "[mode: {mode}] Failed to load persistence ({direction}, marshaller: {marshaller})." + ), + KernelErrorCode.SCOREP_SUBPROCESS_FAIL: ( + "[mode: {mode}] Subprocess terminated unexpectedly. Persistence not recorded (marshaller: {marshaller})." + ), +} \ No newline at end of file diff --git a/src/jumper/logging_config.py b/src/jumper/logging_config.py new file mode 100644 index 0000000..fc0a6df --- /dev/null +++ b/src/jumper/logging_config.py @@ -0,0 +1,85 @@ +import logging +import os +import sys + + +LOGGING_DIR = 'logging' +os.makedirs(LOGGING_DIR, exist_ok=True) + + +class JupyterLogFilter(logging.Filter): + def filter(self, record): + return False + + +class IgnoreErrorFilter(logging.Filter): + def filter(self, record): + return record.levelno < logging.ERROR + + +class JumperKernelOnlyFilter(logging.Filter): + def filter(self, record): + return 'jumper' in record.pathname + +LOGGING = { + 'version': 1, + 'disable_existing_loggers': False, + 'formatters': { + 'verbose': { + 'format': '[{levelname[0]} {asctime} {name}] {message}', + 'style': '{', + }, + }, + 'handlers': { + 'info_file': { + 'level': 'INFO', + 'class': 'logging.FileHandler', + 'filename': os.path.join(LOGGING_DIR, 'info.log'), + 'formatter': 'verbose' + }, + 'debug_file': { + 'level': 'DEBUG', + 'class': 'logging.FileHandler', + 'filename': os.path.join(LOGGING_DIR, 'debug.log'), + 'formatter': 'verbose' + }, + 'error_file': { + 'level': 'ERROR', + 'class': 'logging.FileHandler', + 'filename': os.path.join(LOGGING_DIR, 'error.log'), + 'formatter': 'verbose' + }, + 'console': { + 'level': 'DEBUG', + 'class': 'logging.StreamHandler', + 'stream': sys.stdout, + 'filters': [ + 'ignore_error_filter', # prevents from writing to jupyter cell output twice + 'jumper_kernel_only_filter', + ] + }, + }, + 'filters': { + 'jupyter_filter': { + '()': JupyterLogFilter + }, + 'ignore_error_filter': { + '()': IgnoreErrorFilter + }, + 'jumper_kernel_only_filter': { + '()': JumperKernelOnlyFilter + } + }, + 'root': { + 'handlers': [], + 'level': 'WARNING', + }, + + 'loggers': { + 'kernel': { + 'handlers': ['console', 'debug_file', 'info_file', 'error_file'], + 'level': 'WARNING', + 'propagate': False, + }, + } +} diff --git a/src/jumper/userpersistence.py b/src/jumper/userpersistence.py index c8f1695..e2c40fe 100644 --- a/src/jumper/userpersistence.py +++ b/src/jumper/userpersistence.py @@ -1,11 +1,17 @@ import os import shutil import ast +import threading +import time +import sys +import types + import astunparse from pathlib import Path import uuid import importlib + scorep_script_name = "scorep_script.py" @@ -24,6 +30,7 @@ def __init__(self, marshaller="dill", mode="memory"): "jupyter": {"os_environ": "", "sys_path": "", "var": ""}, "subprocess": {"os_environ": "", "sys_path": "", "var": ""}, } + self.is_dump_detailed_report = False def preprocess(self): @@ -105,14 +112,30 @@ def jupyter_dump(self): jupyter_dump_ = ( "import sys\n" "import os\n" + "import threading\n" f"import {self.marshaller}\n" - "from jumper.userpersistence import dump_runtime,dump_variables\n" 
- "dump_runtime(os.environ, sys.path," - f"'{self.paths['jupyter']['os_environ']}'," - f"'{self.paths['jupyter']['sys_path']}',{self.marshaller})\n" - f"dump_variables({str(self.jupyter_variables)},globals()," - f"'{self.paths['jupyter']['var']}'," - f"{self.marshaller})\n" + "from jumper.userpersistence import dump_runtime, dump_variables, create_busy_spinner\n" + "spinner = create_busy_spinner()\n" + f"if {self.is_dump_detailed_report}:\n" + " spinner.start('Dumping runtime environment and sys.path...')\n" + f"else:\n" + " spinner.start('Loading data...')\n" + "try:\n" + " dump_runtime(os.environ, sys.path," + f" '{self.paths['jupyter']['os_environ']}'," + f" '{self.paths['jupyter']['sys_path']}',{self.marshaller})\n" + f" if {self.is_dump_detailed_report}:\n" + " spinner.report('Dumping runtime environment and sys.path done.')\n" + " spinner.start('Dumping variables...')\n" + f" dump_variables({str(self.jupyter_variables)},globals()," + f" '{self.paths['jupyter']['var']}'," + f" {self.marshaller})\n" + f" if {self.is_dump_detailed_report}:\n" + " spinner.stop('Dumping variables done.')\n" + f" else:\n" + " spinner.stop('Data is loaded.')\n" + "except KeyboardInterrupt:\n" + " spinner.stop('Kernel interrupted.')\n" ) return jupyter_dump_ @@ -203,6 +226,9 @@ def parse(self, code, mode): self.jupyter_definitions += user_definitions self.jupyter_variables.extend(user_variables) + def set_dump_report_level(self): + self.is_dump_detailed_report = int(os.getenv('JUMPER_MARSHALLING_DETAILED_REPORT', '0')) + def dump_runtime( os_environ_, sys_path_, os_environ_dump_, sys_path_dump_, marshaller @@ -227,6 +253,7 @@ def dump_runtime( def dump_variables(variables_names, globals_, var_dump_, marshaller): user_variables = { k: v for k, v in globals_.items() if k in variables_names + and not isinstance(globals_[k], types.ModuleType) } for el in user_variables.keys(): @@ -368,3 +395,64 @@ def magics_cleanup(code): ): # Line magic & executed cell, remove first word nomagic_code = code.split(" ", 1)[1] return scorep_env, nomagic_code + + +class BaseSpinner: + def __init__(self, lock=None): + pass + + def _spinner_task(self): + pass + + def start(self, working_message='Working...'): + pass + + def report(self, done_message='Done.'): + pass + + def stop(self, done_message='Done.'): + pass + + +class BusySpinner(BaseSpinner): + def __init__(self, lock=None): + super().__init__(lock) + self._lock = lock or threading.Lock() + self._stop_event = threading.Event() + self._thread = threading.Thread(target=self._spinner_task) + self.working_message = '' + self.done_message = '' + + def _spinner_task(self): + spinner_chars = "|/-\\" + idx = 0 + while not self._stop_event.is_set(): + with self._lock: + sys.stdout.write(f"\r{self.working_message} {spinner_chars[idx % len(spinner_chars)]}") + sys.stdout.flush() + time.sleep(0.1) + idx += 1 + + def start(self, working_message='Working...'): + self.working_message = working_message + if not self._thread.is_alive(): + self._thread.start() + + def report(self, done_message='Done.'): + with self._lock: + sys.stdout.write(f"\r{done_message}{' ' * len(self.working_message)}\n") + sys.stdout.flush() + + def stop(self, done_message='Done.'): + self.report(done_message) + self._stop_event.set() + self._thread.join() + + +def create_busy_spinner(lock=None): + is_enabled = os.getenv("JUMPER_DISABLE_PROCESSING_ANIMATIONS") != "1" + if is_enabled: + return BusySpinner(lock) + else: + return BaseSpinner(lock) + diff --git a/src/parallel_marshall/parallel_marshall.py 
b/src/parallel_marshall/parallel_marshall.py index 7e8f03b..eca28d5 100644 --- a/src/parallel_marshall/parallel_marshall.py +++ b/src/parallel_marshall/parallel_marshall.py @@ -10,16 +10,16 @@ # mode is automatically determined by the file object that is passed for # dumping mode = "" -backend = str(os.environ.get("PARALLEL_MARSHALL_BACKEND", "dill")) -if os.environ.get("PARALLEL_MARSHALL_NWORKERS"): +backend = str(os.environ.get("JUMPER_PARALLEL_MARSHALL_BACKEND", "dill")) +if os.environ.get("JUMPER_PARALLEL_MARSHALL_NWORKERS"): workers = min( - int(os.environ.get("PARALLEL_MARSHALL_NWORKERS")), + int(os.environ.get("JUMPER_PARALLEL_MARSHALL_NWORKERS")), multiprocessing.cpu_count(), multiprocessing.cpu_count(), ) else: workers = multiprocessing.cpu_count() -debug = int(os.environ.get("PARALLEL_MARSHALL_DEBUG", 20)) +debug = int(os.environ.get("JUMPER_PARALLEL_MARSHALL_DEBUG", 20)) logger = logging.getLogger(__name__) logging.basicConfig(filename="parallel_marshall.log", level=logging.INFO) diff --git a/tests/kernel/multicell.yaml b/tests/kernel/multicell.yaml index bda0392..d672d3f 100644 --- a/tests/kernel/multicell.yaml +++ b/tests/kernel/multicell.yaml @@ -24,19 +24,34 @@ - - "%%finalize_multicellmode" - - "\0" + - "" - "Executing cell 0\n" + - "" - "with scorep.instrumenter.enable():\n" + - "" - " c = np.sum(c_mtx)\n" + - "" - "c_vec = np.arange(b, c)\n" + - "" - "----------------------------------\n" + - "" - "\n" + - "" - "\n" + - "" - "Executing cell 1\n" + - "" - "print('c =', c)\n" + - "" - "print('Sum(c_vec) =', c_vec.sum())\n" + - "" - "----------------------------------\n" + - "" - "c = 350\n" + - "" - "Sum(c_vec) = 61030\n" + - "" - "\n" + - "" - "\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" diff --git a/tests/kernel/persistence.yaml b/tests/kernel/persistence.yaml index 3d2e459..e3102fa 100644 --- a/tests/kernel/persistence.yaml +++ b/tests/kernel/persistence.yaml @@ -7,6 +7,7 @@ import numpy as np def f(x): return x**2 + a, b = 5, 10 a_vec = np.arange(a) b_vec = np.arange(a, b) @@ -17,6 +18,7 @@ - |- %%execute_with_scorep import pandas as pd + import scorep def g(x): return np.log2(x) with scorep.instrumenter.enable(): @@ -33,9 +35,13 @@ os.environ['SUBPROCESS_VAR'] = 'SUBPROCESS' sys.path.append('/new/subprocess/path') - - "\0" + - "" - "Inner product of a_vec and b_vec = 80\n" + - "" - "f(4) = 16\n" + - "" - "JUPYTER_VAR = JUPYTER\n" + - "" - "'/new/jupyter/path' found in sys.path\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" - diff --git a/tests/kernel/scorep_exec.yaml b/tests/kernel/scorep_exec.yaml index fec1dbe..ead571d 100644 --- a/tests/kernel/scorep_exec.yaml +++ b/tests/kernel/scorep_exec.yaml @@ -8,9 +8,12 @@ - |- %%execute_with_scorep import scorep + a = 3 + b = 8 with scorep.instrumenter.enable(): print('a - b =', a - b) - - "\0" + - "" - "a - b = -5\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" - @@ -26,6 +29,7 @@ with scorep.instrumenter.enable(): print('a - b =', a - b) - - "\0" + - "" - "a - b = -5\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" - @@ -41,6 +45,7 @@ with scorep.instrumenter.enable(): print('a - b =', a - b) - - "\0" + - "" - "a - b = -5\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" - @@ -56,5 +61,6 @@ with scorep.instrumenter.enable(): print('a - b =', a - b) - - "\0" + - "" - "a - b = -5\n" - "Instrumentation results can be found in test_kernel_tmp/scorep-traces" \ No newline at end of 
file diff --git a/tests/kernel/writemode.yaml b/tests/kernel/writemode.yaml index b07c83e..99f464c 100644 --- a/tests/kernel/writemode.yaml +++ b/tests/kernel/writemode.yaml @@ -72,12 +72,3 @@ - - "%%end_writefile" - - "Finished converting to Python script." -- - - |- - %%bash - chmod u+x test_kernel_tmp/my_jupyter_to_script_run.sh - ./test_kernel_tmp/my_jupyter_to_script_run.sh - - - "a + b = 15\n" - - "a - b = -5\n" - - "c = 350\n" - - "Sum(c_vec) = 61030\n" diff --git a/tests/test_kernel.py b/tests/test_kernel.py index abec581..75f0770 100644 --- a/tests/test_kernel.py +++ b/tests/test_kernel.py @@ -1,7 +1,12 @@ +import yaml, os +import logging import unittest +from unittest.mock import MagicMock import jupyter_kernel_test as jkt -import yaml, re, os +from jumper import logging_config +from jumper.kernel_messages import KernelErrorCode, KERNEL_ERROR_MESSAGES +from jumper.kernel import JumperKernel tmp_dir = "test_kernel_tmp/" @@ -12,6 +17,10 @@ class KernelTests(jkt.KernelTests): @classmethod def setUpClass(cls) -> None: + os.environ["JUMPER_DISABLE_PROCESSING_ANIMATIONS"] = "1" + logging_config.LOGGING['loggers']['kernel']['level'] = 'WARNING' + logging.config.dictConfig(logging_config.LOGGING) + super().setUpClass() os.system(f"rm -rf {tmp_dir}") os.system(f"mkdir {tmp_dir}") @@ -36,11 +45,15 @@ def check_stream_output(self, code, expected_output, stream="stdout"): # self.assertEqual(msg["content"]["name"], stream) if msg["header"]["msg_type"] == "stream": - self.assertEqual(msg["content"]["name"], stream) - self.assertEqual(msg["content"]["text"], expected_msg) + # self.assertEqual(msg["content"]["name"], stream) + self.assertEqual( + clean_console_output(msg["content"]["text"]), + clean_console_output(expected_msg) + ) elif msg["header"]["msg_type"] == "execute_result": self.assertEqual( - msg["content"]["data"]["text/plain"], expected_msg + clean_console_output(msg["content"]["data"]["text/plain"]), + clean_console_output(expected_msg) ) @@ -49,8 +62,9 @@ def check_from_file(self, filename): with open(filename, "r") as file: cells = yaml.safe_load(file) - for code, expected_output in cells: - self.check_stream_output(code, expected_output) + for idx, (code, expected_output) in enumerate(cells): + with self.subTest(block=idx, code_line=code.splitlines()[0]): + self.check_stream_output(code, expected_output) # Enumerate tests to ensure proper execution order def test_00_scorep_env(self): @@ -75,5 +89,64 @@ def test_06_writemode(self): self.check_from_file("tests/kernel/writemode.yaml") +def clean_console_output(text): + return text.replace('\r', '').strip() + + +class DummyPersHelper: + def __init__(self, mode="test_mode", marshaller="test_marshal"): + self.mode = mode + self.marshaller = marshaller + + +class KernelTestLogError(unittest.TestCase): + def setUp(self): + self.kernel = JumperKernel() + setattr(self.kernel.log, "error", MagicMock()) + self.kernel.cell_output = MagicMock() + self.kernel.pershelper = DummyPersHelper() + + self.direction = "Far away" + + def test_error_with_all_fields(self): + + self.kernel.log_error( + KernelErrorCode.PERSISTENCE_DUMP_FAIL, + direction=self.direction + ) + expected = KERNEL_ERROR_MESSAGES[KernelErrorCode.PERSISTENCE_DUMP_FAIL].format( + mode="test_mode", + marshaller="test_marshal", + direction=self.direction + ) + self.kernel.log.error.assert_called_with(expected) + self.kernel.cell_output.assert_called_with(f"KernelError: {expected}", "stderr") + + def test_unknown_error_code(self): + dummy_code = -1 + + self.kernel.log_error(dummy_code, 
dumb_hint="bar") + msg = "Unknown error. Mode: test_mode, Marshaller: test_marshal" + self.assertTrue(self.kernel.log.error.call_args[0][0].startswith(msg)) + + def test_error_templates_are_formatable(self): + fake_context = { + "mode": "test_mode", + "marshaller": "test_marshal", + "direction": "dummy_direction", + "detail": "dummy_detail", + "step": "dummy_step" + } + + for code, template in KERNEL_ERROR_MESSAGES.items(): + try: + formatted = template.format(**fake_context) + self.assertIsInstance(formatted, str) + except KeyError as e: + self.fail(f"Missing key in template for {code.name}: {e}") + except ValueError as e: + self.fail(f"Format error in template for {code.name}: {e}") + + if __name__ == "__main__": unittest.main()
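
Illustration only, not part of the patch: a minimal sketch of how the `KERNEL_ERROR_MESSAGES` templates introduced in `src/jumper/kernel_messages.py` are formatted by `JumperKernel.log_error`. The helper `format_kernel_error` and the example context values (`mode="memory"`, `marshaller="dill"`, `direction="Jupyter -> Score-P"`) are hypothetical stand-ins for what the kernel fills in from its `PersHelper` at runtime.
```
# Sketch (assumed context values) of the template formatting done by log_error.
from jumper.kernel_messages import KERNEL_ERROR_MESSAGES, KernelErrorCode

def format_kernel_error(code, mode="memory", marshaller="dill", **kwargs):
    # Fall back to a generic message for unknown codes, as log_error does.
    template = KERNEL_ERROR_MESSAGES.get(
        code, "Unknown error. Mode: {mode}, Marshaller: {marshaller}"
    )
    return template.format(mode=mode, marshaller=marshaller, **kwargs)

# The dump-failure template additionally expects a `direction` field.
print(format_kernel_error(
    KernelErrorCode.PERSISTENCE_DUMP_FAIL, direction="Jupyter -> Score-P"
))
```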