Skip to content

Commit d3dab06

Browse files
committed
Draft
1 parent af01e50 commit d3dab06

File tree

3 files changed

+94
-47
lines changed

3 files changed

+94
-47
lines changed

src/apify/storage_clients/_apify/_request_queue_single_client.py

Lines changed: 58 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata
1111

1212
from apify import Request
13-
from apify.storage_clients._apify._utils import unique_key_to_request_id
13+
from apify.storage_clients._apify._utils import _Request, unique_key_to_request_id
1414

1515
if TYPE_CHECKING:
1616
from collections.abc import Sequence
@@ -105,25 +105,30 @@ async def add_batch_of_requests(
105105
already_present_requests: list[ProcessedRequest] = []
106106

107107
for request in requests:
108+
# Calculate id for request
109+
_request = _Request.model_validate(request.model_dump())
110+
108111
# Check if request is known to be already handled (it has to be present as well.)
109-
if request.unique_key in self._requests_already_handled:
112+
if _request.id in self._requests_already_handled:
110113
already_present_requests.append(
111114
ProcessedRequest.model_validate(
112115
{
113-
'uniqueKey': request.unique_key,
116+
'id': _request.id,
117+
'uniqueKey': _request.unique_key,
114118
'wasAlreadyPresent': True,
115119
'wasAlreadyHandled': True,
116120
}
117121
)
118122
)
119123
# Check if request is known to be already present, but unhandled
120-
elif self._requests_cache.get(request.unique_key):
124+
elif self._requests_cache.get(_request.id):
121125
already_present_requests.append(
122126
ProcessedRequest.model_validate(
123127
{
124-
'uniqueKey': request.unique_key,
128+
'id': _request.id,
129+
'uniqueKey': _request.unique_key,
125130
'wasAlreadyPresent': True,
126-
'wasAlreadyHandled': request.was_already_handled,
131+
'wasAlreadyHandled': _request.was_already_handled,
127132
}
128133
)
129134
)
@@ -132,11 +137,11 @@ async def add_batch_of_requests(
132137
new_requests.append(request)
133138

134139
# Update local caches
135-
self._requests_cache[request.unique_key] = request
140+
self._requests_cache[_request.id] = request
136141
if forefront:
137-
self._head_requests.append(request.unique_key)
142+
self._head_requests.append(_request.id)
138143
else:
139-
self._head_requests.appendleft(request.unique_key)
144+
self._head_requests.appendleft(_request.id)
140145

141146
if new_requests:
142147
# Prepare requests for API by converting to dictionaries.
@@ -155,7 +160,7 @@ async def add_batch_of_requests(
155160
api_response.processed_requests.extend(already_present_requests)
156161
# Remove unprocessed requests from the cache
157162
for unprocessed_request in api_response.unprocessed_requests:
158-
self._requests_cache.pop(unprocessed_request.unique_key, None)
163+
self._requests_cache.pop(unique_key_to_request_id(unprocessed_request.unique_key), None)
159164

160165
else:
161166
api_response = AddRequestsResponse.model_validate(
@@ -181,10 +186,21 @@ async def get_request(self, unique_key: str) -> Request | None:
181186
Returns:
182187
The request or None if not found.
183188
"""
184-
if unique_key in self._requests_cache:
185-
return self._requests_cache[unique_key]
189+
return await self._get_request(id=unique_key_to_request_id(unique_key))
190+
191+
async def _get_request(self, id: str) -> Request | None:
192+
"""Get a request by id.
193+
194+
Args:
195+
id: The id of the request to get.
196+
197+
Returns:
198+
The request or None if not found.
199+
"""
200+
if id in self._requests_cache:
201+
return self._requests_cache[id]
186202

187-
response = await self._api_client.get_request(unique_key_to_request_id(unique_key))
203+
response = await self._api_client.get_request(id)
188204

189205
if response is None:
190206
return None
@@ -205,13 +221,10 @@ async def fetch_next_request(self) -> Request | None:
205221
await self._ensure_head_is_non_empty()
206222

207223
while self._head_requests:
208-
request_unique_key = self._head_requests.pop()
209-
if (
210-
request_unique_key not in self._requests_in_progress
211-
and request_unique_key not in self._requests_already_handled
212-
):
213-
self._requests_in_progress.add(request_unique_key)
214-
return await self.get_request(request_unique_key)
224+
request_id = self._head_requests.pop()
225+
if request_id not in self._requests_in_progress and request_id not in self._requests_already_handled:
226+
self._requests_in_progress.add(request_id)
227+
return await self._get_request(request_id)
215228
# No request locally and the ones returned from the platform are already in progress.
216229
return None
217230

@@ -236,31 +249,24 @@ async def _list_head(self) -> None:
236249

237250
# Update the cached data
238251
for request_data in response.get('items', []):
239-
request = Request.model_validate(request_data)
252+
request = _Request.model_validate(request_data)
240253

241-
if request.unique_key in self._requests_in_progress:
254+
if request.id in self._requests_in_progress:
242255
# Ignore requests that are already in progress, we will not process them again.
243256
continue
244257
if request.was_already_handled:
245258
# Do not cache fully handled requests, we do not need them. Just cache their unique_key.
246-
self._requests_already_handled.add(request.unique_key)
259+
self._requests_already_handled.add(request.id)
247260
else:
248261
# Only fetch the request if we do not know it yet.
249-
if request.unique_key not in self._requests_cache:
250-
request_id = unique_key_to_request_id(request.unique_key)
251-
complete_request_data = await self._api_client.get_request(request_id)
252-
253-
if complete_request_data is not None:
254-
request = Request.model_validate(complete_request_data)
255-
self._requests_cache[request.unique_key] = request
256-
else:
257-
logger.warning(
258-
f'Could not fetch request data for unique_key=`{request.unique_key}` (id=`{request_id}`)'
259-
)
262+
if request.id not in self._requests_cache:
263+
complete_request_data = await self._api_client.get_request(request_data['id'])
264+
request = _Request.model_validate(complete_request_data)
265+
self._requests_cache[request.id] = request
260266

261267
# Add new requests to the end of the head, unless already present in head
262-
if request.unique_key not in self._head_requests:
263-
self._head_requests.appendleft(request.unique_key)
268+
if request.id not in self._head_requests:
269+
self._head_requests.appendleft(request.id)
264270

265271
async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None:
266272
"""Mark a request as handled after successful processing.
@@ -275,12 +281,14 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
275281
"""
276282
# Set the handled_at timestamp if not already set
277283

284+
_request = _Request.model_validate(request.model_dump())
285+
278286
if request.handled_at is None:
279287
request.handled_at = datetime.now(tz=timezone.utc)
280288
self.metadata.handled_request_count += 1
281289
self.metadata.pending_request_count -= 1
282290

283-
if cached_request := self._requests_cache.get(request.unique_key):
291+
if cached_request := self._requests_cache.get(_request.id):
284292
cached_request.handled_at = request.handled_at
285293

286294
try:
@@ -289,13 +297,13 @@ async def mark_request_as_handled(self, request: Request) -> ProcessedRequest |
289297
# adding to the queue.)
290298
processed_request = await self._update_request(request)
291299
# Remember that we handled this request, to optimize local deduplication.
292-
self._requests_already_handled.add(request.unique_key)
300+
self._requests_already_handled.add(_request.id)
293301
# Remove request from cache. It will most likely not be needed.
294-
self._requests_cache.pop(request.unique_key)
295-
self._requests_in_progress.discard(request.unique_key)
302+
self._requests_cache.pop(_request.id)
303+
self._requests_in_progress.discard(_request.id)
296304

297305
except Exception as exc:
298-
logger.debug(f'Error marking request {request.unique_key} as handled: {exc!s}')
306+
logger.debug(f'Error marking request {_request.unique_key} as handled: {exc!s}')
299307
return None
300308
else:
301309
return processed_request
@@ -319,23 +327,27 @@ async def reclaim_request(
319327
"""
320328
# Check if the request was marked as handled and clear it. When reclaiming,
321329
# we want to put the request back for processing.
330+
331+
_request = _Request.model_validate(request.model_dump())
332+
322333
if request.was_already_handled:
323334
request.handled_at = None
324335

325336
try:
326337
# Make sure request is in the local cache. We might need it.
327-
self._requests_cache[request.unique_key] = request
338+
self._requests_cache[_request.id] = request
328339

329340
# No longer in progress
330-
self._requests_in_progress.discard(request.unique_key)
341+
self._requests_in_progress.discard(_request.id)
331342
# No longer handled
332-
self._requests_already_handled.discard(request.unique_key)
343+
self._requests_already_handled.discard(_request.id)
333344

334345
if forefront:
335346
# Append to top of the local head estimation
336-
self._head_requests.append(request.unique_key)
347+
self._head_requests.append(_request.id)
337348

338349
processed_request = await self._update_request(request, forefront=forefront)
350+
processed_request.id = _request.id
339351
processed_request.unique_key = request.unique_key
340352
# If the request was previously handled, decrement our handled count since
341353
# we're putting it back for processing.
@@ -396,7 +408,7 @@ async def _init_caches(self) -> None:
396408
"""
397409
response = await self._api_client.list_requests(limit=10_000)
398410
for request_data in response.get('items', []):
399-
request = Request.model_validate(request_data)
411+
request = _Request.model_validate(request_data)
400412
if request.was_already_handled:
401413
# Cache just unique_key for deduplication
402414
self._requests_already_handled.add(request.unique_key)

src/apify/storage_clients/_apify/_utils.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@
66
from base64 import b64encode
77
from hashlib import sha256
88
from logging import getLogger
9-
from typing import TYPE_CHECKING, ClassVar
9+
from typing import TYPE_CHECKING, Annotated, ClassVar
10+
11+
from pydantic import Field, model_validator
1012

1113
from apify_client import ApifyClientAsync
14+
from crawlee import Request
1215
from crawlee._utils.crypto import compute_short_hash
1316

1417
from apify._configuration import Configuration
@@ -192,3 +195,13 @@ def unique_key_to_request_id(unique_key: str, *, request_id_length: int = 15) ->
192195

193196
# Truncate the key to the desired length
194197
return url_safe_key[:request_id_length]
198+
199+
200+
class _Request(Request):
201+
id: Annotated[str, Field(default='')]
202+
203+
@model_validator(mode='after')
204+
def calculate_id(self) -> _Request:
205+
if self.id == '':
206+
self.id = unique_key_to_request_id(self.unique_key)
207+
return self

tests/integration/test_request_queue.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from ._utils import generate_unique_resource_name
1414
from apify import Actor
1515
from apify.storage_clients import ApifyStorageClient
16+
from apify.storage_clients._apify._utils import unique_key_to_request_id
1617
from apify.storages import RequestQueue
1718

1819
if TYPE_CHECKING:
@@ -1189,3 +1190,24 @@ async def test_request_queue_has_stats(request_queue_apify: RequestQueue) -> Non
11891190
assert hasattr(metadata, 'stats')
11901191
apify_metadata = cast('ApifyRequestQueueMetadata', metadata)
11911192
assert apify_metadata.stats.write_count == add_request_count
1193+
1194+
1195+
async def test_long_request(request_queue_apify: RequestQueue) -> None:
1196+
request = Request.from_url(
1197+
'https://portal.isoss.gov.cz/irj/portal/anonymous/mvrest?path=/eosm-public-offer&officeLabels=%7B%7D&page=1&pageSize=100000&sortColumn=zdatzvsm&sortOrder=-1',
1198+
use_extended_unique_key=True,
1199+
always_enqueue=True,
1200+
)
1201+
1202+
request_id = unique_key_to_request_id(request.unique_key)
1203+
1204+
processed_request = await request_queue_apify.add_request(request)
1205+
assert processed_request.id == request_id
1206+
1207+
request_obtained = await request_queue_apify.fetch_next_request()
1208+
assert request_obtained is not None
1209+
1210+
await request_queue_apify.mark_request_as_handled(request_obtained)
1211+
1212+
is_finished = await request_queue_apify.is_finished()
1213+
assert is_finished

0 commit comments

Comments
 (0)