Skip to content

Commit

Permalink
Incremented version to 0.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
AvivC committed Feb 16, 2019
1 parent f64bb1b commit 669084c
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 82 deletions.
154 changes: 73 additions & 81 deletions build/lib/monki/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import collections
import re
import inspect
from types import CodeType, ModuleType
Expand All @@ -12,42 +13,7 @@ def patch(func, start='', end='', insert_lines=None, indent_lines=None, indent_i
Easily modify a function's code at runtime.
Can be called at most once on a single function.
Consider we want to patch the following function:
def func():
print('First') # line 0
print('Second') # line 1
print('Third') # line 2
Examples of different ways to patch it:
# Wrap the function with start and end code
monki.patch(func, start="print('Starting')", end="print('Ending')")
func()
>>> 'Starting'
>>> 'First'
>>> 'Second'
>>> 'Third'
>>> 'Ending'
# Inject lines at any offset
monki.patch(func, insert={1: "print('Injected line')", 2: "print('Another injection')"})
func()
>>> 'First'
>>> 'Injected line'
>>> 'Second'
>>> 'Another injection'
>>> 'Third'
# Indent existing lines. Let's use that along with injection in order to create a loop!
# This injects the `for` before line 1, and indents line 1 so it's inside the loop.
monki.patch(func, insert={1: "for i in range(3):"}, indent_lines=[1])
func()
>>> 'First'
>>> 'Second'
>>> 'Second'
>>> 'Second'
>>> 'Third'
Consult the documentation for usage examples.
:param func: The function to patch.
:param start: Code to inject in the beginning, right after the signature.
Expand Down Expand Up @@ -120,8 +86,9 @@ def _modify_closure_function(func, modified_source, throwaway_module):
func_freevars = func.__code__.co_freevars
freevars_declarations = '\n '.join('{} = None'.format(varname) for varname in func_freevars)

closure_signature, closure_body = _divide_source(modified_source)
closure_body = _indent_code(closure_body, indent_level=1, indent_string=_INDENT_STRING)
closure_signature, closure_body_lines = _divide_source(modified_source)
closure_body_lines = _indent_source_lines(closure_body_lines, indent_level=1)
closure_body = '\n'.join(line.code for line in closure_body_lines)
closure_source = closure_signature + closure_body

wrapper_name = '_wrapper'
Expand All @@ -146,42 +113,65 @@ def _create_function_in_inner_module(function_source, module):
raise ValueError('There\'s a problem with the indentation. Maybe forgot to set indent_inner?') from e


def _modify_source(func, start='', end='', before_lines=None, indent_inner=0, indent_lines=None):
if not any([start, end, before_lines]):
raise ValueError('Must supply code to inject or indent.')
_SourceLine = collections.namedtuple('SourceLine', ['code', 'number'])


before_lines = before_lines or {}
indent_lines = indent_lines or {}
def _modify_source(func, start, end, insert_lines, indent_inner, indent_lines):
if not any([start, end, insert_lines, indent_lines]):
raise ValueError('Must supply code to inject or indent.')

func_source = _process_function_source(func)
end_indented, start_indented, before_lines = _process_injected_code(end, start, before_lines)
func_signature, func_body = _divide_source(func_source)
func_body = _process_body(before_lines, func_body, indent_lines)
func_source = _get_function_source(func)
func_signature, func_body_lines = _divide_source(func_source)

_put_wrappers_in_insert_lines(start, end, func_body_lines, insert_lines)
if indent_inner:
func_body = _indent_code(func_body, indent_inner, _INDENT_STRING)
func_body_lines = _indent_source_lines(func_body_lines, indent_inner)

modified_source = func_signature \
+ start_indented \
+ func_body \
+ end_indented
insert_lines = _indent_inserted_lines(insert_lines)
func_body = _process_body(insert_lines, func_body_lines, indent_lines)

modified_source = func_signature + func_body
return modified_source


def _process_body(before_lines, func_body, indent_lines):
body_lines = list(filter(lambda line: line, func_body.splitlines())) # remove empty lines
def _put_wrappers_in_insert_lines(start, end, func_body_lines, insert_lines):
_validate_no_collisions_between_wrappers_and_inserts(end, func_body_lines, insert_lines, start)

if start:
insert_lines[0] = start
if end:
# if we want to add code at the end, we need a blank line to add the code _before_ of
last_line_index = len(func_body_lines)
func_body_lines.append(_SourceLine('', last_line_index))
insert_lines[last_line_index] = end


def _indent_source_lines(func_body_lines, indent_level):
return [_SourceLine(_indent_code(line.code, indent_level), line.number)
for line in func_body_lines]


def _validate_no_collisions_between_wrappers_and_inserts(end, func_body, insert_lines, start):
# TODO: Probably refactor to have this in the central validator function
if (0 in insert_lines) and start:
raise ValueError('Can\'t both insert line at index 0 and set \'start\' argument')
if (len(func_body) in insert_lines) and end:
raise ValueError('Can\'t both insert line on the last index (+1) and set \'end\' argument')


def _process_body(insert_lines, func_body_lines, indent_lines):
# body_lines = list(filter(lambda line: line, func_body.splitlines())) # remove empty lines
processed_lines = []

for linenum, line in enumerate(body_lines):
for line, linenum in func_body_lines:
indent_line = linenum in indent_lines
if indent_line:
indent_level = indent_lines[linenum]
line = _indent_code(line, indent_level, _INDENT_STRING)
line = _indent_code(line, indent_level)

inject_line = linenum in before_lines
inject_line = linenum in insert_lines
if inject_line:
line_to_inject = before_lines[linenum]
line_to_inject = insert_lines[linenum]
processed_lines.append(line_to_inject)

processed_lines.append(line)
Expand All @@ -197,46 +187,48 @@ def _divide_source(func_source):

func_signature = func_source[:func_sig_end_index]
if func_signature[-1].isspace():
raise RuntimeError('This shouldn\'t happen - the regex should never return a signature ending with whitespace.')

func_body = func_source[func_sig_end_index:].strip('\n')
raise AssertionError('This shouldn\'t happen - the regex should never return a signature ending with whitespace.')

func_signature = func_signature + '\n'
func_body = '\n' + func_body + '\n'

return func_signature, func_body
func_body = func_source[func_sig_end_index:].strip('\n')
# func_body = '\n' + func_body + '\n'
func_body = func_body
body_lines = [_SourceLine(code, linenum) for linenum, code in enumerate(func_body.splitlines())]

return func_signature, body_lines
# return func_signature, '\n'.join(line.code for line in body_lines)
# return func_signature, func_body


def _process_injected_code(end, start, before_lines):
def _indent_inserted_lines(insert_lines):
# added code will always have an indentation level of 1 - immediately under the function
start_indented = _indent_injected_code(start)
end_indented = _indent_injected_code(end)
before_lines = {linenum: _indent_injected_code(code) for linenum, code in before_lines.items()}
insert_lines = {linenum: _indent_code(code, indent_level=1) for linenum, code in insert_lines.items()}

return end_indented, start_indented, before_lines
return insert_lines


def _indent_injected_code(start):
# TODO: probably pretty much the same code and _indent_lines, consider merging the two
if start:
start_indented = '\n'
start_indented += _INDENT_STRING + ('\n' + _INDENT_STRING).join(start.splitlines())
start_indented += '\n'
else:
start_indented = start
return start_indented
# def _indent_line(code):
# # TODO: probably pretty much the same code and _indent_lines, consider merging the two
# if code:
# start_indented = '\n'
# start_indented += _INDENT_STRING + ('\n' + _INDENT_STRING).join(code.splitlines())
# start_indented += '\n'
# else:
# start_indented = code
# return start_indented


def _process_function_source(func):
def _get_function_source(func):
func_source = inspect.getsource(func)
func_source = _unindent_source(func_source)
func_source = _strip_leading_decorators(func_source)
return func_source


def _indent_code(code, indent_level, indent_string):
relative_indent_string = indent_level * indent_string
return relative_indent_string + ('\n' + relative_indent_string).join(code.splitlines())
def _indent_code(code, indent_level):
relative_indent_string = indent_level * _INDENT_STRING
return '\n' + relative_indent_string + ('\n' + relative_indent_string).join(code.splitlines()) + '\n'


def _unindent_source(func_source):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="monki",
version="0.1.0",
version="0.1.1",
description="Patch functions at runtime, the easy way.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 669084c

Please sign in to comment.