bricks
开发指南
2.6 爬虫基类
2.6.3 Template Spider 模板爬虫

2.6.3 Template Spider 模板爬虫

Template Spider 是一种简化的配置式爬虫,专门为线性的爬取流程设计,提供了更简洁的配置方式。

核心特性

1. 线性流程

Template Spider 按照固定的线性流程执行:

Init → Download → Parse → Pipeline

每个阶段都可以配置多个节点,按顺序执行。

2. 简化配置

相比 Form Spider,Template Spider 提供了更简洁的配置方式:

from dataclasses import dataclass
from bricks.spider.template import Spider, Config, Download, Parse, Pipeline
 
@dataclass
class SimpleConfig(Config):
    # 初始化阶段
    init = [
        Init(func=lambda: [{"page": i} for i in range(1, 6)])
    ]
 
    # 下载阶段
    download = [
        Download(url="https://httpbin.org/json?page={page}")
    ]
 
    # 解析阶段
    parse = [
        Parse(func="json", kwargs={"rules": {"args": {"page": "page"}}})
    ]
 
    # 处理阶段
    pipeline = [
        Pipeline(func=lambda ctx: print(f"页面 {ctx.seeds['page']}: {ctx.items}"), success=True)
    ]
 
class SimpleSpider(Spider):
    @property
    def config(self):
        return SimpleConfig()

配置节点

1. Init 节点

生成初始种子数据:

from bricks.spider.template import Init
 
# 静态种子
Init(func=lambda: [
    {"url": "https://example.com/page1"},
    {"url": "https://example.com/page2"}
])
 
# 动态种子
def generate_urls():
    # 从数据库或API获取URL
    urls = get_urls_from_database()
    return [{"url": url} for url in urls]
 
Init(func=generate_urls)
 
# 参数化种子
Init(func=lambda **kwargs: [
    {"page": i, "category": kwargs.get("category", "default")}
    for i in range(1, kwargs.get("max_pages", 5) + 1)
])

2. Download 节点

发送 HTTP 请求:

from bricks.spider.template import Download
 
# 基本下载
Download(url="https://httpbin.org/json")
 
# 参数化URL
Download(url="https://api.example.com/data?page={page}&category={category}")
 
# 带请求配置
Download(
    url="https://httpbin.org/post",
    method="POST",
    headers={"Content-Type": "application/json"},
    body={"query": "{search_term}"},
    timeout=30
)
 
# Template Spider 中每个配置只能有一个下载节点
# 如果需要处理不同类型的URL,使用条件判断
download = [
    Download(
        url="{url}",  # 通用URL字段
        condition=lambda ctx: ctx.seeds.get("url")  # 确保有URL
    )
]

3. Parse 节点

解析响应数据:

from bricks.spider.template import Parse
 
# JSON解析
Parse(
    func="json",
    kwargs={
        "rules": {
            "items[*]": {
                "id": "id",
                "title": "title",
                "description": "desc"
            }
        }
    }
)
 
# XPath解析
Parse(
    func="xpath",
    kwargs={
        "rules": {
            "//div[@class='item']": {
                "title": ".//h3/text()",
                "link": ".//a/@href",
                "price": ".//span[@class='price']/text()"
            }
        }
    }
)
 
# 自定义解析
def extract_data(context):
    response = context.response
    data = response.json()
    return [{"processed": item} for item in data.get("results", [])]
 
Parse(func=extract_data)

4. Pipeline 节点

处理解析结果:

from bricks.spider.template import Pipeline
 
# 数据处理
def process_items(context):
    items = context.items
    processed = []
    for item in items:
        # 数据清洗
        item["title"] = item.get("title", "").strip()
        # 数据验证
        if item.get("title"):
            processed.append(item)
    return processed
 
Pipeline(func=process_items)
 
# 数据存储
Pipeline(
    func=lambda ctx: [save_to_db(item) for item in ctx.items],
    success=True  # 标记任务完成
)
 
# 条件处理
Pipeline(
    func=lambda ctx: print(f"处理了 {len(ctx.items)} 条数据"),
    condition=lambda ctx: len(ctx.items) > 0,
    success=True
)

完整示例

1. API 数据爬取

from dataclasses import dataclass
from bricks.spider.template import Spider, Config, Init, Download, Parse, Pipeline
 
@dataclass
class ApiConfig(Config):
    init = [
        Init(func=lambda: [{"endpoint": "users"}, {"endpoint": "posts"}])
    ]
 
    download = [
        Download(url="https://jsonplaceholder.typicode.com/{endpoint}")
    ]
 
    parse = [
        Parse(
            func="json",
            kwargs={
                "rules": {
                    "[*]": {
                        "id": "id",
                        "title": "title",
                        "body": "body"
                    }
                }
            }
        )
    ]
 
    pipeline = [
        Pipeline(
            func=lambda ctx: print(f"从 {ctx.seeds['endpoint']} 获取了 {len(ctx.items)} 条数据"),
            success=True
        )
    ]
 
class ApiSpider(Spider):
    @property
    def config(self):
        return ApiConfig()
 
# 运行
spider = ApiSpider()
spider.run()

2. 分页数据爬取

@dataclass
class PaginationConfig(Config):
    init = [
        Init(func=lambda **kwargs: [
            {"page": i} for i in range(1, kwargs.get("max_pages", 5) + 1)
        ])
    ]
 
    download = [
        Download(url="https://httpbin.org/json?page={page}")
    ]
 
    parse = [
        Parse(
            func="json",
            kwargs={
                "rules": {
                    "args": {
                        "page": "page"
                    }
                }
            }
        )
    ]
 
    pipeline = [
        # 数据验证
        Pipeline(func=lambda ctx: ctx.items if ctx.items else []),
 
        # 数据存储
        Pipeline(
            func=lambda ctx: save_page_data(ctx.seeds["page"], ctx.items),
            success=True
        )
    ]
 
class PaginationSpider(Spider):
    @property
    def config(self):
        return PaginationConfig()
 
def save_page_data(page, items):
    print(f"保存第 {page} 页数据: {len(items)} 条")
    return items
 
# 运行时传递参数
spider = PaginationSpider()
spider.run(max_pages=10)

3. 多数据源爬取

@dataclass
class MultiSourceConfig(Config):
    init = [
        Init(func=lambda: [
            {"source": "api1", "url": "https://api1.example.com/data"},
            {"source": "api2", "url": "https://api2.example.com/info"},
            {"source": "web", "url": "https://example.com/page"}
        ])
    ]
 
    download = [
        Download(url="{url}")
    ]
 
    parse = [
        # JSON API 解析
        Parse(
            func="json",
            kwargs={"rules": {"data[*]": {"id": "id", "name": "name"}}},
            condition=lambda ctx: ctx.seeds["source"].startswith("api")
        ),
 
        # HTML 页面解析
        Parse(
            func="xpath",
            kwargs={"rules": {"//div[@class='item']": {"title": ".//h3/text()"}}},
            condition=lambda ctx: ctx.seeds["source"] == "web"
        )
    ]
 
    pipeline = [
        Pipeline(
            func=lambda ctx: process_by_source(ctx.seeds["source"], ctx.items),
            success=True
        )
    ]
 
class MultiSourceSpider(Spider):
    @property
    def config(self):
        return MultiSourceConfig()
 
def process_by_source(source, items):
    print(f"处理来自 {source}{len(items)} 条数据")
    return items

高级特性

1. 条件执行

# 基于种子条件的下载
Download(
    url="https://api.example.com/detail/{id}",
    condition=lambda ctx: "id" in ctx.seeds
)
 
# 基于响应条件的解析
Parse(
    func="json",
    kwargs={"rules": {"data": "content"}},
    condition=lambda ctx: "application/json" in ctx.response.headers.get("content-type", "")
)
 
# 基于数据条件的处理
Pipeline(
    func=lambda ctx: send_notification(ctx.items),
    condition=lambda ctx: len(ctx.items) > 100,
    success=True
)

2. 错误处理

def safe_download(context):
    try:
        # 自定义下载逻辑
        return download_with_retry(context.seeds["url"])
    except Exception as e:
        print(f"下载失败: {e}")
        return None
 
Download(func=safe_download)
 
def robust_parser(context):
    try:
        return context.response.json()
    except:
        # 降级到文本解析
        return [{"content": context.response.text[:100]}]
 
Parse(func=robust_parser)

3. 数据流控制

# 数据过滤
Pipeline(func=lambda ctx: [
    item for item in ctx.items
    if item.get("status") == "active"
])
 
# 数据转换
Pipeline(func=lambda ctx: [
    {**item, "processed_at": datetime.now().isoformat()}
    for item in ctx.items
])
 
# 数据聚合
Pipeline(func=lambda ctx: [{
    "total_items": len(ctx.items),
    "page": ctx.seeds.get("page"),
    "items": ctx.items
}])

4. 参数传递

class ParameterizedSpider(Spider):
    def __init__(self, category="default", max_pages=5, **kwargs):
        self.category = category
        self.max_pages = max_pages
        super().__init__(**kwargs)
 
    @property
    def config(self):
        @dataclass
        class DynamicConfig(Config):
            init = [
                Init(func=lambda: [
                    {"page": i, "category": self.category}
                    for i in range(1, self.max_pages + 1)
                ])
            ]
 
            download = [
                Download(url="https://api.example.com/data?page={page}&category={category}")
            ]
 
            parse = [
                Parse(func="json", kwargs={"rules": {"items[*]": {"id": "id", "title": "title"}}})
            ]
 
            pipeline = [
                Pipeline(func=lambda ctx: print(f"类别 {ctx.seeds['category']}: {len(ctx.items)} 条"), success=True)
            ]
 
        return DynamicConfig()
 
# 使用参数化爬虫
spider = ParameterizedSpider(category="electronics", max_pages=10)
spider.run()

重要说明

Template Spider 的执行模式

Template Spider 采用固定的线性流程

Init → Download → Parse → Pipeline

关键特点:

  1. 固定流程:每个种子都会按照 init → download → parse → pipeline 的顺序执行
  2. 单阶段处理:每个配置阶段只能有一个处理逻辑
  3. 循环执行:每个种子独立执行完整流程
  4. 条件控制:可以通过 condition 参数控制是否执行某个阶段

多类型种子处理

如果需要处理不同类型的种子,推荐使用条件判断:

@dataclass
class MultiTypeConfig(Config):
    download = [
        Download(
            url="{url}",
            condition=lambda ctx: ctx.seeds.get("url")  # 确保有URL
        )
    ]
 
    parse = [
        # 根据种子类型选择解析方式
        Parse(
            func="json",
            kwargs={"rules": {"items[*]": {"title": "title"}}},
            condition=lambda ctx: ctx.seeds.get("type") == "api"
        ),
        Parse(
            func="xpath",
            kwargs={"rules": {"//h1": {"title": "text()"}}},
            condition=lambda ctx: ctx.seeds.get("type") == "html"
        )
    ]
 
    pipeline = [
        Pipeline(
            func=lambda ctx: save_data(ctx.items, ctx.seeds.get("type")),
            success=True
        )
    ]

与 Form Spider 的区别

特性Template SpiderForm Spider
流程结构固定四阶段自定义节点序列
配置方式分阶段配置统一列表配置
灵活性较低,适合标准流程较高,适合复杂流程
使用场景简单的线性爬取复杂的多步骤处理

Template Spider 提供了一种简洁而强大的方式来构建线性流程的爬虫,特别适合标准的数据爬取任务。