-
Notifications
You must be signed in to change notification settings - Fork 616
fix: Protect Request from partial mutations on request handler failure
#1585
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dd35256
6b014a7
22a81ba
32e16a5
acdfd99
d0c9a7d
b91ef31
851ca51
f9bd1e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,7 @@ | |
| from crawlee.storages import Dataset, KeyValueStore, RequestQueue | ||
|
|
||
| from ._context_pipeline import ContextPipeline | ||
| from ._context_utils import swaped_context | ||
| from ._logging_utils import ( | ||
| get_one_line_error_summary_if_possible, | ||
| reduce_asyncio_timeout_error_to_relevant_traceback_parts, | ||
|
|
@@ -1321,6 +1322,8 @@ async def _commit_request_handler_result(self, context: BasicCrawlingContext) -> | |
|
|
||
| await self._commit_key_value_store_changes(result, get_kvs=self.get_key_value_store) | ||
|
|
||
| result.apply_request_changes(target=context.request) | ||
|
|
||
| @staticmethod | ||
| async def _commit_key_value_store_changes( | ||
| result: RequestHandlerRunResult, get_kvs: GetKeyValueStoreFromRequestHandlerFunction | ||
|
|
@@ -1386,10 +1389,10 @@ async def __run_task_function(self) -> None: | |
| else: | ||
| session = await self._get_session() | ||
| proxy_info = await self._get_proxy_info(request, session) | ||
| result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store) | ||
| result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store, request=request) | ||
|
|
||
| context = BasicCrawlingContext( | ||
| request=request, | ||
| request=result.request, | ||
| session=session, | ||
| proxy_info=proxy_info, | ||
| send_request=self._prepare_send_request_function(session, proxy_info), | ||
|
|
@@ -1404,10 +1407,12 @@ async def __run_task_function(self) -> None: | |
| self._statistics.record_request_processing_start(request.unique_key) | ||
|
|
||
| try: | ||
| self._check_request_collision(context.request, context.session) | ||
| request.state = RequestState.REQUEST_HANDLER | ||
|
|
||
| try: | ||
| await self._run_request_handler(context=context) | ||
| with swaped_context(context, request): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The name is somewhat confusing. It sets the original Request object to the readonly property.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this was done so that […]. Outside of […]. But yes, it is not intuitive and makes the code more difficult to read. |
||
| self._check_request_collision(request, session) | ||
| await self._run_request_handler(context=context) | ||
| except asyncio.TimeoutError as e: | ||
| raise RequestHandlerError(e, context) from e | ||
|
|
||
|
|
@@ -1417,13 +1422,13 @@ async def __run_task_function(self) -> None: | |
|
|
||
| await self._mark_request_as_handled(request) | ||
|
|
||
| if context.session and context.session.is_usable: | ||
| context.session.mark_good() | ||
| if session and session.is_usable: | ||
| session.mark_good() | ||
|
|
||
| self._statistics.record_request_processing_finish(request.unique_key) | ||
|
|
||
| except RequestCollisionError as request_error: | ||
| context.request.no_retry = True | ||
| request.no_retry = True | ||
| await self._handle_request_error(context, request_error) | ||
|
|
||
| except RequestHandlerError as primary_error: | ||
|
|
@@ -1438,7 +1443,7 @@ async def __run_task_function(self) -> None: | |
| await self._handle_request_error(primary_error.crawling_context, primary_error.wrapped_exception) | ||
|
|
||
| except SessionError as session_error: | ||
| if not context.session: | ||
| if not session: | ||
| raise RuntimeError('SessionError raised in a crawling context without a session') from session_error | ||
|
|
||
| if self._error_handler: | ||
|
|
@@ -1448,10 +1453,11 @@ async def __run_task_function(self) -> None: | |
| exc_only = ''.join(traceback.format_exception_only(session_error)).strip() | ||
| self._logger.warning('Encountered "%s", rotating session and retrying...', exc_only) | ||
|
|
||
| context.session.retire() | ||
| if session: | ||
| session.retire() | ||
|
|
||
| # Increment session rotation count. | ||
| context.request.session_rotation_count = (context.request.session_rotation_count or 0) + 1 | ||
| request.session_rotation_count = (request.session_rotation_count or 0) + 1 | ||
|
|
||
| await request_manager.reclaim_request(request) | ||
| await self._statistics.error_tracker_retry.add(error=session_error, context=context) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from contextlib import contextmanager | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Iterator | ||
|
|
||
| from crawlee._request import Request | ||
|
|
||
| from ._basic_crawling_context import BasicCrawlingContext | ||
|
|
||
|
|
||
| @contextmanager | ||
| def swaped_context( | ||
| context: BasicCrawlingContext, | ||
| request: Request, | ||
| ) -> Iterator[None]: | ||
| """Replace context's isolated copies with originals after handler execution.""" | ||
| try: | ||
| yield | ||
| finally: | ||
| # Restore original context state to avoid side effects between different handlers. | ||
| object.__setattr__(context, 'request', request) |
Uh oh!
There was an error while loading. Please reload this page.