Item Pipeline
After an item has been scraped by a spider, it is sent to the Item Pipeline which processes it through several components that are executed sequentially.
Each item pipeline component (sometimes referred as just “Item Pipeline”) is a Python class that implements a simple method. They receive an item and perform an action over it, also deciding if the item should continue through the pipeline or be dropped and no longer processed.
Typical uses of item pipelines are:
cleansing HTML data
validating scraped data (checking that the items contain certain fields)
checking for duplicates (and dropping them)
storing the scraped item in a database
Writing your own item pipeline
Each item pipeline is a component that must implement the following method:
- process_item(self, item)
This method is called for every item pipeline component.
item is an item object, see Supporting All Item Types.
process_item()must either return an item object or raise aDropItemexception.Dropped items are no longer processed by further pipeline components.
- Parameters:
item (item object) – the scraped item
Additionally, they may also implement the following methods:
- open_spider(self)
This method is called when the spider is opened.
- close_spider(self)
This method is called when the spider is closed.
Any of these methods may be defined as a coroutine function (async def).
Item pipeline example
Price validation and dropping items with no prices
Let’s take a look at the following hypothetical pipeline that adjusts the
price attribute for those items that do not include VAT
(price_excludes_vat attribute), and drops those items which don’t
contain a price:
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class PricePipeline:
vat_factor = 1.15
def process_item(self, item):
adapter = ItemAdapter(item)
if adapter.get("price"):
if adapter.get("price_excludes_vat"):
adapter["price"] = adapter["price"] * self.vat_factor
return item
else:
raise DropItem("Missing price")
Write items to a JSON lines file
The following pipeline stores all scraped items (from all spiders) into a
single items.jsonl file, containing one item per line serialized in JSON
format:
import json
from itemadapter import ItemAdapter
class JsonWriterPipeline:
def open_spider(self):
self.file = open("items.jsonl", "w")
def close_spider(self):
self.file.close()
def process_item(self, item):
line = json.dumps(ItemAdapter(item).asdict()) + "\n"
self.file.write(line)
return item
Note
The purpose of JsonWriterPipeline is just to introduce how to write item pipelines. If you really want to store all scraped items into a JSON file you should use the Feed exports.
Write items to MongoDB
In this example we’ll write items to MongoDB using pymongo. MongoDB address and database name are specified in Scrapy settings; MongoDB collection is named after item class.
The main point of this example is to show how to get the crawler and how to clean up the resources properly.
import pymongo
from itemadapter import ItemAdapter
class MongoPipeline:
collection_name = "scrapy_items"
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get("MONGO_URI"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
)
def open_spider(self):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self):
self.client.close()
def process_item(self, item):
self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
return item
Take screenshot of item
This example demonstrates how to use coroutine syntax in
the process_item() method.
This item pipeline makes a request to a locally-running instance of Splash to render a screenshot of the item URL. After the request response is downloaded, the item pipeline saves the screenshot to a file and adds the filename to the item.
import hashlib
from pathlib import Path
from urllib.parse import quote
import scrapy
from itemadapter import ItemAdapter
from scrapy.http.request import NO_CALLBACK
class ScreenshotPipeline:
"""Pipeline that uses Splash to render screenshot of
every Scrapy item."""
SPLASH_URL = "http://localhost:8050/render.png?url={}"
def __init__(self, crawler):
self.crawler = crawler
@classmethod
def from_crawler(cls, crawler):
return cls(crawler)
async def process_item(self, item):
adapter = ItemAdapter(item)
encoded_item_url = quote(adapter["url"])
screenshot_url = self.SPLASH_URL.format(encoded_item_url)
request = scrapy.Request(screenshot_url, callback=NO_CALLBACK)
response = await self.crawler.engine.download_async(request)
if response.status != 200:
# Error happened, return item.
return item
# Save screenshot to file, filename will be hash of url.
url = adapter["url"]
url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
filename = f"{url_hash}.png"
Path(filename).write_bytes(response.body)
# Store filename in item.
adapter["screenshot_filename"] = filename
return item
Duplicates filter
A filter that looks for duplicate items, and drops those items that were already processed. Let’s say that our items have a unique id, but our spider returns multiples items with the same id:
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
def __init__(self):
self.ids_seen = set()
def process_item(self, item):
adapter = ItemAdapter(item)
if adapter["id"] in self.ids_seen:
raise DropItem(f"Item ID already seen: {adapter['id']}")
else:
self.ids_seen.add(adapter["id"])
return item
Activating an Item Pipeline component
To activate an Item Pipeline component you must add its class to the
ITEM_PIPELINES setting, like in the following example:
ITEM_PIPELINES = {
"myproject.pipelines.PricePipeline": 300,
"myproject.pipelines.JsonWriterPipeline": 800,
}
The integer values you assign to classes in this setting determine the order in which they run: items go through from lower valued to higher valued classes. It’s customary to define these numbers in the 0-1000 range.
A complete example
The examples above show item pipeline components on their own. In a project, a
pipeline is one of four pieces that work together: the item your spider produces, the spider that
yields it, the pipeline that processes it, and the ITEM_PIPELINES
setting that enables the pipeline.
The following example wires those pieces together to validate the price of
books scraped from books.toscrape.com, reusing the PricePipeline from
Price validation and dropping items with no prices above.
Define the item in myproject/items.py:
from dataclasses import dataclass
@dataclass
class BookItem:
title: str
price: float
Yield instances of that item from your spider, e.g. in
myproject/spiders/books.py:
import scrapy
from myproject.items import BookItem
class BooksSpider(scrapy.Spider):
name = "books"
start_urls = ["https://books.toscrape.com/"]
def parse(self, response):
for book in response.css("article.product_pod"):
yield BookItem(
title=book.css("h3 a::attr(title)").get(),
price=float(book.css("p.price_color::text").re_first(r"[\d.]+")),
)
Put the PricePipeline shown earlier in myproject/pipelines.py, and
enable it in myproject/settings.py:
ITEM_PIPELINES = {
"myproject.pipelines.PricePipeline": 300,
}
With these pieces in place, every BookItem that BooksSpider yields
passes through PricePipeline before it reaches the feed exports or any other output.
Common pitfalls
The pipeline does not run
A pipeline component only runs if its class is listed in the
ITEM_PIPELINES setting, normally in your project’s
settings.py file (see Activating an Item Pipeline component). Adding it to
the spider or elsewhere has no effect.
To confirm that Scrapy loaded your pipeline, look for a line like this near the start of the crawl log:
[scrapy.middleware] INFO: Enabled item pipelines:
['myproject.pipelines.PricePipeline']
If your pipeline is missing from that list, check that its import path matches
the ITEM_PIPELINES entry, and that the setting is not being
overridden, for example by custom_settings or by a
redefinition of ITEM_PIPELINES in settings.py.
The item is not returned
process_item() must return the item (or raise
DropItem). A common mistake is to modify the item but
forget to return it:
def process_item(self, item):
ItemAdapter(item)["price"] *= 1.15
# Bug: returns None, so the next component gets None instead of the item.
Return the item so that the next component, and the rest of Scrapy, can keep processing it:
def process_item(self, item):
ItemAdapter(item)["price"] *= 1.15
return item