From a2f7a3205e7cd3ce7dfff97180981638b002747e Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Sun, 7 Jun 2026 11:45:34 +0200 Subject: [PATCH] fix(scrapy)!: Serialize requests and HTTP cache as JSON instead of pickle Scrapy requests stored in the Apify request queue and responses stored in the HTTP cache were serialized with pickle. Those storages hold JSON, while pickle (de)serializes a Python object graph, so both paths now use a single shared JSON serializer. Serialization: - Add scrapy/_serialization.py: a shared JSON (de)serializer used by both the request converter and the HTTP cache. Binary fields (body and the bytes-keyed headers) are base64-encoded and pydantic models (e.g. Crawlee's UserData) are dumped to plain dicts; no in-band sentinel is used, so no user value can collide with the encoding. - requests: (de)serialize via the shared serializer and, when reconstructing a request, only honor a `_class` that is already imported and is a scrapy.Request subclass instead of importing the dotted path. - httpcache: store and load cached responses as gzip-compressed JSON. Resilience and correctness: - requests: a non-JSON-serializable meta/cb_kwargs is logged with a traceback and the request is skipped (returns None per the function's contract) instead of being silently dropped or crashing; the header conversion is guarded so a request with non-UTF-8 header values is no longer dropped (its headers are preserved in the serialized request). - scheduler: reconstruct the request inside a try/except in next_request, so a malformed queue entry is logged and skipped instead of crashing the run. - httpcache: treat a malformed or legacy (pickle-format) cache entry as a cache miss so it is re-fetched and re-stored; make the cleanup item cap configurable via APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS and fix its off-by-one. Misc: - proxy middleware: fix an f-string so the TunnelError reason is interpolated, drop a stale docstring argument, and import get_basic_auth_header from utils. - logging: install the Scrapy configure_logging monkey-patch at most once. - async thread: make the run_coro default timeout configurable. - tests: regenerate the pinned fixtures for the JSON format and add coverage for binary body/headers round-trips, the sentinel-collision case, the `_class` checks, and rejection of pickle payloads. BREAKING CHANGE: Scrapy requests and HTTP cache entries are now stored as JSON instead of pickle. Entries written by a previous version (pickle format) can no longer be read; such requests are skipped and such cache entries are treated as a miss. Values in a request's `meta` and `cb_kwargs` must be JSON-serializable. --- src/apify/scrapy/_async_thread.py | 23 +- src/apify/scrapy/_logging_config.py | 32 ++- src/apify/scrapy/_serialization.py | 119 +++++++++ src/apify/scrapy/extensions/_httpcache.py | 50 +++- src/apify/scrapy/middlewares/apify_proxy.py | 5 +- src/apify/scrapy/requests.py | 90 ++++++- src/apify/scrapy/scheduler.py | 10 +- .../unit/scrapy/extensions/test_httpcache.py | 35 ++- .../scrapy/requests/test_to_apify_request.py | 18 ++ .../scrapy/requests/test_to_scrapy_request.py | 243 +++++++++++++++++- 10 files changed, 572 insertions(+), 53 deletions(-) create mode 100644 src/apify/scrapy/_serialization.py diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 79de1162d..16665b6f2 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -14,13 +14,18 @@ class AsyncThread: - """Class for running an asyncio event loop in a separate thread. - - This allows running asynchronous coroutines from synchronous code by executingthem on an event loop - that runs in its own dedicated thread. + """Run an asyncio event loop in a dedicated background thread. + + This lets synchronous Scrapy callbacks drive asynchronous Apify and Crawlee coroutines. Each + consumer (the scheduler and the HTTP cache storage) owns its own `AsyncThread`, so the request + queue and the key-value store each live entirely on a single, separate event loop and are never + shared across loops. They do read the same global `Configuration`, which is read-only here, so + the isolation holds. A single shared loop would also work but would couple the otherwise + independent lifecycles of those Scrapy components. """ - def __init__(self) -> None: + def __init__(self, default_timeout: timedelta = timedelta(seconds=60)) -> None: + self._default_timeout = default_timeout self._eventloop = asyncio.new_event_loop() # Start the event loop in a dedicated daemon thread. @@ -33,7 +38,7 @@ def __init__(self) -> None: def run_coro( self, coro: Coroutine, - timeout: timedelta = timedelta(seconds=60), + timeout: timedelta | None = None, ) -> Any: """Run a coroutine on an event loop running in a separate thread. @@ -42,7 +47,8 @@ def run_coro( Args: coro: The coroutine to run. - timeout: The maximum number of seconds to wait for the coroutine to finish. + timeout: The maximum time to wait for the coroutine to finish. Defaults to the + `default_timeout` passed to the constructor. Returns: The result returned by the coroutine. @@ -52,6 +58,9 @@ def run_coro( TimeoutError: If the coroutine does not complete within the timeout. Exception: Any exception raised during coroutine execution. """ + if timeout is None: + timeout = self._default_timeout + if not self._eventloop.is_running(): raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.') diff --git a/src/apify/scrapy/_logging_config.py b/src/apify/scrapy/_logging_config.py index fca3d4f7b..bcdb0e7ef 100644 --- a/src/apify/scrapy/_logging_config.py +++ b/src/apify/scrapy/_logging_config.py @@ -13,6 +13,12 @@ _SUPPLEMENTAL_LOGGERS = ['filelock', 'hpack', 'httpcore', 'protego', 'twisted'] _ALL_LOGGERS = _PRIMARY_LOGGERS + _SUPPLEMENTAL_LOGGERS +# Mutable module state shared with the Scrapy logging monkey-patch installed by `initialize_logging`. +# `initialize_logging` refreshes `level`/`handler` on each call, and the patch (installed at most +# once) reads them so it always re-applies the latest configuration instead of values captured the +# first time it ran. Stored in a dict so the patch can read them without rebinding module globals. +_state: dict[str, Any] = {'level': 'INFO', 'handler': None, 'patched': False} + def _configure_logger(name: str | None, logging_level: str, handler: logging.Handler) -> None: """Clear and reconfigure the logger.""" @@ -23,26 +29,40 @@ def _configure_logger(name: str | None, logging_level: str, handler: logging.Han logger.propagate = False +def _configure_all_loggers() -> None: + """Apply the Apify handler and level to the root logger and all defined loggers.""" + handler = _state['handler'] + if handler is None: + return + for logger_name in [None, *_ALL_LOGGERS]: + _configure_logger(logger_name, _state['level'], handler) + + def initialize_logging() -> None: """Configure logging for Apify Actors and adjust Scrapy's logging settings.""" # Retrieve Scrapy project settings and determine the logging level. settings = get_project_settings() - logging_level = settings.get('LOG_LEVEL', 'INFO') # Default to INFO. + _state['level'] = settings.get('LOG_LEVEL', 'INFO') # Default to INFO. # Create a custom handler with the Apify log formatter. handler = logging.StreamHandler() handler.setFormatter(ActorLogFormatter(include_logger_name=True)) + _state['handler'] = handler # Configure the root logger and all other defined loggers. - for logger_name in [None, *_ALL_LOGGERS]: - _configure_logger(logger_name, logging_level, handler) + _configure_all_loggers() + + # Monkey-patch Scrapy's logging configuration to re-apply our settings whenever Scrapy + # reconfigures logging. Install the wrapper at most once; wrapping again on every call would + # nest wrappers on top of each other. + if _state['patched']: + return - # Monkey-patch Scrapy's logging configuration to re-apply our settings. original_configure_logging = scrapy_logging.configure_logging def new_configure_logging(*args: Any, **kwargs: Any) -> None: original_configure_logging(*args, **kwargs) - for logger_name in [None, *_ALL_LOGGERS]: - _configure_logger(logger_name, logging_level, handler) + _configure_all_loggers() scrapy_logging.configure_logging = new_configure_logging # ty: ignore[invalid-assignment] + _state['patched'] = True diff --git a/src/apify/scrapy/_serialization.py b/src/apify/scrapy/_serialization.py new file mode 100644 index 000000000..e9c49be78 --- /dev/null +++ b/src/apify/scrapy/_serialization.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import base64 +import json +from typing import Any + +from pydantic import BaseModel + +# Scrapy persists requests and cached responses by serializing the dict produced by +# `Request.to_dict()` (and a small response payload for the HTTP cache). The Apify integration +# stores that payload inside the request queue and the key-value store and reads it back later. +# Those storages hold JSON, so the payload is serialized as JSON here rather than as a pickled +# Python object graph: JSON is a plain, portable, interoperable data format and carries no +# executable object state. +# +# JSON cannot represent everything Scrapy emits. Of the values that actually appear, only two are +# not natively JSON-serializable, and both sit at fixed, known keys with a known type: +# - `body`: `bytes` +# - `headers`: a `{bytes: [bytes]}` mapping (header name -> list of values) +# These are base64-encoded in place. Pydantic models (e.g. Crawlee's `UserData`, which the Apify +# integration injects into `meta['userData']`) are converted via `model_dump()`. Everything else — +# notably the user-controlled `meta` and `cb_kwargs` — must already be JSON-serializable; if it is +# not, serialization fails with a clear error rather than silently dropping the request. No in-band +# sentinel is used for user data, so no legitimate value can collide with the encoding scheme. + + +def encode_to_json(data: dict[str, Any]) -> str: + """Serialize a Scrapy request/response dict to a JSON string. + + The binary `body` and `headers` fields are base64-encoded in place. All other fields must be + JSON-serializable; pydantic models are dumped to plain dicts. A clear `TypeError` is raised if + any remaining value (typically something in `meta` or `cb_kwargs`) cannot be JSON-encoded. + + Args: + data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`. + + Returns: + The JSON-encoded string. + """ + if not isinstance(data, dict): + raise TypeError(f'Expected a dict to serialize, got {type(data)}') + + safe = dict(data) + + if isinstance(safe.get('body'), bytes): + safe['body'] = base64.b64encode(safe['body']).decode('ascii') + + if isinstance(safe.get('headers'), dict): + safe['headers'] = _encode_headers(safe['headers']) + + try: + return json.dumps(safe, default=_json_default) + except TypeError as exc: + raise TypeError( + 'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. ' + 'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, ' + 'list, dict, or a pydantic model).' + ) from exc + + +def decode_from_json(text: str) -> Any: + """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`. + + The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation. + + Args: + text: The JSON-encoded string. + + Returns: + The decoded object (a dict for valid request/response payloads). + """ + data = json.loads(text) + if not isinstance(data, dict): + return data + + if isinstance(data.get('body'), str): + data['body'] = base64.b64decode(data['body']) + + if isinstance(data.get('headers'), dict): + data['headers'] = _decode_headers(data['headers']) + + return data + + +def _json_default(obj: Any) -> Any: + """Fallback for values `json.dumps` cannot serialize on its own. + + Only pydantic models are accepted (and dumped to plain dicts); anything else raises, which + `encode_to_json` turns into an actionable error. + """ + if isinstance(obj, BaseModel): + return obj.model_dump(by_alias=True) + raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable') + + +def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]: + """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`.""" + encoded: dict[str, list[str]] = {} + for key, value in headers.items(): + str_key = key.decode('latin-1') if isinstance(key, bytes) else key + values = value if isinstance(value, (list, tuple)) else [value] + encoded[str_key] = [_b64encode_value(item) for item in values] + return encoded + + +def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]: + """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects.""" + decoded: dict[bytes, list[bytes]] = {} + for key, value in headers.items(): + bytes_key = key.encode('latin-1') if isinstance(key, str) else key + values = value if isinstance(value, list) else [value] + decoded[bytes_key] = [base64.b64decode(item) for item in values] + return decoded + + +def _b64encode_value(value: Any) -> str: + """Base64-encode a single header value, coercing non-bytes values to bytes first.""" + raw = value if isinstance(value, bytes) else str(value).encode('utf-8') + return base64.b64encode(raw).decode('ascii') diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 14d8753d3..062d315b1 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -2,7 +2,6 @@ import gzip import io -import pickle import re import struct from logging import getLogger @@ -14,6 +13,7 @@ from apify import Configuration from apify.scrapy._async_thread import AsyncThread +from apify.scrapy._serialization import decode_from_json, encode_to_json from apify.storage_clients import ApifyStorageClient from apify.storages import KeyValueStore @@ -36,7 +36,10 @@ class ApifyCacheStorage: """ def __init__(self, settings: BaseSettings) -> None: - self._expiration_max_items = 100 + # Upper bound on how many keys the per-spider-close cleanup sweeps. Configurable because the + # cleanup is bounded and best-effort (see `close_spider`); the default keeps the historical + # behavior. + self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') self._spider: Spider | None = None self._kvs: KeyValueStore | None = None @@ -79,8 +82,15 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: async def expire_kvs() -> None: if self._kvs is None: raise ValueError('Key value store not initialized') - i = 0 + # Best-effort, bounded cleanup: at most `_expiration_max_items` keys are swept per + # spider close and `iterate_keys()` order is not guaranteed, so stale entries may + # linger across runs. This only reclaims storage; correctness is handled at read + # time, where `retrieve_response` treats an expired entry as a cache miss. + processed = 0 async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 value = await self._kvs.get_value(item.key) try: gzip_time = read_gzip_time(value) @@ -93,9 +103,6 @@ async def expire_kvs() -> None: await self._kvs.set_value(item.key, None) else: logger.debug(f'Valid cache item {item.key}') - if i == self._expiration_max_items: - break - i += 1 self._async_thread.run_coro(expire_kvs()) @@ -127,11 +134,18 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non if current_time is None: current_time = int(time()) - if 0 < self._expiration_secs < current_time - read_gzip_time(value): - logger.debug('Cache expired', extra={'request': request}) + + # A malformed or legacy (e.g. pickle-format) cache entry must not crash retrieval. Treat it + # as a cache miss so Scrapy re-fetches and re-stores it in the current format. + try: + if 0 < self._expiration_secs < current_time - read_gzip_time(value): + logger.debug('Cache expired', extra={'request': request}) + return None + data = from_gzip(value) + except Exception as exc: + logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request}) return None - data = from_gzip(value) url = data['url'] status = data['status'] headers = Headers(data['headers']) @@ -162,18 +176,26 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non def to_gzip(data: dict, mtime: int | None = None) -> bytes: - """Dump a dictionary to a gzip-compressed byte stream.""" + """Dump a dictionary to a gzip-compressed byte stream as JSON. + + Cached entries are stored in and read back from the Apify key-value store, which holds JSON, so + the payload is serialized as JSON rather than pickled. See `apify.scrapy._serialization` for the + encoding details. + """ + payload = encode_to_json(data).encode('utf-8') with io.BytesIO() as byte_stream: with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file: - pickle.dump(data, gzip_file, protocol=4) + gzip_file.write(payload) return byte_stream.getvalue() def from_gzip(gzip_bytes: bytes) -> dict: - """Load a dictionary from a gzip-compressed byte stream.""" + """Load a dictionary from a gzip-compressed JSON byte stream.""" with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file: - data: dict = pickle.load(gzip_file) - return data + data = decode_from_json(gzip_file.read().decode('utf-8')) + if not isinstance(data, dict): + raise TypeError(f'Expected a dict from the cached payload, got {type(data)}') + return data def read_gzip_time(gzip_bytes: bytes) -> int: diff --git a/src/apify/scrapy/middlewares/apify_proxy.py b/src/apify/scrapy/middlewares/apify_proxy.py index acaf7d86e..2f2f22cc0 100644 --- a/src/apify/scrapy/middlewares/apify_proxy.py +++ b/src/apify/scrapy/middlewares/apify_proxy.py @@ -7,7 +7,7 @@ from scrapy.exceptions import NotConfigured from apify import Actor, ProxyConfiguration -from apify.scrapy import get_basic_auth_header +from apify.scrapy.utils import get_basic_auth_header if TYPE_CHECKING: from scrapy import Request, Spider @@ -30,7 +30,6 @@ def __init__(self, proxy_settings: dict) -> None: Args: proxy_settings: Dictionary containing proxy settings, provided by the Actor input. - auth_encoding: Encoding for basic authentication (default is 'latin-1'). """ self._proxy_settings = proxy_settings self._proxy_cfg_internal: ProxyConfiguration | None = None @@ -111,7 +110,7 @@ def process_exception( if isinstance(exception, TunnelError): Actor.log.warning( f'ApifyHttpProxyMiddleware: TunnelError occurred for request="{request}", ' - 'reason="{exception}", skipping...' + f'reason="{exception}", skipping...' ) async def _get_new_proxy_url(self) -> ParseResult: diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py index dd40482d4..2724107a4 100644 --- a/src/apify/scrapy/requests.py +++ b/src/apify/scrapy/requests.py @@ -1,7 +1,7 @@ from __future__ import annotations import codecs -import pickle +import sys from logging import getLogger from typing import Any, cast @@ -13,11 +13,41 @@ from crawlee._request import UserData from crawlee._types import HttpHeaders +from ._serialization import decode_from_json, encode_to_json from apify import Request as ApifyRequest logger = getLogger(__name__) +def _ensure_known_request_class(request_dict: dict[str, Any]) -> None: + """Validate the optional `_class` entry before `request_from_dict` resolves it. + + `scrapy.utils.request.request_from_dict` resolves a `_class` entry via `load_object`, which + imports the dotted path it is given. To keep reconstruction self-contained — importing nothing + that the running spider has not already imported — we only accept a `_class` that is already + present in `sys.modules` and is a `scrapy.Request` subclass. + + A spider that reads its own requests always has its request classes imported by the time the + requests are reconstructed, so this does not restrict legitimate use. + """ + class_path = request_dict.get('_class') + if class_path is None: + return + + if not isinstance(class_path, str): + raise TypeError(f'Invalid scrapy_request `_class`, expected a string, got {type(class_path)}') + + module_name, _, class_name = class_path.rpartition('.') + module = sys.modules.get(module_name) if module_name else None + request_cls = getattr(module, class_name, None) if module is not None else None + + if not (isinstance(request_cls, type) and issubclass(request_cls, ScrapyRequest)): + raise TypeError( + f'Refusing to reconstruct a Scrapy request of type {class_path!r}: it is not an already-imported ' + f'scrapy.Request subclass.' + ) + + def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequest | None: """Convert a Scrapy request to an Apify request. @@ -35,6 +65,13 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ logger.debug(f'to_apify_request was called (scrapy_request={scrapy_request})...') # Configuration to behave as similarly as possible to Scrapy's default RFPDupeFilter. + # + # `payload` carries the request body, which is used both for platform processing and for + # computing the extended unique key. The body is also part of the serialized Scrapy request + # stored further below, where it is needed to faithfully reconstruct the request. Both copies + # originate from `scrapy_request.body` and are kept intentionally: dropping `payload` would + # change deduplication, and dropping the serialized copy would couple reconstruction to the + # Apify payload. request_kwargs: dict[str, Any] = { 'url': scrapy_request.url, 'method': scrapy_request.method, @@ -67,28 +104,48 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ request_kwargs['user_data'] = user_data if isinstance(user_data, dict) else {} - # Convert Scrapy's headers to a HttpHeaders and store them in the apify_request + # Convert Scrapy's headers to HttpHeaders and store them on the apify_request. This is only + # the Apify-platform-level view of the headers; the authoritative copy, with exact bytes, + # travels inside the serialized scrapy_request below. `to_unicode_dict()` decodes as UTF-8 + # and raises on non-UTF-8 header values, so it is guarded: a request with binary headers + # keeps them in the serialized payload instead of being dropped entirely. if isinstance(scrapy_request.headers, Headers): - request_kwargs['headers'] = HttpHeaders(dict(scrapy_request.headers.to_unicode_dict())) + try: + request_kwargs['headers'] = HttpHeaders(dict(scrapy_request.headers.to_unicode_dict())) + except UnicodeDecodeError: + logger.warning( + 'Could not represent Scrapy request headers as Apify request headers (non-UTF-8 values); ' + 'they are preserved in the serialized request instead.' + ) else: logger.warning( f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}' ) apify_request = ApifyRequest.from_url(**request_kwargs) - - # Serialize the Scrapy ScrapyRequest and store it in the apify_request. - # - This process involves converting the Scrapy ScrapyRequest object into a dictionary, encoding it to base64, - # and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request. - # - The serialization process can be referenced at: https://stackoverflow.com/questions/30469575/. scrapy_request_dict = scrapy_request.to_dict(spider=spider) - scrapy_request_dict_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() - apify_request.user_data['scrapy_request'] = scrapy_request_dict_encoded except Exception as exc: logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}') return None + # Serialize the Scrapy request and store it (base64-encoded JSON) under 'scrapy_request' in the + # Apify request's user data. This is intentionally outside the broad except above so that a + # non-JSON-serializable value in `meta`/`cb_kwargs` is reported loudly rather than hidden as a + # generic warning. The failure is logged with a full traceback and the request is skipped (None + # is returned, honoring this function's contract) instead of crashing the whole crawl. See + # `_serialization` for the encoding details. + try: + scrapy_request_json = encode_to_json(scrapy_request_dict) + except TypeError: + logger.exception( + f'Failed to serialize Scrapy request {scrapy_request} for storage on the Apify platform; skipping it. ' + 'Ensure all values in `meta` and `cb_kwargs` are JSON-serializable.' + ) + return None + + apify_request.user_data['scrapy_request'] = codecs.encode(scrapy_request_json.encode('utf-8'), 'base64').decode() + logger.debug(f'scrapy_request was converted to the apify_request={apify_request}') return apify_request @@ -101,14 +158,15 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ spider: The Scrapy spider that the request is associated with. Raises: - TypeError: If the Apify request is not an instance of the `ApifyRequest` class. - ValueError: If the Apify request does not contain the required keys. + TypeError: If `apify_request` is not an `ApifyRequest`, if the stored Scrapy request payload + is malformed, or if its `_class` does not refer to an already-imported `scrapy.Request` + subclass. Returns: The converted Scrapy request. """ if not isinstance(cast('Any', apify_request), ApifyRequest): - raise TypeError('apify_request must be a crawlee.ScrapyRequest instance') + raise TypeError('apify_request must be an apify.Request instance') logger.debug(f'to_scrapy_request was called (apify_request={apify_request})...') @@ -123,10 +181,14 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ if not isinstance(scrapy_request_dict_encoded, str): raise TypeError('scrapy_request_dict_encoded must be a string') - scrapy_request_dict = pickle.loads(codecs.decode(scrapy_request_dict_encoded.encode(), 'base64')) + scrapy_request_json = codecs.decode(scrapy_request_dict_encoded.encode(), 'base64').decode('utf-8') + scrapy_request_dict = decode_from_json(scrapy_request_json) if not isinstance(scrapy_request_dict, dict): raise TypeError('scrapy_request_dict must be a dictionary') + # Validate any `_class` entry before request_from_dict resolves and imports it. + _ensure_known_request_class(scrapy_request_dict) + scrapy_request = request_from_dict(scrapy_request_dict, spider=spider) if not isinstance(scrapy_request, ScrapyRequest): raise TypeError('scrapy_request must be an instance of the ScrapyRequest class') diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index dc3add93f..5aa721ddd 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -170,6 +170,14 @@ def next_request(self) -> Request | None: traceback.print_exc() raise - scrapy_request = to_scrapy_request(apify_request, spider=self.spider) + # Reconstruct the Scrapy request. A malformed queue entry (e.g. an unknown `_class` or a + # payload that no longer parses) must not crash the whole run: it has already been marked + # handled above, so log it and skip it by returning None instead of propagating. + try: + scrapy_request = to_scrapy_request(apify_request, spider=self.spider) + except Exception: + logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + return None + logger.debug(f'Converted to scrapy_request: {scrapy_request}') return scrapy_request diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 8e250aa05..4cb52fc2a 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -1,3 +1,7 @@ +import gzip +import io +import json +import pickle from time import time import pytest @@ -6,9 +10,10 @@ FIXTURE_DICT = {'name': 'Alice'} +# Gzip-compressed JSON (the pickle-free format) of FIXTURE_DICT with mtime=0. FIXTURE_BYTES = ( - b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s' - b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00' + b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xabV\xcaK\xccMU\xb2RPr\xcc\xc9LNU' + b'\xaa\x05\x00\x03\x9a\x9d\xb0\x11\x00\x00\x00' ) @@ -39,6 +44,32 @@ def test_read_gzip_time_non_zero() -> None: assert read_gzip_time(data_bytes) == current_time +def test_gzip_round_trips_binary_response() -> None: + """A cached response with a binary body and Scrapy-style bytes headers round-trips.""" + data = { + 'status': 200, + 'url': 'https://example.com', + 'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}, + 'body': b'\xff\xfe', + } + + assert from_gzip(to_gzip(data)) == data + + +def test_from_gzip_rejects_pickle_payload() -> None: + """Cache entries are stored as gzip-compressed JSON; a pickle payload is not valid JSON. + + The loader must reject such a payload rather than load it. + """ + with io.BytesIO() as byte_stream: + with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file: + pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4) + pickle_payload = byte_stream.getvalue() + + with pytest.raises((UnicodeDecodeError, json.JSONDecodeError, ValueError)): + from_gzip(pickle_payload) + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/requests/test_to_apify_request.py b/tests/unit/scrapy/requests/test_to_apify_request.py index d3aef6b2a..19013af48 100644 --- a/tests/unit/scrapy/requests/test_to_apify_request.py +++ b/tests/unit/scrapy/requests/test_to_apify_request.py @@ -1,5 +1,7 @@ from __future__ import annotations +import logging + import pytest from scrapy import Request, Spider from scrapy.http.headers import Headers @@ -92,6 +94,22 @@ def test_invalid_scrapy_request_returns_none(spider: Spider) -> None: assert apify_request is None +def test_non_json_serializable_meta_is_skipped(spider: Spider, caplog: pytest.LogCaptureFixture) -> None: + """A non-JSON-serializable value in meta is skipped loudly instead of crashing the crawl. + + The serializer requires `meta`/`cb_kwargs` to be JSON-serializable. When they are not, the request + is skipped (None is returned, honoring the function's contract) and the failure is logged with a + full traceback, rather than the request being silently lost or the whole run crashing. + """ + scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}}) + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.requests'): + apify_request = to_apify_request(scrapy_request, spider) + + assert apify_request is None + assert any('JSON-serializable' in record.getMessage() for record in caplog.records) + + def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None: """Reproduce: CrawleeRequestData() argument after ** must be a mapping, not CrawleeRequestData. diff --git a/tests/unit/scrapy/requests/test_to_scrapy_request.py b/tests/unit/scrapy/requests/test_to_scrapy_request.py index fadb25249..ffe1065d8 100644 --- a/tests/unit/scrapy/requests/test_to_scrapy_request.py +++ b/tests/unit/scrapy/requests/test_to_scrapy_request.py @@ -1,14 +1,17 @@ from __future__ import annotations import binascii +import codecs +import json +import pickle import pytest -from scrapy import Request, Spider +from scrapy import FormRequest, Request, Spider from crawlee._types import HttpHeaders from apify import Request as ApifyRequest -from apify.scrapy.requests import to_scrapy_request +from apify.scrapy.requests import to_apify_request, to_scrapy_request class DummySpider(Spider): @@ -21,6 +24,21 @@ def spider() -> DummySpider: return DummySpider() +# Base64-encoded JSON fixture (the pickle-free format) for a GET request to https://apify.com. +_SCRAPY_REQUEST_JSON_ENCODED = ( + 'eyJ1cmwiOiAiaHR0cHM6Ly9hcGlmeS5jb20iLCAiY2FsbGJhY2siOiBudWxsLCAiZXJyYmFjayI6\n' + 'IG51bGwsICJoZWFkZXJzIjogeyJBY2NlcHQiOiBbImRHVjRkQzlvZEcxc0xHRndjR3hwWTJGMGFX\n' + 'OXVMM2hvZEcxc0szaHRiQ3hoY0hCc2FXTmhkR2x2Ymk5NGJXdzdjVDB3TGprc0tpOHFPM0U5TUM0\n' + 'NCJdLCAiQWNjZXB0LUxhbmd1YWdlIjogWyJaVzQ9Il0sICJVc2VyLUFnZW50IjogWyJVMk55WVhC\n' + 'NUx6SXVNVEV1TUNBb0syaDBkSEJ6T2k4dmMyTnlZWEI1TG05eVp5az0iXSwgIkFjY2VwdC1FbmNv\n' + 'ZGluZyI6IFsiWjNwcGNDd2daR1ZtYkdGMFpRPT0iXX0sICJib2R5IjogIiIsICJjb29raWVzIjog' + 'e30sICJtZXRhIjogeyJhcGlmeV9yZXF1ZXN0X2lkIjogImZ2d3NjTzJVSkxkcjEwQiIsICJhcGlm' + 'eV9yZXF1ZXN0X3VuaXF1ZV9rZXkiOiAiaHR0cHM6Ly9hcGlmeS5jb20ifSwgImVuY29kaW5nIjog' + 'InV0Zi04IiwgImZsYWdzIjogW10sICJjYl9rd2FyZ3MiOiB7fSwgImRvbnRfZmlsdGVyIjogZmFs' + 'c2UsICJtZXRob2QiOiAiR0VUIiwgInByaW9yaXR5IjogMH0=\n' +) + + def test_without_reconstruction(spider: Spider) -> None: # Without reconstruction of encoded Scrapy request apify_request = ApifyRequest( @@ -62,13 +80,13 @@ def test_without_reconstruction_with_optional_fields(spider: Spider) -> None: def test_with_reconstruction(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + # With reconstruction of JSON-encoded Scrapy request apify_request = ApifyRequest( url='https://apify.com', method='GET', unique_key='https://apify.com', user_data={ - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -82,7 +100,7 @@ def test_with_reconstruction(spider: Spider) -> None: def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + # With reconstruction of JSON-encoded Scrapy request apify_request = ApifyRequest( url='https://apify.com', method='GET', @@ -90,7 +108,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: headers=HttpHeaders({'Authorization': 'Bearer access_token'}), user_data={ 'some_user_data': 'hello', - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -119,3 +137,216 @@ def test_invalid_request_for_reconstruction(spider: Spider) -> None: with pytest.raises(binascii.Error): to_scrapy_request(apify_request, spider) + + +def test_pickle_payload_rejected(spider: Spider) -> None: + """Data stored under 'scrapy_request' is JSON; a pickle-encoded payload is not valid JSON. + + The reconstruction path must reject such a payload rather than deserialize it. + """ + # Build a pickle payload like the old code produced. + scrapy_request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'method': 'GET', + 'body': b'', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + } + pickle_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() + + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': pickle_encoded}, + ) + + # The JSON-based reconstruction must reject the pickle payload. + with pytest.raises((json.JSONDecodeError, UnicodeDecodeError, ValueError)): + to_scrapy_request(apify_request, spider) + + +def test_roundtrip_serialization(spider: Spider) -> None: + """Verify that to_apify_request -> to_scrapy_request roundtrip works with JSON encoding.""" + original_request = Request( + url='https://example.com/test', + method='POST', + body=b'test body content', + headers={'Content-Type': 'application/json', 'X-Custom': 'value'}, + meta={'userData': {'custom_key': 'custom_value'}}, + ) + + apify_request = to_apify_request(original_request, spider) + assert apify_request is not None + + # Verify the encoded data is valid JSON (not pickle) + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + decoded_bytes = codecs.decode(encoded.encode(), 'base64') + decoded_json = json.loads(decoded_bytes.decode('utf-8')) + assert isinstance(decoded_json, dict) + assert decoded_json['url'] == 'https://example.com/test' + + # Reconstruct the Scrapy request + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, Request) + assert restored.url == original_request.url + assert restored.method == original_request.method + assert restored.body == original_request.body + + +def test_no_pickle_in_serialized_output(spider: Spider) -> None: + """Confirm that to_apify_request never produces pickle-serialized output.""" + scrapy_request = Request(url='https://example.com') + apify_request = to_apify_request(scrapy_request, spider) + assert apify_request is not None + + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + raw_bytes = codecs.decode(encoded.encode(), 'base64') + + # Pickle protocol 4 starts with b'\x80\x04'; JSON starts with b'{' + assert not raw_bytes.startswith(b'\x80'), 'Output must not be pickle-serialized' + # Verify it's valid JSON + json.loads(raw_bytes.decode('utf-8')) + + +def _encode_request_dict(request_dict: dict) -> str: + """Encode a raw request dict the same way `to_apify_request` does (base64-encoded JSON).""" + return codecs.encode(json.dumps(request_dict).encode('utf-8'), 'base64').decode() + + +def test_binary_body_round_trips(spider: Spider) -> None: + """Non-UTF-8 bytes in the request body survive the JSON roundtrip (base64-encoded).""" + original = Request( + url='https://example.com', + method='POST', + body=b'\x00\x01\x02\xff\xfe binary', + headers={'Content-Type': 'application/octet-stream'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.body == b'\x00\x01\x02\xff\xfe binary' + assert restored.headers.get('Content-Type') == b'application/octet-stream' + + +def test_binary_headers_round_trip_and_request_not_dropped(spider: Spider) -> None: + """A request with non-UTF-8 header values is not dropped; its headers survive the roundtrip. + + The Apify-request-level headers can only hold UTF-8-decodable values, so binary header values are + preserved inside the serialized Scrapy request instead. The conversion must still succeed (return + a request, not None) and the exact header bytes must come back. + """ + original = Request( + url='https://example.com', + headers={b'Accept': b'text/html', b'X-Bin': b'\xff\xfe\x00'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None # must not be dropped + + restored = to_scrapy_request(apify_request, spider) + assert restored.headers.get(b'X-Bin') == b'\xff\xfe\x00' + assert restored.headers.get(b'Accept') == b'text/html' + + +def test_userdata_with_b64_sentinel_key_round_trips(spider: Spider) -> None: + """A user dict that happens to look like a bytes wrapper must round-trip unchanged. + + The encoder uses no in-band sentinel for user data, so an arbitrary value such as + ``{"__b64__": "..."}`` in `meta` is preserved exactly instead of being reinterpreted. + """ + original = Request( + url='https://example.com', + meta={'userData': {}, 'looks_like_sentinel': {'__b64__': 'not really base64 !!!'}}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.meta['looks_like_sentinel'] == {'__b64__': 'not really base64 !!!'} + + +def test_already_imported_request_subclass_round_trips(spider: Spider) -> None: + """A `_class` referring to an already-imported `scrapy.Request` subclass is reconstructed.""" + original = FormRequest(url='https://example.com', formdata={'key': 'value'}) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, FormRequest) + assert restored.method == 'POST' + + +def test_non_request_class_is_rejected(spider: Spider) -> None: + """A `_class` that resolves to something other than a `scrapy.Request` subclass is rejected. + + `scrapy.utils.request.request_from_dict` resolves `_class` via `load_object`, which imports the + dotted path it is given. Reconstruction only accepts an already-imported `scrapy.Request` + subclass; anything else (here a plain `dict`) is rejected. + """ + request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'builtins.dict', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='not an already-imported'): + to_scrapy_request(apify_request, spider) + + +def test_class_referring_to_unimported_module_is_rejected(spider: Spider) -> None: + """A `_class` whose module is not already imported is rejected without importing it.""" + request_dict = { + 'url': 'https://example.com', + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'definitely_not_imported_pkg.some_module.SomeRequest', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='not an already-imported'): + to_scrapy_request(apify_request, spider)