Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrapper to assign futures to the correct event loop #308

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions scrapy_playwright/_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import inspect
import logging
import platform
import threading
Expand All @@ -8,6 +9,7 @@
from playwright.async_api import Error, Page, Request, Response
from scrapy.http.headers import Headers
from scrapy.settings import Settings
from scrapy.utils.asyncgen import collect_asyncgen
from scrapy.utils.python import to_unicode
from twisted.internet.defer import Deferred
from w3lib.encoding import html_body_declared_encoding, http_content_type_encoding
Expand Down Expand Up @@ -117,6 +119,7 @@
@classmethod
async def _handle_coro(cls, coro, future) -> None:
try:
coro = collect_asyncgen(coro) if inspect.isasyncgen(coro) else coro

Check warning on line 122 in scrapy_playwright/_utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/_utils.py#L122

Added line #L122 was not covered by tests
future.set_result(await coro)
except Exception as exc:
future.set_exception(exc)
Expand All @@ -129,10 +132,14 @@
cls._coro_queue.task_done()

@classmethod
def _deferred_from_coro(cls, coro) -> Deferred:
def _ensure_future(cls, coro: Awaitable) -> asyncio.Future:
future: asyncio.Future = asyncio.Future()
asyncio.run_coroutine_threadsafe(cls._coro_queue.put((coro, future)), cls._loop)
return scrapy.utils.defer.deferred_from_coro(future)
return future

Check warning on line 138 in scrapy_playwright/_utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/_utils.py#L138

Added line #L138 was not covered by tests

@classmethod
def _deferred_from_coro(cls, coro: Awaitable) -> Deferred:
return scrapy.utils.defer.deferred_from_coro(cls._ensure_future(coro))

Check warning on line 142 in scrapy_playwright/_utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/_utils.py#L142

Added line #L142 was not covered by tests

@classmethod
def start(cls, caller_id: int) -> None:
Expand Down
3 changes: 3 additions & 0 deletions scrapy_playwright/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@
self.config = Config.from_settings(crawler.settings)

if self.config.use_threaded_loop:
logger.warning("Starting threaded loop")

Check warning on line 145 in scrapy_playwright/handler.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/handler.py#L145

Added line #L145 was not covered by tests
_ThreadedLoopAdapter.start(id(self))
else:
logger.warning("NOT starting threaded loop")

self.browser_launch_lock = asyncio.Lock()
self.context_launch_lock = asyncio.Lock()
Expand Down
54 changes: 54 additions & 0 deletions scrapy_playwright/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import functools
import inspect
from typing import Callable

from ._utils import _ThreadedLoopAdapter


async def _run_async_gen(asyncgen):
async for item in asyncgen:
yield item

Check warning on line 10 in scrapy_playwright/utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/utils.py#L9-L10

Added lines #L9 - L10 were not covered by tests


def use_threaded_loop(callback) -> Callable:
"""Wrap a coroutine callback so that Playwright coroutines are executed in
the threaded event loop.

On windows, Playwright runs in an event loop of its own in a separate thread.
If Playwright coroutines are awaited directly, they are assigned to the main
thread's event loop, resulting in: "ValueError: The future belongs to a
different loop than the one specified as the loop argument"

Usage:
```
from playwright.async_api import Page
from scrapy_playwright.utils import use_threaded_loop

@use_threaded_loop
async def parse(self, response):
page: Page = response.meta["playwright_page"]
await page.screenshot(path="example.png", full_page=True)
```
"""

if not inspect.iscoroutinefunction(callback) and not inspect.isasyncgenfunction(callback):
raise RuntimeError(

Check warning on line 35 in scrapy_playwright/utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/utils.py#L35

Added line #L35 was not covered by tests
f"Cannot decorate callback '{callback.__name__}' with 'use_threaded_loop':"
" callback must be a coroutine function or an async generator"
)

@functools.wraps(callback)
async def async_func_wrapper(*args, **kwargs):
future = _ThreadedLoopAdapter._ensure_future(callback(*args, **kwargs))
return await future

Check warning on line 43 in scrapy_playwright/utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/utils.py#L42-L43

Added lines #L42 - L43 were not covered by tests

@functools.wraps(callback)
async def async_gen_wrapper(*args, **kwargs):
asyncgen = _run_async_gen(callback(*args, **kwargs))
future = _ThreadedLoopAdapter._ensure_future(asyncgen)
for item in await future:
yield item

Check warning on line 50 in scrapy_playwright/utils.py

View check run for this annotation

Codecov / codecov/patch

scrapy_playwright/utils.py#L47-L50

Added lines #L47 - L50 were not covered by tests

if inspect.isasyncgenfunction(callback):
return async_gen_wrapper
return async_func_wrapper
93 changes: 93 additions & 0 deletions tests/tests_asyncio/test_threaded_loop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import platform
from unittest import TestCase

import pytest
import scrapy
from playwright.async_api import Page
from scrapy import signals
from scrapy.crawler import CrawlerProcess
from scrapy.utils.test import get_crawler
from scrapy_playwright.utils import use_threaded_loop

from tests.mockserver import StaticMockServer


class ThreadedLoopSpider(scrapy.Spider):
name = "threaded_loop"
start_url: str

def start_requests(self):
yield scrapy.Request(
url=self.start_url,
meta={"playwright": True, "playwright_include_page": True},
)

@use_threaded_loop
async def parse(self, response, **kwargs): # pylint: disable=invalid-overridden-method
"""async generator"""
page: Page = response.meta["playwright_page"]
title = await page.title()
await page.close()
yield {"url": response.url, "title": title}
yield scrapy.Request(
url=response.url + "?foo=bar",
meta={"playwright": True, "playwright_include_page": True},
callback=self.parse_2,
)

@use_threaded_loop
async def parse_2(self, response):
page: Page = response.meta["playwright_page"]
title = await page.title()
await page.close()
return {"url": response.url, "title": title}


@pytest.mark.skipif(
platform.system() != "Windows",
reason="Test threaded loop implementation only on Windows",
)
class ThreadedLoopSpiderTestCase(TestCase):
def test_threaded_loop_spider(self):
items: list = []

def collect_items(item):
items.append(item)

with StaticMockServer() as server:
index_url = server.urljoin("/index.html")
crawler = get_crawler(
spidercls=ThreadedLoopSpider,
settings_dict={
"TWISTED_REACTOR": "twisted.internet.asyncioreactor.AsyncioSelectorReactor",
"DOWNLOAD_HANDLERS": {
"http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
},
"_PLAYWRIGHT_THREADED_LOOP": True,
},
)
crawler.signals.connect(collect_items, signals.item_scraped)
process = CrawlerProcess()
process.crawl(crawler, start_url=index_url)
process.start()

self.assertCountEqual(
items,
[
{"url": index_url, "title": "Awesome site"},
{"url": index_url + "?foo=bar", "title": "Awesome site"},
],
)

def test_use_threaded_loop_non_coroutine_function(self):
with pytest.raises(RuntimeError) as exc_info:

@use_threaded_loop
def not_a_coroutine():
pass

self.assertEqual(
str(exc_info.value),
"Cannot decorate callback 'not_a_coroutine' with 'use_threaded_loop':"
" callback must be a coroutine function or an async generator",
)
Loading