Skip to content

Commit

Permalink
Fix accesses of Task.children that did not use proper synchronization
Browse files Browse the repository at this point in the history
Now all accesses to Task.children require that either:
* the caller hold a lock on the scheduler thread, or
* the Task not be added to a RootTask yet.

It is now possible to download the following group without crashing:
* https://minimalistbaker.com/recipes/*/page/#/
  • Loading branch information
davidfstr committed Feb 5, 2024
2 parents 3c4bea9 + 369ea90 commit b03069f
Show file tree
Hide file tree
Showing 8 changed files with 388 additions and 262 deletions.
375 changes: 180 additions & 195 deletions src/crystal/browser/tasktree.py

Large diffs are not rendered by default.

166 changes: 133 additions & 33 deletions src/crystal/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from contextlib import AbstractContextManager, nullcontext
from contextlib import AbstractContextManager, contextmanager, nullcontext
import cProfile
from crystal.util.caffeination import Caffeination
from crystal.util.listenable import ListenableMixin
Expand All @@ -16,18 +16,22 @@
from crystal.util.xsqlite3 import is_database_closed_error
from crystal.util.xthreading import (
bg_affinity, bg_call_later, fg_affinity, fg_call_and_wait, fg_call_later,
fg_waiting_calling_thread,
is_foreground_thread, NoForegroundThreadError
)
from functools import wraps
import os
from overrides import overrides
import shutil
import sys
import threading
from time import sleep
import traceback
from typing import (
Any, Callable, cast, final, List, Literal, Iterator, Optional,
Sequence, Tuple, TYPE_CHECKING, Union
Sequence, Tuple, TYPE_CHECKING, TypeVar, Union
)
from typing_extensions import ParamSpec
from weakref import WeakSet

if TYPE_CHECKING:
Expand All @@ -44,6 +48,35 @@
_PROFILE_SCHEDULER = False


_P = ParamSpec('_P')
_R = TypeVar('_R')


# ------------------------------------------------------------------------------
# Scheduler (Early)

def scheduler_affinity(func: Callable[_P, _R]) -> Callable[_P, _R]:
    """
    Decorator which requires that the decorated function only be called from
    the scheduler thread or from a task that is synced with the scheduler thread.
    
    When assertions are enabled, a call from any other context immediately
    raises an AssertionError. When Python runs with -O the function is
    returned undecorated and no check is performed.
    
    The following kinds of manipulations need to happen on the scheduler thread:
    - read/writes to Task.children,
      except for the RootTask (which only requires accesses to be on the foreground thread)
    """
    if not __debug__:  # -O passed on command line?
        # Assertions are disabled, so wrapping would only add call overhead
        return func
    
    @wraps(func)
    def checked(*args, **kwargs):
        # Fail fast, before running any of the decorated function
        assert is_synced_with_scheduler_thread()
        return func(*args, **kwargs)
    return checked


# ------------------------------------------------------------------------------
# Task

Expand Down Expand Up @@ -187,9 +220,24 @@ def children(self) -> Sequence[Task]:
Callers should use append_child() instead of modifying the returned list.
Note that some Task subclasses - notably RootTask - require (and enforce)
that accesses to the children list occur on a particular thread.
Task subclasses require (and enforce) that accesses to the
children list occur on a particular thread, usually the scheduler thread.
"""
# For tasks that have been added to the task tree,
# force any children access to synchronize with scheduler thread
if self.parent is not None:
if not is_synced_with_scheduler_thread():
cur_task = self
while cur_task.parent is not None:
cur_task = cur_task.parent
self_within_root_task = isinstance(cur_task, RootTask)

if self_within_root_task:
raise AssertionError(
'Unsafe to access children of a task within a RootTask '
'without the caller being synchronized with '
'the scheduler thread')

return self._children

@final
Expand Down Expand Up @@ -393,8 +441,12 @@ def clear_completed_children(self) -> None:
child_indexes_to_remove = [i for (i, c) in enumerate(self._children) if c.complete] # capture
if len(child_indexes_to_remove) == 0:
return
for child in [c for c in self._children if c.complete]:
child._parent = None
if self._use_extra_listener_assertions:
assert self not in child.listeners
self._children = [c for c in self.children if not c.complete]
self._num_children_complete -= len(child_indexes_to_remove)
self._num_children_complete = 0

# NOTE: Call these listeners also inside the lock
# because they are likely to be updating
Expand Down Expand Up @@ -1513,38 +1565,61 @@ class RootTask(Task):
scheduling_style = SCHEDULING_STYLE_ROUND_ROBIN
all_children_complete_implies_this_task_complete = False

def __init__(self):
def __init__(self) -> None:
super().__init__(title='ROOT')
self.subtitle = 'Running'
self._children_to_add_soon = [] # type: List[Tuple[Task, bool]]

@property # type: ignore[misc]
@fg_affinity # force any access to synchronize with foreground thread
@overrides
def children(self) -> Sequence[Task]:
return super().children
# NOTE: Bypass the usual thread synchronization check in super().children,
# because here the analogous check is handled by @fg_affinity
return self._children

@overrides
def append_child(self, child: Task, *args, **kwargs) -> None:
def append_child(self, child: Task, *, already_complete_ok: bool=False) -> None:
"""
Appends a child to this RootTask's children, queuing it to be
scheduled soon.
Can be called from any thread.
Raises:
* ProjectClosedError -- if this project is closed
"""
def fg_task() -> None:
assert child not in self.children
assert child not in [c for (c, _) in self._children_to_add_soon]

if self.complete:
from crystal.model import ProjectClosedError
raise ProjectClosedError()

super(RootTask, self).append_child(child, *args, **kwargs)
# NOTE: Must synchronize access to RootTask.children with foreground thread
# Defer append child until next call to RootTask.try_get_next_task_unit(),
# which will have a lock on the scheduler thread (and access to Task.children)
self._children_to_add_soon.append((child, already_complete_ok))
# NOTE: Must synchronize access to {self.children,
# self._children_to_add_soon, self.complete} with foreground thread
fg_call_and_wait(fg_task)

@fg_affinity
@scheduler_affinity
def try_get_next_task_unit(self):
if self.complete:
return None

if len(self._children_to_add_soon) != 0:
children_to_add_soon = list(self._children_to_add_soon) # capture
self._children_to_add_soon.clear()

# Append deferred children
for (child, already_complete_ok) in children_to_add_soon:
super().append_child(child, already_complete_ok=already_complete_ok)
assert len(self._children_to_add_soon) == 0, \
'RootTask._children_to_add_soon was modified concurrently unexpectedly'

# Only the root task is allowed to have no children normally
if len(self.children) == 0:
return None
Expand Down Expand Up @@ -1585,39 +1660,23 @@ def __repr__(self) -> str:


# ------------------------------------------------------------------------------
# Schedule
# Scheduler

# TODO: Eliminate polling by adding logic to sleep appropriately until the
# root task has more children to process.
_ROOT_TASK_POLL_INTERVAL = .1 # secs


# TODO: Move production implementation of scheduling logic from
# start_schedule_forever() to this method and delegate from that
# method to this method. Such movement will reduce logic duplication.
def schedule_forever(task: Task) -> None:
    """
    Synchronously drives the specified task until it reports completion.
    
    Repeatedly asks the task for its next unit of work and runs it on the
    calling thread. If no unit is available but the task is not yet complete,
    sleeps for _ROOT_TASK_POLL_INTERVAL seconds before asking again.
    
    This function is intended for testing.
    """
    while True:
        next_unit = task.try_get_next_task_unit()
        if next_unit is not None:
            next_unit()
            continue
        
        # No work available right now
        if task.complete:
            return
        sleep(_ROOT_TASK_POLL_INTERVAL)


def start_schedule_forever(task: Task) -> None:
def start_schedule_forever(task: RootTask) -> None:
"""
Asynchronously runs the specified task until it completes,
or until there is no foreground thread remaining.
"""
def bg_task() -> None:
setattr(threading.current_thread(), '_cr_is_scheduler_thread', True)
assert _is_scheduler_thread()
assert is_synced_with_scheduler_thread()

if _PROFILE_SCHEDULER:
profiling_context = cProfile.Profile() # type: AbstractContextManager[Optional[cProfile.Profile]]
else:
Expand All @@ -1627,6 +1686,8 @@ def bg_task() -> None:
try:
with profiling_context as profiler:
while True:
#@fg_affinity
#@scheduler_affinity
def fg_task() -> Tuple[Optional[Callable[[], None]], bool]:
return (task.try_get_next_task_unit(), task.complete)
try:
Expand All @@ -1641,7 +1702,7 @@ def fg_task() -> Tuple[Optional[Callable[[], None]], bool]:
sleep(_ROOT_TASK_POLL_INTERVAL)
continue
try:
unit() # Run unit directly on this bg thread
unit() # Run unit directly on this scheduler thread
except NoForegroundThreadError:
# Probably the app was closed. Ignore error.
return
Expand Down Expand Up @@ -1670,4 +1731,43 @@ def fg_task() -> Tuple[Optional[Callable[[], None]], bool]:
bg_call_later(bg_task, daemon=True)


def is_synced_with_scheduler_thread() -> bool:
    """
    Returns whether the calling thread is synchronized with a scheduler thread.
    
    That is the case when either:
    * the calling thread is itself a scheduler thread, or
    * the calling thread is the foreground thread and the thread reported by
      fg_waiting_calling_thread() is a scheduler thread
      (i.e. a scheduler thread is waiting on the running foreground task).
    """
    if _is_scheduler_thread():
        return True
    if not is_foreground_thread():
        return False
    return bool(_is_scheduler_thread(fg_waiting_calling_thread()))


def _is_scheduler_thread(thread: Optional[threading.Thread]=None) -> bool:
    """
    Returns whether the specified thread is marked as a scheduler thread,
    responsible for running Tasks in a Project.
    
    Arguments:
    * thread -- the thread to inspect. Defaults to the calling thread.
    
    A thread counts as a scheduler thread when its
    `_cr_is_scheduler_thread` attribute has been set to a true value.
    Threads that were never marked are reported as non-scheduler threads.
    """
    target = threading.current_thread() if thread is None else thread
    return getattr(target, '_cr_is_scheduler_thread', False)


@contextmanager
def scheduler_thread_context() -> Iterator[None]:
    """
    Context which temporarily marks the current thread as a scheduler thread,
    so that its contents execute as if they were on a scheduler thread.
    
    On exit the thread's previous scheduler-thread marking is restored,
    even if the contents raise.
    
    For testing use only.
    """
    was_scheduler_thread = _is_scheduler_thread()  # capture
    setattr(threading.current_thread(), '_cr_is_scheduler_thread', True)
    try:
        assert _is_scheduler_thread()
        yield
    finally:
        # Put back whatever marking the thread had on entry
        setattr(threading.current_thread(), '_cr_is_scheduler_thread', was_scheduler_thread)


# ------------------------------------------------------------------------------
10 changes: 8 additions & 2 deletions src/crystal/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from crystal.task import (
ASSUME_RESOURCES_DOWNLOADED_IN_SESSION_WILL_ALWAYS_REMAIN_FRESH,
ProjectFreeSpaceTooLowError, Task
ProjectFreeSpaceTooLowError, scheduler_thread_context, Task
)
from crystal.tests.util.asserts import *
from crystal.tests.util.data import (
Expand Down Expand Up @@ -102,6 +102,7 @@ async def test_some_tasks_may_complete_immediately(subtests) -> None:

load_children_of_drg_task(drg_task, task_added_to_project=False)

# Precondition
# NOTE: The group won't appear to be immediately downloaded yet
# because no code has tried to access the lazily-loaded
# DownloadResourceTask children yet and thus doesn't
Expand All @@ -114,6 +115,12 @@ async def test_some_tasks_may_complete_immediately(subtests) -> None:
drg_task.complete
)

project.add_task(drg_task)
with scheduler_thread_context():
task_unit = project.root_task.try_get_next_task_unit()
assert task_unit is None

# Postcondition
# NOTE: Adding the DownloadResourceGroupTask to the project's
# task tree will cause the TaskTreeNode to start accessing
# the DownloadResourceTask children because it wants to
Expand All @@ -122,7 +129,6 @@ async def test_some_tasks_may_complete_immediately(subtests) -> None:
# to be created and observed as being already complete.
# With all of those children complete the ancestor
# DownloadResourceGroupTask will also be completed.
project.add_task(drg_task)
assert (True, COMIC_G_FINAL_MEMBER_COUNT, True) == (
isinstance(drg_task._download_members_task.children, AppendableLazySequence),
drg_task._download_members_task.children.cached_prefix_len
Expand Down
Loading

0 comments on commit b03069f

Please sign in to comment.