Skip to content

Code snippets

Thor Whalen edited this page Dec 7, 2021 · 4 revisions

Segment a flattened stream

Note: Put this in issue: https://github.com/i2mint/creek/issues/3

What we need is a function segment_indices(indices: Iterable, intervals: Iterable[Tuple[BT, TT]]) that yields groups of indices according to which interval contains them (each interval going from a bottom BT, inclusive, to a top TT, exclusive). To be clear, both indices and intervals should be iterated over only once.

The target use case: Making slabs out of merged/flattened streams.

Say we have:

from creek.multi_streams import MergedStreams

# Two streams of (index, data) items, keyed by stream id.
streams_map = dict(
    hello=[(2, 'two'), (3, 'three'), (5, 'five')],
    world=[(0, 'zero'), (1, 'one'), (3, 'three'), (6, 'six')],
)
# Merging yields (stream_id, item) pairs. As the expected value shows, they come
# out ordered by index, with the tie at index 3 resolved in favour of 'hello'.
flattened_multi_stream = list(MergedStreams(streams_map))
assert flattened_multi_stream == [
    ('world', (0, 'zero')),
    ('world', (1, 'one')),
    ('hello', (2, 'two')),
    ('hello', (3, 'three')),
    ('world', (3, 'three')),
    ('hello', (5, 'five')),
    ('world', (6, 'six')),
]

Given a segmentation definition over the index/timestamp (e.g. the 0 of (0, 'zero')), such as fixed-size chunks or a monotonic iterable of intervals, we want to get an iterator/generator of groups of data items, one group per segment.

This works for chunking:

from itertools import groupby

def make_slabs_from_chunked_indices(flattened_multi_stream, chk_size):
    """Group a flattened multi-stream into slabs of fixed-size index chunks.

    Each item is expected to look like ``(stream_id, (index, data))``.
    Adjacent items sharing the same ``index // chk_size`` end up in the same
    slab. Chunks that contain no items produce no slab.

    :param flattened_multi_stream: iterable of ``(stream_id, (index, data))``
        items, assumed sorted by index (``groupby`` only merges *adjacent*
        items with equal keys).
    :param chk_size: positive chunk width, in index units.
    :return: a lazy iterator of lists, one list per non-empty chunk.
    :raises ValueError: if ``chk_size`` is not positive.
    """
    if chk_size <= 0:
        raise ValueError(f"chk_size must be positive, got {chk_size!r}")
    # The chunk number of an item is its index (item[1][0]) floor-divided by
    # the chunk size (previously hard-coded as 2, ignoring chk_size).
    grouped_by_chunk = groupby(
        flattened_multi_stream, key=lambda item: item[1][0] // chk_size
    )
    return (list(group) for _, group in grouped_by_chunk)

slabs = list(make_slabs_from_chunked_indices(flattened_multi_stream, 2))
expected_slabs = [
    [('world', (0, 'zero')), ('world', (1, 'one'))],  # indices 0-1
    [('hello', (2, 'two')), ('hello', (3, 'three')), ('world', (3, 'three'))],  # 2-3
    [('hello', (5, 'five'))],  # 4-5
    [('world', (6, 'six'))],  # 6-7
]
assert slabs == expected_slabs

Something like this could work for intervals in general, but is not very clean.

from creek.tools import dynamically_index, IndexUpdater, segment_overlaps
from typing import Iterator
inf = float('inf')  # used below in the (inf, inf) start index passed to dynamically_index

# Annotate the updater defined below with creek's IndexUpdater type.
intervals_index_update: IndexUpdater


def intervals_index_update(current_index, data_item, intervals):
    """Index updater that tracks which interval of ``intervals`` we are in.

    Used below, via ``partial(..., intervals=iter(intervals))``, as the
    ``idx_updater`` of ``dynamically_index``, with intervals as indices.

    :param current_index: the current ``(bt, tt)`` interval, or ``None`` once
        ``intervals`` has been exhausted.
    :param data_item: the index value (e.g. timestamp) of the current item.
    :param intervals: an *iterator* of ``(bt, tt)`` half-open intervals
        ``[bt, tt)``, assumed sorted and non-overlapping. It is consumed
        across calls.
    :return: ``current_index`` if it contains ``data_item``. Otherwise, the
        first remaining interval whose top is above ``data_item`` (intervals
        lying entirely at or before ``data_item`` are skipped), or ``None``
        if no such interval remains.
    :raises TypeError: if ``intervals`` is not an iterator.
    """
    # intervals must be an iterator so it is "consumed" across calls, rather
    # than restarting from the first interval on every call.
    if not isinstance(intervals, Iterator):
        raise TypeError(
            "intervals needs to be an iterator (feed me iter(intervals))"
        )
    if current_index is None:
        return None  # intervals were already exhausted on a previous call
    bt, tt = current_index  # we're indexing with intervals here!
    if bt <= data_item < tt:
        return current_index
    # Skip every interval that ends at or before data_item, so a jump in the
    # index stream does not leave us on a stale interval.
    for interval in intervals:
        _, interval_tt = interval
        if data_item < interval_tt:
            return interval
    return None


from functools import partial  # used for idx_updater below (was missing)

flattened_multi_stream = list(MergedStreams(streams_map))

# The index of each item is its leading element, e.g. the 0 of ('world', (0, 'zero')).
indices = [item[1][0] for item in flattened_multi_stream]
intervals = [(0, 2), (2, 4), (4, 6), (6, 8)]

# As the expected output shows, the interval paired with each index lags one
# item behind (e.g. ((0, 2), 2) and ((2, 4), 5)).
assert list(dynamically_index(indices, start=(inf, inf),
                              idx_updater=partial(intervals_index_update,
                                                  intervals=iter(intervals)))) == [
    ((inf, inf), 0),
    ((0, 2), 1),
    ((0, 2), 2),
    ((2, 4), 3),
    ((2, 4), 3),
    ((2, 4), 5),
    ((4, 6), 6)
]

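Perhaps the dynamic indexing setup should be revised? As the output above shows, each index is paired with the interval computed from the *previous* item (e.g. ((0, 2), 2) and ((2, 4), 5)), so the interval labels lag one item behind.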

Make a segment reader from an iterable source

TODO: I don't like using nonlocal when I don't need to. I don't think I need to here. Revise.

from itertools import islice


def segmenter(src):
    """Make a segment reader from an iterable source.

    Returns a callable ``get_next_segment(n)`` that reads the next ``n`` items
    of ``src`` and returns an ``(idx, segment)`` pair. ``idx`` is the running
    total of all ``n`` requested so far. It is *not* reduced when the source
    runs short (see the last call in each example below).

    If ``src`` has ``__getitem__``, segments are read by slicing, so they have
    the type of ``src``'s slices. Otherwise ``src`` is turned into an iterator
    and segments are read with ``islice`` into lists.

    A negative ``n`` raises ``ValueError``.

    >>> src = [1, 2, 3, 4, 5, 6]
    >>> get_next_segment = segmenter(src)  # try a subscriptable src
    >>> assert get_next_segment(2) == (2, [1, 2])
    >>> assert get_next_segment(3) == (5, [3, 4, 5])
    >>> assert get_next_segment(4) == (9, [6])
    >>> get_next_segment = segmenter(iter(src))  # try a non-subscriptable src
    >>> assert get_next_segment(2) == (2, [1, 2])
    >>> assert get_next_segment(3) == (5, [3, 4, 5])
    >>> assert get_next_segment(4) == (9, [6])
    """
    return _SegmentReader(src)


class _SegmentReader:
    """Stateful callable behind ``segmenter``; remembers how far it has read.

    Keeping the position on an object avoids the ``nonlocal`` closures.
    """

    def __init__(self, src):
        self.idx = 0  # running total of the n's requested so far
        # Slice subscriptable sources in place; consume anything else as an iterator.
        self._is_sliceable = hasattr(src, '__getitem__')
        self._src = src if self._is_sliceable else iter(src)

    def __call__(self, n):
        # islice already rejects negative n; do the same for slicing, where
        # a negative n would otherwise silently move idx backwards.
        if n < 0:
            raise ValueError(f"segment size must be non-negative, got {n!r}")
        if self._is_sliceable:
            seg = self._src[self.idx:self.idx + n]
        else:
            seg = list(islice(self._src, n))
        # Advance by the requested n, not len(seg). The returned index is the
        # requested end position even when the source runs short.
        # NOTE: for a "live" (growing) sliceable src, this skips positions
        # that were not yet filled. Advancing by len(seg) would avoid that,
        # but it would change the returned index.
        self.idx += n
        return self.idx, seg

Try it out:

src = [1, 2, 3, 4, 5, 6]
# Each entry: (segment size to request, expected (idx, segment) result).
expected_reads = [(2, (2, [1, 2])), (3, (5, [3, 4, 5])), (4, (9, [6]))]

# Exercise a subscriptable source first, then a non-subscriptable one.
for source in (src, iter(src)):
    get_next_segment = segmenter(source)
    for size, expected in expected_reads:
        assert get_next_segment(size) == expected