diff --git a/src/apify/scrapy/_async_thread.py b/src/apify/scrapy/_async_thread.py index 79de1162d..16665b6f2 100644 --- a/src/apify/scrapy/_async_thread.py +++ b/src/apify/scrapy/_async_thread.py @@ -14,13 +14,18 @@ class AsyncThread: - """Class for running an asyncio event loop in a separate thread. - - This allows running asynchronous coroutines from synchronous code by executingthem on an event loop - that runs in its own dedicated thread. + """Run an asyncio event loop in a dedicated background thread. + + This lets synchronous Scrapy callbacks drive asynchronous Apify and Crawlee coroutines. Each + consumer (the scheduler and the HTTP cache storage) owns its own `AsyncThread`, so the request + queue and the key-value store each live entirely on a single, separate event loop and are never + shared across loops. They do read the same global `Configuration`, which is read-only here, so + the isolation holds. A single shared loop would also work but would couple the otherwise + independent lifecycles of those Scrapy components. """ - def __init__(self) -> None: + def __init__(self, default_timeout: timedelta = timedelta(seconds=60)) -> None: + self._default_timeout = default_timeout self._eventloop = asyncio.new_event_loop() # Start the event loop in a dedicated daemon thread. @@ -33,7 +38,7 @@ def __init__(self) -> None: def run_coro( self, coro: Coroutine, - timeout: timedelta = timedelta(seconds=60), + timeout: timedelta | None = None, ) -> Any: """Run a coroutine on an event loop running in a separate thread. @@ -42,7 +47,8 @@ def run_coro( Args: coro: The coroutine to run. - timeout: The maximum number of seconds to wait for the coroutine to finish. + timeout: The maximum time to wait for the coroutine to finish. Defaults to the + `default_timeout` passed to the constructor. Returns: The result returned by the coroutine. @@ -52,6 +58,9 @@ def run_coro( TimeoutError: If the coroutine does not complete within the timeout. Exception: Any exception raised during coroutine execution. """ + if timeout is None: + timeout = self._default_timeout + if not self._eventloop.is_running(): raise RuntimeError(f'The coroutine {coro} cannot be executed because the event loop is not running.') diff --git a/src/apify/scrapy/_logging_config.py b/src/apify/scrapy/_logging_config.py index fca3d4f7b..bcdb0e7ef 100644 --- a/src/apify/scrapy/_logging_config.py +++ b/src/apify/scrapy/_logging_config.py @@ -13,6 +13,12 @@ _SUPPLEMENTAL_LOGGERS = ['filelock', 'hpack', 'httpcore', 'protego', 'twisted'] _ALL_LOGGERS = _PRIMARY_LOGGERS + _SUPPLEMENTAL_LOGGERS +# Mutable module state shared with the Scrapy logging monkey-patch installed by `initialize_logging`. +# `initialize_logging` refreshes `level`/`handler` on each call, and the patch (installed at most +# once) reads them so it always re-applies the latest configuration instead of values captured the +# first time it ran. Stored in a dict so the patch can read them without rebinding module globals. +_state: dict[str, Any] = {'level': 'INFO', 'handler': None, 'patched': False} + def _configure_logger(name: str | None, logging_level: str, handler: logging.Handler) -> None: """Clear and reconfigure the logger.""" @@ -23,26 +29,40 @@ def _configure_logger(name: str | None, logging_level: str, handler: logging.Han logger.propagate = False +def _configure_all_loggers() -> None: + """Apply the Apify handler and level to the root logger and all defined loggers.""" + handler = _state['handler'] + if handler is None: + return + for logger_name in [None, *_ALL_LOGGERS]: + _configure_logger(logger_name, _state['level'], handler) + + def initialize_logging() -> None: """Configure logging for Apify Actors and adjust Scrapy's logging settings.""" # Retrieve Scrapy project settings and determine the logging level. settings = get_project_settings() - logging_level = settings.get('LOG_LEVEL', 'INFO') # Default to INFO. + _state['level'] = settings.get('LOG_LEVEL', 'INFO') # Default to INFO. # Create a custom handler with the Apify log formatter. handler = logging.StreamHandler() handler.setFormatter(ActorLogFormatter(include_logger_name=True)) + _state['handler'] = handler # Configure the root logger and all other defined loggers. - for logger_name in [None, *_ALL_LOGGERS]: - _configure_logger(logger_name, logging_level, handler) + _configure_all_loggers() + + # Monkey-patch Scrapy's logging configuration to re-apply our settings whenever Scrapy + # reconfigures logging. Install the wrapper at most once; wrapping again on every call would + # nest wrappers on top of each other. + if _state['patched']: + return - # Monkey-patch Scrapy's logging configuration to re-apply our settings. original_configure_logging = scrapy_logging.configure_logging def new_configure_logging(*args: Any, **kwargs: Any) -> None: original_configure_logging(*args, **kwargs) - for logger_name in [None, *_ALL_LOGGERS]: - _configure_logger(logger_name, logging_level, handler) + _configure_all_loggers() scrapy_logging.configure_logging = new_configure_logging # ty: ignore[invalid-assignment] + _state['patched'] = True diff --git a/src/apify/scrapy/_serialization.py b/src/apify/scrapy/_serialization.py new file mode 100644 index 000000000..e9c49be78 --- /dev/null +++ b/src/apify/scrapy/_serialization.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import base64 +import json +from typing import Any + +from pydantic import BaseModel + +# Scrapy persists requests and cached responses by serializing the dict produced by +# `Request.to_dict()` (and a small response payload for the HTTP cache). The Apify integration +# stores that payload inside the request queue and the key-value store and reads it back later. +# Those storages hold JSON, so the payload is serialized as JSON here rather than as a pickled +# Python object graph: JSON is a plain, portable, interoperable data format and carries no +# executable object state. +# +# JSON cannot represent everything Scrapy emits. Of the values that actually appear, only two are +# not natively JSON-serializable, and both sit at fixed, known keys with a known type: +# - `body`: `bytes` +# - `headers`: a `{bytes: [bytes]}` mapping (header name -> list of values) +# These are base64-encoded in place. Pydantic models (e.g. Crawlee's `UserData`, which the Apify +# integration injects into `meta['userData']`) are converted via `model_dump()`. Everything else — +# notably the user-controlled `meta` and `cb_kwargs` — must already be JSON-serializable; if it is +# not, serialization fails with a clear error rather than silently dropping the request. No in-band +# sentinel is used for user data, so no legitimate value can collide with the encoding scheme. + + +def encode_to_json(data: dict[str, Any]) -> str: + """Serialize a Scrapy request/response dict to a JSON string. + + The binary `body` and `headers` fields are base64-encoded in place. All other fields must be + JSON-serializable; pydantic models are dumped to plain dicts. A clear `TypeError` is raised if + any remaining value (typically something in `meta` or `cb_kwargs`) cannot be JSON-encoded. + + Args: + data: The dict to serialize, e.g. the output of `scrapy.Request.to_dict()`. + + Returns: + The JSON-encoded string. + """ + if not isinstance(data, dict): + raise TypeError(f'Expected a dict to serialize, got {type(data)}') + + safe = dict(data) + + if isinstance(safe.get('body'), bytes): + safe['body'] = base64.b64encode(safe['body']).decode('ascii') + + if isinstance(safe.get('headers'), dict): + safe['headers'] = _encode_headers(safe['headers']) + + try: + return json.dumps(safe, default=_json_default) + except TypeError as exc: + raise TypeError( + 'Failed to JSON-serialize a Scrapy request/response for storage on the Apify platform. ' + 'All values in `meta` and `cb_kwargs` must be JSON-serializable (str, int, float, bool, None, ' + 'list, dict, or a pydantic model).' + ) from exc + + +def decode_from_json(text: str) -> Any: + """Reconstruct a Scrapy request/response dict from a string produced by `encode_to_json`. + + The base64-encoded `body` and `headers` fields are decoded back to their `bytes` representation. + + Args: + text: The JSON-encoded string. + + Returns: + The decoded object (a dict for valid request/response payloads). + """ + data = json.loads(text) + if not isinstance(data, dict): + return data + + if isinstance(data.get('body'), str): + data['body'] = base64.b64decode(data['body']) + + if isinstance(data.get('headers'), dict): + data['headers'] = _decode_headers(data['headers']) + + return data + + +def _json_default(obj: Any) -> Any: + """Fallback for values `json.dumps` cannot serialize on its own. + + Only pydantic models are accepted (and dumped to plain dicts); anything else raises, which + `encode_to_json` turns into an actionable error. + """ + if isinstance(obj, BaseModel): + return obj.model_dump(by_alias=True) + raise TypeError(f'Object of type {type(obj).__name__} is not JSON-serializable') + + +def _encode_headers(headers: dict[Any, Any]) -> dict[str, list[str]]: + """Encode a Scrapy `{bytes: [bytes]}` headers mapping to a JSON-safe `{str: [base64-str]}`.""" + encoded: dict[str, list[str]] = {} + for key, value in headers.items(): + str_key = key.decode('latin-1') if isinstance(key, bytes) else key + values = value if isinstance(value, (list, tuple)) else [value] + encoded[str_key] = [_b64encode_value(item) for item in values] + return encoded + + +def _decode_headers(headers: dict[str, Any]) -> dict[bytes, list[bytes]]: + """Reverse `_encode_headers`, restoring the `{bytes: [bytes]}` mapping Scrapy expects.""" + decoded: dict[bytes, list[bytes]] = {} + for key, value in headers.items(): + bytes_key = key.encode('latin-1') if isinstance(key, str) else key + values = value if isinstance(value, list) else [value] + decoded[bytes_key] = [base64.b64decode(item) for item in values] + return decoded + + +def _b64encode_value(value: Any) -> str: + """Base64-encode a single header value, coercing non-bytes values to bytes first.""" + raw = value if isinstance(value, bytes) else str(value).encode('utf-8') + return base64.b64encode(raw).decode('ascii') diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 14d8753d3..062d315b1 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -2,7 +2,6 @@ import gzip import io -import pickle import re import struct from logging import getLogger @@ -14,6 +13,7 @@ from apify import Configuration from apify.scrapy._async_thread import AsyncThread +from apify.scrapy._serialization import decode_from_json, encode_to_json from apify.storage_clients import ApifyStorageClient from apify.storages import KeyValueStore @@ -36,7 +36,10 @@ class ApifyCacheStorage: """ def __init__(self, settings: BaseSettings) -> None: - self._expiration_max_items = 100 + # Upper bound on how many keys the per-spider-close cleanup sweeps. Configurable because the + # cleanup is bounded and best-effort (see `close_spider`); the default keeps the historical + # behavior. + self._expiration_max_items: int = settings.getint('APIFY_HTTPCACHE_EXPIRATION_MAX_ITEMS', 100) self._expiration_secs: int = settings.getint('HTTPCACHE_EXPIRATION_SECS') self._spider: Spider | None = None self._kvs: KeyValueStore | None = None @@ -79,8 +82,15 @@ def close_spider(self, _: Spider, current_time: int | None = None) -> None: async def expire_kvs() -> None: if self._kvs is None: raise ValueError('Key value store not initialized') - i = 0 + # Best-effort, bounded cleanup: at most `_expiration_max_items` keys are swept per + # spider close and `iterate_keys()` order is not guaranteed, so stale entries may + # linger across runs. This only reclaims storage; correctness is handled at read + # time, where `retrieve_response` treats an expired entry as a cache miss. + processed = 0 async for item in self._kvs.iterate_keys(): + if processed >= self._expiration_max_items: + break + processed += 1 value = await self._kvs.get_value(item.key) try: gzip_time = read_gzip_time(value) @@ -93,9 +103,6 @@ async def expire_kvs() -> None: await self._kvs.set_value(item.key, None) else: logger.debug(f'Valid cache item {item.key}') - if i == self._expiration_max_items: - break - i += 1 self._async_thread.run_coro(expire_kvs()) @@ -127,11 +134,18 @@ def retrieve_response(self, _: Spider, request: Request, current_time: int | Non if current_time is None: current_time = int(time()) - if 0 < self._expiration_secs < current_time - read_gzip_time(value): - logger.debug('Cache expired', extra={'request': request}) + + # A malformed or legacy (e.g. pickle-format) cache entry must not crash retrieval. Treat it + # as a cache miss so Scrapy re-fetches and re-stores it in the current format. + try: + if 0 < self._expiration_secs < current_time - read_gzip_time(value): + logger.debug('Cache expired', extra={'request': request}) + return None + data = from_gzip(value) + except Exception as exc: + logger.warning(f'Ignoring malformed cache entry {key!r}: {exc}', extra={'request': request}) return None - data = from_gzip(value) url = data['url'] status = data['status'] headers = Headers(data['headers']) @@ -162,18 +176,26 @@ def store_response(self, _: Spider, request: Request, response: Response) -> Non def to_gzip(data: dict, mtime: int | None = None) -> bytes: - """Dump a dictionary to a gzip-compressed byte stream.""" + """Dump a dictionary to a gzip-compressed byte stream as JSON. + + Cached entries are stored in and read back from the Apify key-value store, which holds JSON, so + the payload is serialized as JSON rather than pickled. See `apify.scrapy._serialization` for the + encoding details. + """ + payload = encode_to_json(data).encode('utf-8') with io.BytesIO() as byte_stream: with gzip.GzipFile(fileobj=byte_stream, mode='wb', mtime=mtime) as gzip_file: - pickle.dump(data, gzip_file, protocol=4) + gzip_file.write(payload) return byte_stream.getvalue() def from_gzip(gzip_bytes: bytes) -> dict: - """Load a dictionary from a gzip-compressed byte stream.""" + """Load a dictionary from a gzip-compressed JSON byte stream.""" with io.BytesIO(gzip_bytes) as byte_stream, gzip.GzipFile(fileobj=byte_stream, mode='rb') as gzip_file: - data: dict = pickle.load(gzip_file) - return data + data = decode_from_json(gzip_file.read().decode('utf-8')) + if not isinstance(data, dict): + raise TypeError(f'Expected a dict from the cached payload, got {type(data)}') + return data def read_gzip_time(gzip_bytes: bytes) -> int: diff --git a/src/apify/scrapy/middlewares/apify_proxy.py b/src/apify/scrapy/middlewares/apify_proxy.py index acaf7d86e..2f2f22cc0 100644 --- a/src/apify/scrapy/middlewares/apify_proxy.py +++ b/src/apify/scrapy/middlewares/apify_proxy.py @@ -7,7 +7,7 @@ from scrapy.exceptions import NotConfigured from apify import Actor, ProxyConfiguration -from apify.scrapy import get_basic_auth_header +from apify.scrapy.utils import get_basic_auth_header if TYPE_CHECKING: from scrapy import Request, Spider @@ -30,7 +30,6 @@ def __init__(self, proxy_settings: dict) -> None: Args: proxy_settings: Dictionary containing proxy settings, provided by the Actor input. - auth_encoding: Encoding for basic authentication (default is 'latin-1'). """ self._proxy_settings = proxy_settings self._proxy_cfg_internal: ProxyConfiguration | None = None @@ -111,7 +110,7 @@ def process_exception( if isinstance(exception, TunnelError): Actor.log.warning( f'ApifyHttpProxyMiddleware: TunnelError occurred for request="{request}", ' - 'reason="{exception}", skipping...' + f'reason="{exception}", skipping...' ) async def _get_new_proxy_url(self) -> ParseResult: diff --git a/src/apify/scrapy/requests.py b/src/apify/scrapy/requests.py index dd40482d4..2724107a4 100644 --- a/src/apify/scrapy/requests.py +++ b/src/apify/scrapy/requests.py @@ -1,7 +1,7 @@ from __future__ import annotations import codecs -import pickle +import sys from logging import getLogger from typing import Any, cast @@ -13,11 +13,41 @@ from crawlee._request import UserData from crawlee._types import HttpHeaders +from ._serialization import decode_from_json, encode_to_json from apify import Request as ApifyRequest logger = getLogger(__name__) +def _ensure_known_request_class(request_dict: dict[str, Any]) -> None: + """Validate the optional `_class` entry before `request_from_dict` resolves it. + + `scrapy.utils.request.request_from_dict` resolves a `_class` entry via `load_object`, which + imports the dotted path it is given. To keep reconstruction self-contained — importing nothing + that the running spider has not already imported — we only accept a `_class` that is already + present in `sys.modules` and is a `scrapy.Request` subclass. + + A spider that reads its own requests always has its request classes imported by the time the + requests are reconstructed, so this does not restrict legitimate use. + """ + class_path = request_dict.get('_class') + if class_path is None: + return + + if not isinstance(class_path, str): + raise TypeError(f'Invalid scrapy_request `_class`, expected a string, got {type(class_path)}') + + module_name, _, class_name = class_path.rpartition('.') + module = sys.modules.get(module_name) if module_name else None + request_cls = getattr(module, class_name, None) if module is not None else None + + if not (isinstance(request_cls, type) and issubclass(request_cls, ScrapyRequest)): + raise TypeError( + f'Refusing to reconstruct a Scrapy request of type {class_path!r}: it is not an already-imported ' + f'scrapy.Request subclass.' + ) + + def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequest | None: """Convert a Scrapy request to an Apify request. @@ -35,6 +65,13 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ logger.debug(f'to_apify_request was called (scrapy_request={scrapy_request})...') # Configuration to behave as similarly as possible to Scrapy's default RFPDupeFilter. + # + # `payload` carries the request body, which is used both for platform processing and for + # computing the extended unique key. The body is also part of the serialized Scrapy request + # stored further below, where it is needed to faithfully reconstruct the request. Both copies + # originate from `scrapy_request.body` and are kept intentionally: dropping `payload` would + # change deduplication, and dropping the serialized copy would couple reconstruction to the + # Apify payload. request_kwargs: dict[str, Any] = { 'url': scrapy_request.url, 'method': scrapy_request.method, @@ -67,28 +104,48 @@ def to_apify_request(scrapy_request: ScrapyRequest, spider: Spider) -> ApifyRequ request_kwargs['user_data'] = user_data if isinstance(user_data, dict) else {} - # Convert Scrapy's headers to a HttpHeaders and store them in the apify_request + # Convert Scrapy's headers to HttpHeaders and store them on the apify_request. This is only + # the Apify-platform-level view of the headers; the authoritative copy, with exact bytes, + # travels inside the serialized scrapy_request below. `to_unicode_dict()` decodes as UTF-8 + # and raises on non-UTF-8 header values, so it is guarded: a request with binary headers + # keeps them in the serialized payload instead of being dropped entirely. if isinstance(scrapy_request.headers, Headers): - request_kwargs['headers'] = HttpHeaders(dict(scrapy_request.headers.to_unicode_dict())) + try: + request_kwargs['headers'] = HttpHeaders(dict(scrapy_request.headers.to_unicode_dict())) + except UnicodeDecodeError: + logger.warning( + 'Could not represent Scrapy request headers as Apify request headers (non-UTF-8 values); ' + 'they are preserved in the serialized request instead.' + ) else: logger.warning( f'Invalid scrapy_request.headers type, not scrapy.http.headers.Headers: {scrapy_request.headers}' ) apify_request = ApifyRequest.from_url(**request_kwargs) - - # Serialize the Scrapy ScrapyRequest and store it in the apify_request. - # - This process involves converting the Scrapy ScrapyRequest object into a dictionary, encoding it to base64, - # and storing it as 'scrapy_request' within the 'userData' dictionary of the apify_request. - # - The serialization process can be referenced at: https://stackoverflow.com/questions/30469575/. scrapy_request_dict = scrapy_request.to_dict(spider=spider) - scrapy_request_dict_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() - apify_request.user_data['scrapy_request'] = scrapy_request_dict_encoded except Exception as exc: logger.warning(f'Conversion of Scrapy request {scrapy_request} to Apify request failed; {exc}') return None + # Serialize the Scrapy request and store it (base64-encoded JSON) under 'scrapy_request' in the + # Apify request's user data. This is intentionally outside the broad except above so that a + # non-JSON-serializable value in `meta`/`cb_kwargs` is reported loudly rather than hidden as a + # generic warning. The failure is logged with a full traceback and the request is skipped (None + # is returned, honoring this function's contract) instead of crashing the whole crawl. See + # `_serialization` for the encoding details. + try: + scrapy_request_json = encode_to_json(scrapy_request_dict) + except TypeError: + logger.exception( + f'Failed to serialize Scrapy request {scrapy_request} for storage on the Apify platform; skipping it. ' + 'Ensure all values in `meta` and `cb_kwargs` are JSON-serializable.' + ) + return None + + apify_request.user_data['scrapy_request'] = codecs.encode(scrapy_request_json.encode('utf-8'), 'base64').decode() + logger.debug(f'scrapy_request was converted to the apify_request={apify_request}') return apify_request @@ -101,14 +158,15 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ spider: The Scrapy spider that the request is associated with. Raises: - TypeError: If the Apify request is not an instance of the `ApifyRequest` class. - ValueError: If the Apify request does not contain the required keys. + TypeError: If `apify_request` is not an `ApifyRequest`, if the stored Scrapy request payload + is malformed, or if its `_class` does not refer to an already-imported `scrapy.Request` + subclass. Returns: The converted Scrapy request. """ if not isinstance(cast('Any', apify_request), ApifyRequest): - raise TypeError('apify_request must be a crawlee.ScrapyRequest instance') + raise TypeError('apify_request must be an apify.Request instance') logger.debug(f'to_scrapy_request was called (apify_request={apify_request})...') @@ -123,10 +181,14 @@ def to_scrapy_request(apify_request: ApifyRequest, spider: Spider) -> ScrapyRequ if not isinstance(scrapy_request_dict_encoded, str): raise TypeError('scrapy_request_dict_encoded must be a string') - scrapy_request_dict = pickle.loads(codecs.decode(scrapy_request_dict_encoded.encode(), 'base64')) + scrapy_request_json = codecs.decode(scrapy_request_dict_encoded.encode(), 'base64').decode('utf-8') + scrapy_request_dict = decode_from_json(scrapy_request_json) if not isinstance(scrapy_request_dict, dict): raise TypeError('scrapy_request_dict must be a dictionary') + # Validate any `_class` entry before request_from_dict resolves and imports it. + _ensure_known_request_class(scrapy_request_dict) + scrapy_request = request_from_dict(scrapy_request_dict, spider=spider) if not isinstance(scrapy_request, ScrapyRequest): raise TypeError('scrapy_request must be an instance of the ScrapyRequest class') diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index dc3add93f..5aa721ddd 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -170,6 +170,14 @@ def next_request(self) -> Request | None: traceback.print_exc() raise - scrapy_request = to_scrapy_request(apify_request, spider=self.spider) + # Reconstruct the Scrapy request. A malformed queue entry (e.g. an unknown `_class` or a + # payload that no longer parses) must not crash the whole run: it has already been marked + # handled above, so log it and skip it by returning None instead of propagating. + try: + scrapy_request = to_scrapy_request(apify_request, spider=self.spider) + except Exception: + logger.exception(f'Failed to convert Apify request {apify_request} to a Scrapy request; skipping it.') + return None + logger.debug(f'Converted to scrapy_request: {scrapy_request}') return scrapy_request diff --git a/tests/unit/scrapy/extensions/test_httpcache.py b/tests/unit/scrapy/extensions/test_httpcache.py index 8e250aa05..4cb52fc2a 100644 --- a/tests/unit/scrapy/extensions/test_httpcache.py +++ b/tests/unit/scrapy/extensions/test_httpcache.py @@ -1,3 +1,7 @@ +import gzip +import io +import json +import pickle from time import time import pytest @@ -6,9 +10,10 @@ FIXTURE_DICT = {'name': 'Alice'} +# Gzip-compressed JSON (the pickle-free format) of FIXTURE_DICT with mtime=0. FIXTURE_BYTES = ( - b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xffk`\x99*\xcc\x00\x01\xb5SzX\xf2\x12s' - b'S\xa7\xf4\xb0:\xe6d&\xa7N)\xd6\x03\x00\x1c\xe8U\x9c\x1e\x00\x00\x00' + b'\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff\xabV\xcaK\xccMU\xb2RPr\xcc\xc9LNU' + b'\xaa\x05\x00\x03\x9a\x9d\xb0\x11\x00\x00\x00' ) @@ -39,6 +44,32 @@ def test_read_gzip_time_non_zero() -> None: assert read_gzip_time(data_bytes) == current_time +def test_gzip_round_trips_binary_response() -> None: + """A cached response with a binary body and Scrapy-style bytes headers round-trips.""" + data = { + 'status': 200, + 'url': 'https://example.com', + 'headers': {b'Content-Type': [b'text/html'], b'X-Bin': [b'\x00\xff']}, + 'body': b'\xff\xfe', + } + + assert from_gzip(to_gzip(data)) == data + + +def test_from_gzip_rejects_pickle_payload() -> None: + """Cache entries are stored as gzip-compressed JSON; a pickle payload is not valid JSON. + + The loader must reject such a payload rather than load it. + """ + with io.BytesIO() as byte_stream: + with gzip.GzipFile(fileobj=byte_stream, mode='wb') as gzip_file: + pickle.dump({'status': 200, 'body': b'x'}, gzip_file, protocol=4) + pickle_payload = byte_stream.getvalue() + + with pytest.raises((UnicodeDecodeError, json.JSONDecodeError, ValueError)): + from_gzip(pickle_payload) + + @pytest.mark.parametrize( ('spider_name', 'expected'), [ diff --git a/tests/unit/scrapy/requests/test_to_apify_request.py b/tests/unit/scrapy/requests/test_to_apify_request.py index d3aef6b2a..19013af48 100644 --- a/tests/unit/scrapy/requests/test_to_apify_request.py +++ b/tests/unit/scrapy/requests/test_to_apify_request.py @@ -1,5 +1,7 @@ from __future__ import annotations +import logging + import pytest from scrapy import Request, Spider from scrapy.http.headers import Headers @@ -92,6 +94,22 @@ def test_invalid_scrapy_request_returns_none(spider: Spider) -> None: assert apify_request is None +def test_non_json_serializable_meta_is_skipped(spider: Spider, caplog: pytest.LogCaptureFixture) -> None: + """A non-JSON-serializable value in meta is skipped loudly instead of crashing the crawl. + + The serializer requires `meta`/`cb_kwargs` to be JSON-serializable. When they are not, the request + is skipped (None is returned, honoring the function's contract) and the failure is logged with a + full traceback, rather than the request being silently lost or the whole run crashing. + """ + scrapy_request = Request(url='https://example.com', meta={'tags': {'a', 'b'}}) + + with caplog.at_level(logging.ERROR, logger='apify.scrapy.requests'): + apify_request = to_apify_request(scrapy_request, spider) + + assert apify_request is None + assert any('JSON-serializable' in record.getMessage() for record in caplog.records) + + def test_roundtrip_follow_up_request_with_propagated_userdata(spider: Spider) -> None: """Reproduce: CrawleeRequestData() argument after ** must be a mapping, not CrawleeRequestData. diff --git a/tests/unit/scrapy/requests/test_to_scrapy_request.py b/tests/unit/scrapy/requests/test_to_scrapy_request.py index fadb25249..ffe1065d8 100644 --- a/tests/unit/scrapy/requests/test_to_scrapy_request.py +++ b/tests/unit/scrapy/requests/test_to_scrapy_request.py @@ -1,14 +1,17 @@ from __future__ import annotations import binascii +import codecs +import json +import pickle import pytest -from scrapy import Request, Spider +from scrapy import FormRequest, Request, Spider from crawlee._types import HttpHeaders from apify import Request as ApifyRequest -from apify.scrapy.requests import to_scrapy_request +from apify.scrapy.requests import to_apify_request, to_scrapy_request class DummySpider(Spider): @@ -21,6 +24,21 @@ def spider() -> DummySpider: return DummySpider() +# Base64-encoded JSON fixture (the pickle-free format) for a GET request to https://apify.com. +_SCRAPY_REQUEST_JSON_ENCODED = ( + 'eyJ1cmwiOiAiaHR0cHM6Ly9hcGlmeS5jb20iLCAiY2FsbGJhY2siOiBudWxsLCAiZXJyYmFjayI6\n' + 'IG51bGwsICJoZWFkZXJzIjogeyJBY2NlcHQiOiBbImRHVjRkQzlvZEcxc0xHRndjR3hwWTJGMGFX\n' + 'OXVMM2hvZEcxc0szaHRiQ3hoY0hCc2FXTmhkR2x2Ymk5NGJXdzdjVDB3TGprc0tpOHFPM0U5TUM0\n' + 'NCJdLCAiQWNjZXB0LUxhbmd1YWdlIjogWyJaVzQ9Il0sICJVc2VyLUFnZW50IjogWyJVMk55WVhC\n' + 'NUx6SXVNVEV1TUNBb0syaDBkSEJ6T2k4dmMyTnlZWEI1TG05eVp5az0iXSwgIkFjY2VwdC1FbmNv\n' + 'ZGluZyI6IFsiWjNwcGNDd2daR1ZtYkdGMFpRPT0iXX0sICJib2R5IjogIiIsICJjb29raWVzIjog' + 'e30sICJtZXRhIjogeyJhcGlmeV9yZXF1ZXN0X2lkIjogImZ2d3NjTzJVSkxkcjEwQiIsICJhcGlm' + 'eV9yZXF1ZXN0X3VuaXF1ZV9rZXkiOiAiaHR0cHM6Ly9hcGlmeS5jb20ifSwgImVuY29kaW5nIjog' + 'InV0Zi04IiwgImZsYWdzIjogW10sICJjYl9rd2FyZ3MiOiB7fSwgImRvbnRfZmlsdGVyIjogZmFs' + 'c2UsICJtZXRob2QiOiAiR0VUIiwgInByaW9yaXR5IjogMH0=\n' +) + + def test_without_reconstruction(spider: Spider) -> None: # Without reconstruction of encoded Scrapy request apify_request = ApifyRequest( @@ -62,13 +80,13 @@ def test_without_reconstruction_with_optional_fields(spider: Spider) -> None: def test_with_reconstruction(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + # With reconstruction of JSON-encoded Scrapy request apify_request = ApifyRequest( url='https://apify.com', method='GET', unique_key='https://apify.com', user_data={ - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -82,7 +100,7 @@ def test_with_reconstruction(spider: Spider) -> None: def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: - # With reconstruction of encoded Scrapy request + # With reconstruction of JSON-encoded Scrapy request apify_request = ApifyRequest( url='https://apify.com', method='GET', @@ -90,7 +108,7 @@ def test_with_reconstruction_with_optional_fields(spider: Spider) -> None: headers=HttpHeaders({'Authorization': 'Bearer access_token'}), user_data={ 'some_user_data': 'hello', - 'scrapy_request': 'gASVJgIAAAAAAAB9lCiMA3VybJSMEWh0dHBzOi8vYXBpZnkuY29tlIwIY2FsbGJhY2uUTowHZXJy\nYmFja5ROjAdoZWFkZXJzlH2UKEMGQWNjZXB0lF2UQz90ZXh0L2h0bWwsYXBwbGljYXRpb24veGh0\nbWwreG1sLGFwcGxpY2F0aW9uL3htbDtxPTAuOSwqLyo7cT0wLjiUYUMPQWNjZXB0LUxhbmd1YWdl\nlF2UQwJlbpRhQwpVc2VyLUFnZW50lF2UQyNTY3JhcHkvMi4xMS4wICgraHR0cHM6Ly9zY3JhcHku\nb3JnKZRhQw9BY2NlcHQtRW5jb2RpbmeUXZRDDWd6aXAsIGRlZmxhdGWUYXWMBm1ldGhvZJSMA0dF\nVJSMBGJvZHmUQwCUjAdjb29raWVzlH2UjARtZXRhlH2UKIwQYXBpZnlfcmVxdWVzdF9pZJSMD2Z2\nd3NjTzJVSkxkcjEwQpSMGGFwaWZ5X3JlcXVlc3RfdW5pcXVlX2tleZSMEWh0dHBzOi8vYXBpZnku\nY29tlIwQZG93bmxvYWRfdGltZW91dJRHQGaAAAAAAACMDWRvd25sb2FkX3Nsb3SUjAlhcGlmeS5j\nb22UjBBkb3dubG9hZF9sYXRlbmN5lEc/tYIIAAAAAHWMCGVuY29kaW5nlIwFdXRmLTiUjAhwcmlv\ncml0eZRLAIwLZG9udF9maWx0ZXKUiYwFZmxhZ3OUXZSMCWNiX2t3YXJnc5R9lHUu\n', # noqa: E501 + 'scrapy_request': _SCRAPY_REQUEST_JSON_ENCODED, }, ) @@ -119,3 +137,216 @@ def test_invalid_request_for_reconstruction(spider: Spider) -> None: with pytest.raises(binascii.Error): to_scrapy_request(apify_request, spider) + + +def test_pickle_payload_rejected(spider: Spider) -> None: + """Data stored under 'scrapy_request' is JSON; a pickle-encoded payload is not valid JSON. + + The reconstruction path must reject such a payload rather than deserialize it. + """ + # Build a pickle payload like the old code produced. + scrapy_request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'method': 'GET', + 'body': b'', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + } + pickle_encoded = codecs.encode(pickle.dumps(scrapy_request_dict), 'base64').decode() + + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': pickle_encoded}, + ) + + # The JSON-based reconstruction must reject the pickle payload. + with pytest.raises((json.JSONDecodeError, UnicodeDecodeError, ValueError)): + to_scrapy_request(apify_request, spider) + + +def test_roundtrip_serialization(spider: Spider) -> None: + """Verify that to_apify_request -> to_scrapy_request roundtrip works with JSON encoding.""" + original_request = Request( + url='https://example.com/test', + method='POST', + body=b'test body content', + headers={'Content-Type': 'application/json', 'X-Custom': 'value'}, + meta={'userData': {'custom_key': 'custom_value'}}, + ) + + apify_request = to_apify_request(original_request, spider) + assert apify_request is not None + + # Verify the encoded data is valid JSON (not pickle) + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + decoded_bytes = codecs.decode(encoded.encode(), 'base64') + decoded_json = json.loads(decoded_bytes.decode('utf-8')) + assert isinstance(decoded_json, dict) + assert decoded_json['url'] == 'https://example.com/test' + + # Reconstruct the Scrapy request + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, Request) + assert restored.url == original_request.url + assert restored.method == original_request.method + assert restored.body == original_request.body + + +def test_no_pickle_in_serialized_output(spider: Spider) -> None: + """Confirm that to_apify_request never produces pickle-serialized output.""" + scrapy_request = Request(url='https://example.com') + apify_request = to_apify_request(scrapy_request, spider) + assert apify_request is not None + + encoded = apify_request.user_data['scrapy_request'] + assert isinstance(encoded, str) + raw_bytes = codecs.decode(encoded.encode(), 'base64') + + # Pickle protocol 4 starts with b'\x80\x04'; JSON starts with b'{' + assert not raw_bytes.startswith(b'\x80'), 'Output must not be pickle-serialized' + # Verify it's valid JSON + json.loads(raw_bytes.decode('utf-8')) + + +def _encode_request_dict(request_dict: dict) -> str: + """Encode a raw request dict the same way `to_apify_request` does (base64-encoded JSON).""" + return codecs.encode(json.dumps(request_dict).encode('utf-8'), 'base64').decode() + + +def test_binary_body_round_trips(spider: Spider) -> None: + """Non-UTF-8 bytes in the request body survive the JSON roundtrip (base64-encoded).""" + original = Request( + url='https://example.com', + method='POST', + body=b'\x00\x01\x02\xff\xfe binary', + headers={'Content-Type': 'application/octet-stream'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.body == b'\x00\x01\x02\xff\xfe binary' + assert restored.headers.get('Content-Type') == b'application/octet-stream' + + +def test_binary_headers_round_trip_and_request_not_dropped(spider: Spider) -> None: + """A request with non-UTF-8 header values is not dropped; its headers survive the roundtrip. + + The Apify-request-level headers can only hold UTF-8-decodable values, so binary header values are + preserved inside the serialized Scrapy request instead. The conversion must still succeed (return + a request, not None) and the exact header bytes must come back. + """ + original = Request( + url='https://example.com', + headers={b'Accept': b'text/html', b'X-Bin': b'\xff\xfe\x00'}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None # must not be dropped + + restored = to_scrapy_request(apify_request, spider) + assert restored.headers.get(b'X-Bin') == b'\xff\xfe\x00' + assert restored.headers.get(b'Accept') == b'text/html' + + +def test_userdata_with_b64_sentinel_key_round_trips(spider: Spider) -> None: + """A user dict that happens to look like a bytes wrapper must round-trip unchanged. + + The encoder uses no in-band sentinel for user data, so an arbitrary value such as + ``{"__b64__": "..."}`` in `meta` is preserved exactly instead of being reinterpreted. + """ + original = Request( + url='https://example.com', + meta={'userData': {}, 'looks_like_sentinel': {'__b64__': 'not really base64 !!!'}}, + ) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert restored.meta['looks_like_sentinel'] == {'__b64__': 'not really base64 !!!'} + + +def test_already_imported_request_subclass_round_trips(spider: Spider) -> None: + """A `_class` referring to an already-imported `scrapy.Request` subclass is reconstructed.""" + original = FormRequest(url='https://example.com', formdata={'key': 'value'}) + + apify_request = to_apify_request(original, spider) + assert apify_request is not None + + restored = to_scrapy_request(apify_request, spider) + assert isinstance(restored, FormRequest) + assert restored.method == 'POST' + + +def test_non_request_class_is_rejected(spider: Spider) -> None: + """A `_class` that resolves to something other than a `scrapy.Request` subclass is rejected. + + `scrapy.utils.request.request_from_dict` resolves `_class` via `load_object`, which imports the + dotted path it is given. Reconstruction only accepts an already-imported `scrapy.Request` + subclass; anything else (here a plain `dict`) is rejected. + """ + request_dict = { + 'url': 'https://example.com', + 'callback': None, + 'errback': None, + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'builtins.dict', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='not an already-imported'): + to_scrapy_request(apify_request, spider) + + +def test_class_referring_to_unimported_module_is_rejected(spider: Spider) -> None: + """A `_class` whose module is not already imported is rejected without importing it.""" + request_dict = { + 'url': 'https://example.com', + 'headers': {}, + 'body': '', + 'cookies': {}, + 'meta': {}, + 'encoding': 'utf-8', + 'priority': 0, + 'dont_filter': False, + 'flags': [], + 'cb_kwargs': {}, + 'method': 'GET', + '_class': 'definitely_not_imported_pkg.some_module.SomeRequest', + } + apify_request = ApifyRequest( + url='https://example.com', + method='GET', + unique_key='https://example.com', + user_data={'scrapy_request': _encode_request_dict(request_dict)}, + ) + + with pytest.raises(TypeError, match='not an already-imported'): + to_scrapy_request(apify_request, spider)