diff --git a/build/lib/monki/core.py b/build/lib/monki/core.py index 6a32610..21c55bb 100644 --- a/build/lib/monki/core.py +++ b/build/lib/monki/core.py @@ -1,3 +1,4 @@ +import collections import re import inspect from types import CodeType, ModuleType @@ -12,42 +13,7 @@ def patch(func, start='', end='', insert_lines=None, indent_lines=None, indent_i Easily modify a function's code at runtime. Can be called at most once on a single function. - Consider we want to patch the following function: - - def func(): - print('First') # line 0 - print('Second') # line 1 - print('Third') # line 2 - - Examples of different ways to patch it: - - # Wrap the function with start and end code - monki.patch(func, start="print('Starting')", end="print('Ending')") - func() - >>> 'Starting' - >>> 'First' - >>> 'Second' - >>> 'Third' - >>> 'Ending' - - # Inject lines at any offset - monki.patch(func, insert={1: "print('Injected line')", 2: "print('Another injection')"}) - func() - >>> 'First' - >>> 'Injected line' - >>> 'Second' - >>> 'Another injection' - >>> 'Third' - - # Indent existing lines. Let's use that along with injection in order to create a loop! - # This injects the `for` before line 1, and indents line 1 so it's inside the loop. - monki.patch(func, insert={1: "for i in range(3):"}, indent_lines=[1]) - func() - >>> 'First' - >>> 'Second' - >>> 'Second' - >>> 'Second' - >>> 'Third' + Consult the documentation for usage examples. :param func: The function to patch. :param start: Code to inject in the beginning, right after the signature. @@ -120,8 +86,9 @@ def _modify_closure_function(func, modified_source, throwaway_module): func_freevars = func.__code__.co_freevars freevars_declarations = '\n '.join('{} = None'.format(varname) for varname in func_freevars) - closure_signature, closure_body = _divide_source(modified_source) - closure_body = _indent_code(closure_body, indent_level=1, indent_string=_INDENT_STRING) + closure_signature, closure_body_lines = _divide_source(modified_source) + closure_body_lines = _indent_source_lines(closure_body_lines, indent_level=1) + closure_body = '\n'.join(line.code for line in closure_body_lines) closure_source = closure_signature + closure_body wrapper_name = '_wrapper' @@ -146,42 +113,65 @@ def _create_function_in_inner_module(function_source, module): raise ValueError('There\'s a problem with the indentation. Maybe forgot to set indent_inner?') from e -def _modify_source(func, start='', end='', before_lines=None, indent_inner=0, indent_lines=None): - if not any([start, end, before_lines]): - raise ValueError('Must supply code to inject or indent.') +_SourceLine = collections.namedtuple('SourceLine', ['code', 'number']) + - before_lines = before_lines or {} - indent_lines = indent_lines or {} +def _modify_source(func, start, end, insert_lines, indent_inner, indent_lines): + if not any([start, end, insert_lines, indent_lines]): + raise ValueError('Must supply code to inject or indent.') - func_source = _process_function_source(func) - end_indented, start_indented, before_lines = _process_injected_code(end, start, before_lines) - func_signature, func_body = _divide_source(func_source) - func_body = _process_body(before_lines, func_body, indent_lines) + func_source = _get_function_source(func) + func_signature, func_body_lines = _divide_source(func_source) + _put_wrappers_in_insert_lines(start, end, func_body_lines, insert_lines) if indent_inner: - func_body = _indent_code(func_body, indent_inner, _INDENT_STRING) + func_body_lines = _indent_source_lines(func_body_lines, indent_inner) - modified_source = func_signature \ - + start_indented \ - + func_body \ - + end_indented + insert_lines = _indent_inserted_lines(insert_lines) + func_body = _process_body(insert_lines, func_body_lines, indent_lines) + modified_source = func_signature + func_body return modified_source -def _process_body(before_lines, func_body, indent_lines): - body_lines = list(filter(lambda line: line, func_body.splitlines())) # remove empty lines +def _put_wrappers_in_insert_lines(start, end, func_body_lines, insert_lines): + _validate_no_collisions_between_wrappers_and_inserts(end, func_body_lines, insert_lines, start) + + if start: + insert_lines[0] = start + if end: + # if we want to add code at the end, we need a blank line to add the code _before_ of + last_line_index = len(func_body_lines) + func_body_lines.append(_SourceLine('', last_line_index)) + insert_lines[last_line_index] = end + + +def _indent_source_lines(func_body_lines, indent_level): + return [_SourceLine(_indent_code(line.code, indent_level), line.number) + for line in func_body_lines] + + +def _validate_no_collisions_between_wrappers_and_inserts(end, func_body, insert_lines, start): + # TODO: Probably refactor to have this in the central validator function + if (0 in insert_lines) and start: + raise ValueError('Can\'t both insert line at index 0 and set \'start\' argument') + if (len(func_body) in insert_lines) and end: + raise ValueError('Can\'t both insert line on the last index (+1) and set \'end\' argument') + + +def _process_body(insert_lines, func_body_lines, indent_lines): + # body_lines = list(filter(lambda line: line, func_body.splitlines())) # remove empty lines processed_lines = [] - for linenum, line in enumerate(body_lines): + for line, linenum in func_body_lines: indent_line = linenum in indent_lines if indent_line: indent_level = indent_lines[linenum] - line = _indent_code(line, indent_level, _INDENT_STRING) + line = _indent_code(line, indent_level) - inject_line = linenum in before_lines + inject_line = linenum in insert_lines if inject_line: - line_to_inject = before_lines[linenum] + line_to_inject = insert_lines[linenum] processed_lines.append(line_to_inject) processed_lines.append(line) @@ -197,46 +187,48 @@ def _divide_source(func_source): func_signature = func_source[:func_sig_end_index] if func_signature[-1].isspace(): - raise RuntimeError('This shouldn\'t happen - the regex should never return a signature ending with whitespace.') - - func_body = func_source[func_sig_end_index:].strip('\n') + raise AssertionError('This shouldn\'t happen - the regex should never return a signature ending with whitespace.') func_signature = func_signature + '\n' - func_body = '\n' + func_body + '\n' - return func_signature, func_body + func_body = func_source[func_sig_end_index:].strip('\n') + # func_body = '\n' + func_body + '\n' + func_body = func_body + body_lines = [_SourceLine(code, linenum) for linenum, code in enumerate(func_body.splitlines())] + + return func_signature, body_lines + # return func_signature, '\n'.join(line.code for line in body_lines) + # return func_signature, func_body -def _process_injected_code(end, start, before_lines): +def _indent_inserted_lines(insert_lines): # added code will always have an indentation level of 1 - immediately under the function - start_indented = _indent_injected_code(start) - end_indented = _indent_injected_code(end) - before_lines = {linenum: _indent_injected_code(code) for linenum, code in before_lines.items()} + insert_lines = {linenum: _indent_code(code, indent_level=1) for linenum, code in insert_lines.items()} - return end_indented, start_indented, before_lines + return insert_lines -def _indent_injected_code(start): - # TODO: probably pretty much the same code and _indent_lines, consider merging the two - if start: - start_indented = '\n' - start_indented += _INDENT_STRING + ('\n' + _INDENT_STRING).join(start.splitlines()) - start_indented += '\n' - else: - start_indented = start - return start_indented +# def _indent_line(code): +# # TODO: probably pretty much the same code and _indent_lines, consider merging the two +# if code: +# start_indented = '\n' +# start_indented += _INDENT_STRING + ('\n' + _INDENT_STRING).join(code.splitlines()) +# start_indented += '\n' +# else: +# start_indented = code +# return start_indented -def _process_function_source(func): +def _get_function_source(func): func_source = inspect.getsource(func) func_source = _unindent_source(func_source) func_source = _strip_leading_decorators(func_source) return func_source -def _indent_code(code, indent_level, indent_string): - relative_indent_string = indent_level * indent_string - return relative_indent_string + ('\n' + relative_indent_string).join(code.splitlines()) +def _indent_code(code, indent_level): + relative_indent_string = indent_level * _INDENT_STRING + return '\n' + relative_indent_string + ('\n' + relative_indent_string).join(code.splitlines()) + '\n' def _unindent_source(func_source): diff --git a/setup.py b/setup.py index bde0239..cb1f7b0 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="monki", - version="0.1.0", + version="0.1.1", description="Patch functions at runtime, the easy way.", long_description=README, long_description_content_type="text/markdown",