2.6.3 Template Spider 模板爬虫
Template Spider
是一种简化的配置式爬虫,专门为线性的爬取流程设计,提供了更简洁的配置方式。
核心特性
1. 线性流程
Template Spider 按照固定的线性流程执行:
Init → Download → Parse → Pipeline
每个阶段都可以配置多个节点,按顺序执行。
2. 简化配置
相比 Form Spider,Template Spider 提供了更简洁的配置方式:
from dataclasses import dataclass
from bricks.spider.template import Spider, Config, Download, Parse, Pipeline
@dataclass
class SimpleConfig(Config):
# 初始化阶段
init = [
Init(func=lambda: [{"page": i} for i in range(1, 6)])
]
# 下载阶段
download = [
Download(url="https://httpbin.org/json?page={page}")
]
# 解析阶段
parse = [
Parse(func="json", kwargs={"rules": {"args": {"page": "page"}}})
]
# 处理阶段
pipeline = [
Pipeline(func=lambda ctx: print(f"页面 {ctx.seeds['page']}: {ctx.items}"), success=True)
]
class SimpleSpider(Spider):
@property
def config(self):
return SimpleConfig()
配置节点
1. Init 节点
生成初始种子数据:
from bricks.spider.template import Init
# 静态种子
Init(func=lambda: [
{"url": "https://example.com/page1"},
{"url": "https://example.com/page2"}
])
# 动态种子
def generate_urls():
# 从数据库或API获取URL
urls = get_urls_from_database()
return [{"url": url} for url in urls]
Init(func=generate_urls)
# 参数化种子
Init(func=lambda **kwargs: [
{"page": i, "category": kwargs.get("category", "default")}
for i in range(1, kwargs.get("max_pages", 5) + 1)
])
2. Download 节点
发送 HTTP 请求:
from bricks.spider.template import Download
# 基本下载
Download(url="https://httpbin.org/json")
# 参数化URL
Download(url="https://api.example.com/data?page={page}&category={category}")
# 带请求配置
Download(
url="https://httpbin.org/post",
method="POST",
headers={"Content-Type": "application/json"},
body={"query": "{search_term}"},
timeout=30
)
# Template Spider 中每个配置只能有一个下载节点
# 如果需要处理不同类型的URL,使用条件判断
download = [
Download(
url="{url}", # 通用URL字段
condition=lambda ctx: ctx.seeds.get("url") # 确保有URL
)
]
3. Parse 节点
解析响应数据:
from bricks.spider.template import Parse
# JSON解析
Parse(
func="json",
kwargs={
"rules": {
"items[*]": {
"id": "id",
"title": "title",
"description": "desc"
}
}
}
)
# XPath解析
Parse(
func="xpath",
kwargs={
"rules": {
"//div[@class='item']": {
"title": ".//h3/text()",
"link": ".//a/@href",
"price": ".//span[@class='price']/text()"
}
}
}
)
# 自定义解析
def extract_data(context):
response = context.response
data = response.json()
return [{"processed": item} for item in data.get("results", [])]
Parse(func=extract_data)
4. Pipeline 节点
处理解析结果:
from bricks.spider.template import Pipeline
# 数据处理
def process_items(context):
items = context.items
processed = []
for item in items:
# 数据清洗
item["title"] = item.get("title", "").strip()
# 数据验证
if item.get("title"):
processed.append(item)
return processed
Pipeline(func=process_items)
# 数据存储
Pipeline(
func=lambda ctx: [save_to_db(item) for item in ctx.items],
success=True # 标记任务完成
)
# 条件处理
Pipeline(
func=lambda ctx: print(f"处理了 {len(ctx.items)} 条数据"),
condition=lambda ctx: len(ctx.items) > 0,
success=True
)
完整示例
1. API 数据爬取
from dataclasses import dataclass
from bricks.spider.template import Spider, Config, Init, Download, Parse, Pipeline
@dataclass
class ApiConfig(Config):
init = [
Init(func=lambda: [{"endpoint": "users"}, {"endpoint": "posts"}])
]
download = [
Download(url="https://jsonplaceholder.typicode.com/{endpoint}")
]
parse = [
Parse(
func="json",
kwargs={
"rules": {
"[*]": {
"id": "id",
"title": "title",
"body": "body"
}
}
}
)
]
pipeline = [
Pipeline(
func=lambda ctx: print(f"从 {ctx.seeds['endpoint']} 获取了 {len(ctx.items)} 条数据"),
success=True
)
]
class ApiSpider(Spider):
@property
def config(self):
return ApiConfig()
# 运行
spider = ApiSpider()
spider.run()
2. 分页数据爬取
@dataclass
class PaginationConfig(Config):
init = [
Init(func=lambda **kwargs: [
{"page": i} for i in range(1, kwargs.get("max_pages", 5) + 1)
])
]
download = [
Download(url="https://httpbin.org/json?page={page}")
]
parse = [
Parse(
func="json",
kwargs={
"rules": {
"args": {
"page": "page"
}
}
}
)
]
pipeline = [
# 数据验证
Pipeline(func=lambda ctx: ctx.items if ctx.items else []),
# 数据存储
Pipeline(
func=lambda ctx: save_page_data(ctx.seeds["page"], ctx.items),
success=True
)
]
class PaginationSpider(Spider):
@property
def config(self):
return PaginationConfig()
def save_page_data(page, items):
print(f"保存第 {page} 页数据: {len(items)} 条")
return items
# 运行时传递参数
spider = PaginationSpider()
spider.run(max_pages=10)
3. 多数据源爬取
@dataclass
class MultiSourceConfig(Config):
init = [
Init(func=lambda: [
{"source": "api1", "url": "https://api1.example.com/data"},
{"source": "api2", "url": "https://api2.example.com/info"},
{"source": "web", "url": "https://example.com/page"}
])
]
download = [
Download(url="{url}")
]
parse = [
# JSON API 解析
Parse(
func="json",
kwargs={"rules": {"data[*]": {"id": "id", "name": "name"}}},
condition=lambda ctx: ctx.seeds["source"].startswith("api")
),
# HTML 页面解析
Parse(
func="xpath",
kwargs={"rules": {"//div[@class='item']": {"title": ".//h3/text()"}}},
condition=lambda ctx: ctx.seeds["source"] == "web"
)
]
pipeline = [
Pipeline(
func=lambda ctx: process_by_source(ctx.seeds["source"], ctx.items),
success=True
)
]
class MultiSourceSpider(Spider):
@property
def config(self):
return MultiSourceConfig()
def process_by_source(source, items):
print(f"处理来自 {source} 的 {len(items)} 条数据")
return items
高级特性
1. 条件执行
# 基于种子条件的下载
Download(
url="https://api.example.com/detail/{id}",
condition=lambda ctx: "id" in ctx.seeds
)
# 基于响应条件的解析
Parse(
func="json",
kwargs={"rules": {"data": "content"}},
condition=lambda ctx: "application/json" in ctx.response.headers.get("content-type", "")
)
# 基于数据条件的处理
Pipeline(
func=lambda ctx: send_notification(ctx.items),
condition=lambda ctx: len(ctx.items) > 100,
success=True
)
2. 错误处理
def safe_download(context):
try:
# 自定义下载逻辑
return download_with_retry(context.seeds["url"])
except Exception as e:
print(f"下载失败: {e}")
return None
Download(func=safe_download)
def robust_parser(context):
try:
return context.response.json()
except:
# 降级到文本解析
return [{"content": context.response.text[:100]}]
Parse(func=robust_parser)
3. 数据流控制
# 数据过滤
Pipeline(func=lambda ctx: [
item for item in ctx.items
if item.get("status") == "active"
])
# 数据转换
Pipeline(func=lambda ctx: [
{**item, "processed_at": datetime.now().isoformat()}
for item in ctx.items
])
# 数据聚合
Pipeline(func=lambda ctx: [{
"total_items": len(ctx.items),
"page": ctx.seeds.get("page"),
"items": ctx.items
}])
4. 参数传递
class ParameterizedSpider(Spider):
def __init__(self, category="default", max_pages=5, **kwargs):
self.category = category
self.max_pages = max_pages
super().__init__(**kwargs)
@property
def config(self):
@dataclass
class DynamicConfig(Config):
init = [
Init(func=lambda: [
{"page": i, "category": self.category}
for i in range(1, self.max_pages + 1)
])
]
download = [
Download(url="https://api.example.com/data?page={page}&category={category}")
]
parse = [
Parse(func="json", kwargs={"rules": {"items[*]": {"id": "id", "title": "title"}}})
]
pipeline = [
Pipeline(func=lambda ctx: print(f"类别 {ctx.seeds['category']}: {len(ctx.items)} 条"), success=True)
]
return DynamicConfig()
# 使用参数化爬虫
spider = ParameterizedSpider(category="electronics", max_pages=10)
spider.run()
重要说明
Template Spider 的执行模式
Template Spider 采用固定的线性流程:
Init → Download → Parse → Pipeline
关键特点:
- 固定流程:每个种子都会按照
init → download → parse → pipeline
的顺序执行 - 单阶段处理:每个配置阶段只能有一个处理逻辑
- 循环执行:每个种子独立执行完整流程
- 条件控制:可以通过
condition
参数控制是否执行某个阶段
多类型种子处理
如果需要处理不同类型的种子,推荐使用条件判断:
@dataclass
class MultiTypeConfig(Config):
download = [
Download(
url="{url}",
condition=lambda ctx: ctx.seeds.get("url") # 确保有URL
)
]
parse = [
# 根据种子类型选择解析方式
Parse(
func="json",
kwargs={"rules": {"items[*]": {"title": "title"}}},
condition=lambda ctx: ctx.seeds.get("type") == "api"
),
Parse(
func="xpath",
kwargs={"rules": {"//h1": {"title": "text()"}}},
condition=lambda ctx: ctx.seeds.get("type") == "html"
)
]
pipeline = [
Pipeline(
func=lambda ctx: save_data(ctx.items, ctx.seeds.get("type")),
success=True
)
]
与 Form Spider 的区别
特性 | Template Spider | Form Spider |
---|---|---|
流程结构 | 固定四阶段 | 自定义节点序列 |
配置方式 | 分阶段配置 | 统一列表配置 |
灵活性 | 较低,适合标准流程 | 较高,适合复杂流程 |
使用场景 | 简单的线性爬取 | 复杂的多步骤处理 |
Template Spider 提供了一种简洁而强大的方式来构建线性流程的爬虫,特别适合标准的数据爬取任务。