Source code for scrapy.downloadermiddlewares.httpcompression

from __future__ import annotations

import warnings
from itertools import chain
from logging import getLogger
from typing import TYPE_CHECKING, List, Optional, Union

from scrapy import Request, Spider, signals
from scrapy.crawler import Crawler
from scrapy.exceptions import IgnoreRequest, NotConfigured
from scrapy.http import Response, TextResponse
from scrapy.responsetypes import responsetypes
from scrapy.statscollectors import StatsCollector
from scrapy.utils._compression import (
    _DecompressionMaxSizeExceeded,
    _inflate,
    _unbrotli,
    _unzstd,
)
from scrapy.utils.deprecate import ScrapyDeprecationWarning
from scrapy.utils.gz import gunzip

if TYPE_CHECKING:
    # typing.Self requires Python 3.11
    from typing_extensions import Self

logger = getLogger(__name__)

ACCEPTED_ENCODINGS: List[bytes] = [b"gzip", b"deflate"]
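# gzip and deflate decoding relies only on the standard library; the
# try/except blocks below additionally advertise b"br" and b"zstd", but only
# when the optional brotli (or brotlicffi) and zstandard packages import.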

try:
    try:
        import brotli  # noqa: F401
    except ImportError:
        import brotlicffi  # noqa: F401
except ImportError:
    pass
else:
    ACCEPTED_ENCODINGS.append(b"br")

try:
    import zstandard  # noqa: F401
except ImportError:
    pass
else:
    ACCEPTED_ENCODINGS.append(b"zstd")
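# Whatever ends up in ACCEPTED_ENCODINGS is advertised verbatim in the
# Accept-Encoding request header by process_request() below, e.g.
# b"gzip, deflate, br, zstd" when both optional decoders are importable.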


class HttpCompressionMiddleware:
    """This middleware allows compressed (gzip, deflate) traffic to be
    sent/received from web sites"""

    def __init__(
        self,
        stats: Optional[StatsCollector] = None,
        *,
        crawler: Optional[Crawler] = None,
    ):
        if not crawler:
            self.stats = stats
            self._max_size = 1073741824
            self._warn_size = 33554432
            return
        self.stats = crawler.stats
        self._max_size = crawler.settings.getint("DOWNLOAD_MAXSIZE")
        self._warn_size = crawler.settings.getint("DOWNLOAD_WARNSIZE")
        crawler.signals.connect(self.open_spider, signals.spider_opened)

    @classmethod
    def from_crawler(cls, crawler: Crawler) -> Self:
        if not crawler.settings.getbool("COMPRESSION_ENABLED"):
            raise NotConfigured
        try:
            return cls(crawler=crawler)
        except TypeError:
            warnings.warn(
                "HttpCompressionMiddleware subclasses must either modify "
                "their '__init__' method to support a 'crawler' parameter or "
                "reimplement their 'from_crawler' method.",
                ScrapyDeprecationWarning,
            )
            mw = cls()
            mw.stats = crawler.stats
            mw._max_size = crawler.settings.getint("DOWNLOAD_MAXSIZE")
            mw._warn_size = crawler.settings.getint("DOWNLOAD_WARNSIZE")
            crawler.signals.connect(mw.open_spider, signals.spider_opened)
            return mw

    def open_spider(self, spider):
        if hasattr(spider, "download_maxsize"):
            self._max_size = spider.download_maxsize
        if hasattr(spider, "download_warnsize"):
            self._warn_size = spider.download_warnsize

    def process_request(
        self, request: Request, spider: Spider
    ) -> Union[Request, Response, None]:
        request.headers.setdefault("Accept-Encoding", b", ".join(ACCEPTED_ENCODINGS))
        return None

    def process_response(
        self, request: Request, response: Response, spider: Spider
    ) -> Union[Request, Response]:
        if request.method == "HEAD":
            return response
        if isinstance(response, Response):
            content_encoding = response.headers.getlist("Content-Encoding")
            if content_encoding:
                max_size = request.meta.get("download_maxsize", self._max_size)
                warn_size = request.meta.get("download_warnsize", self._warn_size)
                try:
                    decoded_body, content_encoding = self._handle_encoding(
                        response.body, content_encoding, max_size
                    )
                except _DecompressionMaxSizeExceeded:
                    raise IgnoreRequest(
                        f"Ignored response {response} because its body "
                        f"({len(response.body)} B compressed) exceeded "
                        f"DOWNLOAD_MAXSIZE ({max_size} B) during "
                        f"decompression."
                    )
                if len(response.body) < warn_size <= len(decoded_body):
                    logger.warning(
                        f"{response} body size after decompression "
                        f"({len(decoded_body)} B) is larger than the "
                        f"download warning size ({warn_size} B)."
                    )
                response.headers["Content-Encoding"] = content_encoding
                if self.stats:
                    self.stats.inc_value(
                        "httpcompression/response_bytes",
                        len(decoded_body),
                        spider=spider,
                    )
                    self.stats.inc_value(
                        "httpcompression/response_count", spider=spider
                    )
                respcls = responsetypes.from_args(
                    headers=response.headers, url=response.url, body=decoded_body
                )
                kwargs = {"cls": respcls, "body": decoded_body}
                if issubclass(respcls, TextResponse):
                    # force recalculating the encoding until we make sure the
                    # responsetypes guessing is reliable
                    kwargs["encoding"] = None
                response = response.replace(**kwargs)
                if not content_encoding:
                    del response.headers["Content-Encoding"]
        return response

    def _handle_encoding(self, body, content_encoding, max_size):
        to_decode, to_keep = self._split_encodings(content_encoding)
        for encoding in to_decode:
            body = self._decode(body, encoding, max_size)
        return body, to_keep

    def _split_encodings(self, content_encoding):
        to_keep = [
            encoding.strip().lower()
            for encoding in chain.from_iterable(
                encodings.split(b",") for encodings in content_encoding
            )
        ]
        to_decode = []
        while to_keep:
            encoding = to_keep.pop()
            if encoding not in ACCEPTED_ENCODINGS:
                to_keep.append(encoding)
                return to_decode, to_keep
            to_decode.append(encoding)
        return to_decode, to_keep

    def _decode(self, body: bytes, encoding: bytes, max_size: int) -> bytes:
        if encoding in {b"gzip", b"x-gzip"}:
            return gunzip(body, max_size=max_size)
        if encoding == b"deflate":
            return _inflate(body, max_size=max_size)
        if encoding == b"br" and b"br" in ACCEPTED_ENCODINGS:
            return _unbrotli(body, max_size=max_size)
        if encoding == b"zstd" and b"zstd" in ACCEPTED_ENCODINGS:
            return _unzstd(body, max_size=max_size)
        return body
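
A minimal sketch of the encoding-splitting logic above (the header value and
the installed-decoder assumption are illustrative, not taken from the source):
_split_encodings pops encodings from the end of the Content-Encoding list, so
the last-applied encoding is decoded first.

    from scrapy.downloadermiddlewares.httpcompression import (
        HttpCompressionMiddleware,
    )

    mw = HttpCompressionMiddleware()  # standalone: uses the default size limits

    # "br, gzip" means brotli was applied first, then gzip, so gzip must be
    # undone first; assumes brotli (or brotlicffi) is installed, so that
    # b"br" is in ACCEPTED_ENCODINGS.
    to_decode, to_keep = mw._split_encodings([b"br, gzip"])
    assert to_decode == [b"gzip", b"br"]
    assert to_keep == []

If an encoding in the chain is not in ACCEPTED_ENCODINGS, splitting stops
there: the unsupported encoding and everything applied before it stay in
to_keep and are written back to the Content-Encoding header.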