bricks
开发指南
2.6 爬虫基类
2.6.2 Form Spider 表单爬虫

2.6.2 Form Spider 表单爬虫

Form Spider 是基于配置的声明式爬虫,通过配置文件或配置类来定义爬虫行为,无需编写复杂的爬虫逻辑代码。

核心概念

1. 配置驱动

Form Spider 通过配置来驱动爬虫行为:

from dataclasses import dataclass
from bricks.spider.form import Spider, Config, Download, Parse, Pipeline
 
@dataclass
class MyConfig(Config):
    """爬虫配置类"""
    spider = [
        Download(url="https://httpbin.org/json"),
        Parse(func="json", kwargs={"rules": {"args": {"page": "page"}}}),
        Pipeline(func=lambda ctx: print(f"结果: {ctx.items}"), success=True)
    ]
 
class MySpider(Spider):
    @property
    def config(self):
        return MyConfig()

2. 流程节点

Form Spider 支持多种流程节点:

节点类型描述用途
Download下载节点发送 HTTP 请求
Parse解析节点解析响应数据
Pipeline管道节点处理解析结果
Task任务节点执行自定义逻辑
Init初始化节点生成初始种子

配置节点详解

1. Download 节点

用于发送 HTTP 请求:

from bricks.spider.form import Download
 
# 基本下载
Download(url="https://httpbin.org/json")
 
# 带参数的下载
Download(
    url="https://httpbin.org/post",
    method="POST",
    headers={"Content-Type": "application/json"},
    body={"key": "value"},
    timeout=30,
    proxy="http://proxy:8080"
)
 
# 动态URL(使用种子数据)
Download(url="https://api.example.com/page/{page}")
 
# 条件下载
Download(
    url="https://httpbin.org/json",
    condition=lambda ctx: ctx.seeds.get("download", True)
)

2. Parse 节点

用于解析响应数据:

from bricks.spider.form import Parse
 
# JSON解析
Parse(
    func="json",
    kwargs={
        "rules": {
            "data[*]": {
                "id": "id",
                "title": "title",
                "url": "url"
            }
        }
    }
)
 
# XPath解析
Parse(
    func="xpath",
    kwargs={
        "rules": {
            "//article": {
                "title": ".//h2/text()",
                "content": ".//p/text()",
                "link": ".//a/@href"
            }
        }
    }
)
 
# 自定义解析函数
def custom_parser(context):
    response = context.response
    return {"content": response.text[:100]}
 
Parse(func=custom_parser)

3. Pipeline 节点

用于处理解析结果:

from bricks.spider.form import Pipeline
 
# 简单处理
Pipeline(
    func=lambda ctx: print(f"处理了 {len(ctx.items)} 条数据"),
    success=True
)
 
# 复杂处理
def process_items(context):
    items = context.items
    for item in items:
        # 数据清洗
        item["title"] = item.get("title", "").strip()
        # 数据存储
        save_to_database(item)
    return items
 
Pipeline(func=process_items, success=True)
 
# 条件处理
Pipeline(
    func=lambda ctx: print("有数据"),
    condition=lambda ctx: len(ctx.items) > 0,
    success=True
)

4. Task 节点

用于执行自定义任务:

from bricks.spider.form import Task
 
# 提交新种子
def submit_new_seeds(context):
    items = context.items
    new_seeds = []
    for item in items:
        if item.get("has_detail"):
            new_seeds.append({"url": item["detail_url"]})
 
    if new_seeds:
        context.submit(*new_seeds)
 
Task(func=submit_new_seeds)
 
# 条件任务
Task(
    func=lambda ctx: print("执行特殊任务"),
    condition=lambda ctx: ctx.seeds.get("special", False)
)

5. Init 节点

用于生成初始种子:

from bricks.spider.form import Init
 
# 静态种子
Init(func=lambda: [{"page": i} for i in range(1, 6)])
 
# 动态种子
def generate_seeds():
    # 从数据库或API获取种子
    return [{"url": f"https://api.example.com/page/{i}"} for i in range(1, 11)]
 
Init(func=generate_seeds)

完整示例

1. 新闻爬虫

from dataclasses import dataclass
from bricks.spider.form import Spider, Config, Download, Parse, Pipeline, Task
 
@dataclass
class NewsConfig(Config):
    spider = [
        # 下载列表页
        Download(url="https://news.example.com/list?page={page}"),
 
        # 解析列表页
        Parse(
            func="xpath",
            kwargs={
                "rules": {
                    "//article": {
                        "title": ".//h2/text()",
                        "summary": ".//p[@class='summary']/text()",
                        "detail_url": ".//a/@href",
                        "publish_time": ".//time/@datetime"
                    }
                }
            }
        ),
 
        # 提交详情页种子到队列,由其他实例处理
        Task(func=lambda ctx: ctx.submit(*[
            {"url": item["detail_url"], "type": "detail"}
            for item in ctx.items if item.get("detail_url")
        ], call_later=True)),  # call_later=True 表示放入队列等待处理
 
        # 数据处理
        Pipeline(
            func=lambda ctx: [save_news(item) for item in ctx.items],
            success=True
        )
    ]
 
class NewsSpider(Spider):
    @property
    def config(self):
        return NewsConfig()
 
def save_news(item):
    print(f"保存新闻: {item.get('title', 'Unknown')}")
    return item
 
# 运行爬虫
if __name__ == "__main__":
    spider = NewsSpider()
    spider.run(page=1)  # 传递初始参数

2. 电商爬虫

@dataclass
class EcommerceConfig(Config):
    spider = [
        # 搜索商品
        Download(
            url="https://api.shop.com/search",
            params={"q": "{keyword}", "page": "{page}"}
        ),
 
        # 解析商品列表
        Parse(
            func="json",
            kwargs={
                "rules": {
                    "products[*]": {
                        "id": "id",
                        "name": "name",
                        "price": "price",
                        "rating": "rating",
                        "detail_url": "detail_url"
                    }
                }
            }
        ),
 
        # 过滤高评分商品
        Pipeline(
            func=lambda ctx: [
                item for item in ctx.items
                if float(item.get("rating", 0)) >= 4.0
            ]
        ),
 
        # 提交详情页到队列,由其他实例处理
        Task(func=lambda ctx: ctx.submit(*[
            {"url": item["detail_url"], "product_id": item["id"], "type": "detail"}
            for item in ctx.items
        ], call_later=True)),  # 放入队列等待处理
 
        # 保存商品列表信息
        Pipeline(
            func=lambda ctx: [save_product_list(item) for item in ctx.items],
            success=True
        )
    ]
 
class EcommerceSpider(Spider):
    @property
    def config(self):
        return EcommerceConfig()
 
def save_product_list(product):
    print(f"保存商品列表: {product}")
 
# 如果需要处理详情页,需要创建专门的详情页爬虫
@dataclass
class ProductDetailConfig(Config):
    spider = [
        # 只处理详情页类型的种子
        Download(
            url="{url}",
            condition=lambda ctx: ctx.seeds.get("type") == "detail"
        ),
 
        Parse(
            func="xpath",
            kwargs={
                "rules": {
                    "//div[@class='product-detail']": {
                        "description": ".//div[@class='description']//text()",
                        "specifications": ".//table[@class='specs']//text()",
                        "reviews_count": ".//span[@class='review-count']/text()"
                    }
                }
            }
        ),
 
        Pipeline(
            func=lambda ctx: save_product_detail(ctx.items[0], ctx.seeds["product_id"]),
            success=True
        )
    ]
 
class ProductDetailSpider(Spider):
    @property
    def config(self):
        return ProductDetailConfig()
 
def save_product_detail(product_detail, product_id):
    print(f"保存商品详情 {product_id}: {product_detail}")

高级特性

1. 条件执行

# 基于种子条件
Download(
    url="https://api.example.com/data",
    condition=lambda ctx: ctx.seeds.get("need_download", True)
)
 
# 基于响应条件
Parse(
    func="json",
    kwargs={"rules": {"data": "content"}},
    condition=lambda ctx: ctx.response.status_code == 200
)
 
# 基于数据条件
Pipeline(
    func=lambda ctx: process_data(ctx.items),
    condition=lambda ctx: len(ctx.items) > 0,
    success=True
)

2. 错误处理

def safe_parser(context):
    try:
        return context.response.json()
    except Exception as e:
        print(f"解析失败: {e}")
        return []
 
Parse(func=safe_parser)
 
def error_handler(context):
    if context.response.status_code >= 400:
        print(f"请求失败: {context.response.status_code}")
        return []
    return context.response.json()
 
Parse(func=error_handler)

3. 数据传递

# 在种子中传递数据
Task(func=lambda ctx: ctx.submit(
    {"url": "https://example.com", "parent_data": ctx.items[0]}
))
 
# 在上下文中传递数据
def set_context_data(context):
    context.custom_data = {"processed_at": time.time()}
 
Task(func=set_context_data)

4. 动态配置

class DynamicSpider(Spider):
    def __init__(self, config_type="basic", **kwargs):
        self.config_type = config_type
        super().__init__(**kwargs)
 
    @property
    def config(self):
        if self.config_type == "advanced":
            return AdvancedConfig()
        else:
            return BasicConfig()

重要说明

流程执行方式

Form Spider 的 spider 配置是一个线性执行流程,每个种子会按照配置的节点顺序依次执行:

  1. 单次执行:每个种子只会执行一遍完整的流程
  2. 顺序执行:按照 spider 列表中的顺序依次执行节点
  3. 条件跳过:可以通过 condition 参数跳过某些节点
  4. 新种子处理:通过 context.submit() 提交的新种子会重新进入队列

多阶段处理的正确方式

如果需要处理列表页和详情页,推荐的方式是:

方式一:使用不同的爬虫实例

# 列表页爬虫
class ListSpider(Spider):
    # 处理列表页,提交详情页种子到队列
 
# 详情页爬虫
class DetailSpider(Spider):
    # 专门处理详情页种子

方式二:在同一个爬虫中使用条件判断

@dataclass
class MultiStageConfig(Config):
    spider = [
        # 根据种子类型选择不同的URL
        Download(url="{list_url}", condition=lambda ctx: ctx.seeds.get("type") == "list"),
        Download(url="{detail_url}", condition=lambda ctx: ctx.seeds.get("type") == "detail"),
 
        # 根据种子类型选择不同的解析方式
        Parse(func="parse_list", condition=lambda ctx: ctx.seeds.get("type") == "list"),
        Parse(func="parse_detail", condition=lambda ctx: ctx.seeds.get("type") == "detail"),
 
        # 统一的数据处理
        Pipeline(func=lambda ctx: save_data(ctx.items), success=True)
    ]

Form Spider 通过配置驱动的方式,大大简化了爬虫的开发复杂度,特别适合结构化的数据爬取任务。