2.6.2 Form Spider 表单爬虫
Form Spider
是基于配置的声明式爬虫,通过配置文件或配置类来定义爬虫行为,无需编写复杂的爬虫逻辑代码。
核心概念
1. 配置驱动
Form Spider 通过配置来驱动爬虫行为:
from dataclasses import dataclass
from bricks.spider.form import Spider, Config, Download, Parse, Pipeline
@dataclass
class MyConfig(Config):
"""爬虫配置类"""
spider = [
Download(url="https://httpbin.org/json"),
Parse(func="json", kwargs={"rules": {"args": {"page": "page"}}}),
Pipeline(func=lambda ctx: print(f"结果: {ctx.items}"), success=True)
]
class MySpider(Spider):
@property
def config(self):
return MyConfig()
2. 流程节点
Form Spider 支持多种流程节点:
节点类型 | 描述 | 用途 |
---|---|---|
Download | 下载节点 | 发送 HTTP 请求 |
Parse | 解析节点 | 解析响应数据 |
Pipeline | 管道节点 | 处理解析结果 |
Task | 任务节点 | 执行自定义逻辑 |
Init | 初始化节点 | 生成初始种子 |
配置节点详解
1. Download 节点
用于发送 HTTP 请求:
from bricks.spider.form import Download
# 基本下载
Download(url="https://httpbin.org/json")
# 带参数的下载
Download(
url="https://httpbin.org/post",
method="POST",
headers={"Content-Type": "application/json"},
body={"key": "value"},
timeout=30,
proxy="http://proxy:8080"
)
# 动态URL(使用种子数据)
Download(url="https://api.example.com/page/{page}")
# 条件下载
Download(
url="https://httpbin.org/json",
condition=lambda ctx: ctx.seeds.get("download", True)
)
2. Parse 节点
用于解析响应数据:
from bricks.spider.form import Parse
# JSON解析
Parse(
func="json",
kwargs={
"rules": {
"data[*]": {
"id": "id",
"title": "title",
"url": "url"
}
}
}
)
# XPath解析
Parse(
func="xpath",
kwargs={
"rules": {
"//article": {
"title": ".//h2/text()",
"content": ".//p/text()",
"link": ".//a/@href"
}
}
}
)
# 自定义解析函数
def custom_parser(context):
response = context.response
return {"content": response.text[:100]}
Parse(func=custom_parser)
3. Pipeline 节点
用于处理解析结果:
from bricks.spider.form import Pipeline
# 简单处理
Pipeline(
func=lambda ctx: print(f"处理了 {len(ctx.items)} 条数据"),
success=True
)
# 复杂处理
def process_items(context):
items = context.items
for item in items:
# 数据清洗
item["title"] = item.get("title", "").strip()
# 数据存储
save_to_database(item)
return items
Pipeline(func=process_items, success=True)
# 条件处理
Pipeline(
func=lambda ctx: print("有数据"),
condition=lambda ctx: len(ctx.items) > 0,
success=True
)
4. Task 节点
用于执行自定义任务:
from bricks.spider.form import Task
# 提交新种子
def submit_new_seeds(context):
items = context.items
new_seeds = []
for item in items:
if item.get("has_detail"):
new_seeds.append({"url": item["detail_url"]})
if new_seeds:
context.submit(*new_seeds)
Task(func=submit_new_seeds)
# 条件任务
Task(
func=lambda ctx: print("执行特殊任务"),
condition=lambda ctx: ctx.seeds.get("special", False)
)
5. Init 节点
用于生成初始种子:
from bricks.spider.form import Init
# 静态种子
Init(func=lambda: [{"page": i} for i in range(1, 6)])
# 动态种子
def generate_seeds():
# 从数据库或API获取种子
return [{"url": f"https://api.example.com/page/{i}"} for i in range(1, 11)]
Init(func=generate_seeds)
完整示例
1. 新闻爬虫
from dataclasses import dataclass
from bricks.spider.form import Spider, Config, Download, Parse, Pipeline, Task
@dataclass
class NewsConfig(Config):
spider = [
# 下载列表页
Download(url="https://news.example.com/list?page={page}"),
# 解析列表页
Parse(
func="xpath",
kwargs={
"rules": {
"//article": {
"title": ".//h2/text()",
"summary": ".//p[@class='summary']/text()",
"detail_url": ".//a/@href",
"publish_time": ".//time/@datetime"
}
}
}
),
# 提交详情页种子到队列,由其他实例处理
Task(func=lambda ctx: ctx.submit(*[
{"url": item["detail_url"], "type": "detail"}
for item in ctx.items if item.get("detail_url")
], call_later=True)), # call_later=True 表示放入队列等待处理
# 数据处理
Pipeline(
func=lambda ctx: [save_news(item) for item in ctx.items],
success=True
)
]
class NewsSpider(Spider):
@property
def config(self):
return NewsConfig()
def save_news(item):
print(f"保存新闻: {item.get('title', 'Unknown')}")
return item
# 运行爬虫
if __name__ == "__main__":
spider = NewsSpider()
spider.run(page=1) # 传递初始参数
2. 电商爬虫
@dataclass
class EcommerceConfig(Config):
spider = [
# 搜索商品
Download(
url="https://api.shop.com/search",
params={"q": "{keyword}", "page": "{page}"}
),
# 解析商品列表
Parse(
func="json",
kwargs={
"rules": {
"products[*]": {
"id": "id",
"name": "name",
"price": "price",
"rating": "rating",
"detail_url": "detail_url"
}
}
}
),
# 过滤高评分商品
Pipeline(
func=lambda ctx: [
item for item in ctx.items
if float(item.get("rating", 0)) >= 4.0
]
),
# 提交详情页到队列,由其他实例处理
Task(func=lambda ctx: ctx.submit(*[
{"url": item["detail_url"], "product_id": item["id"], "type": "detail"}
for item in ctx.items
], call_later=True)), # 放入队列等待处理
# 保存商品列表信息
Pipeline(
func=lambda ctx: [save_product_list(item) for item in ctx.items],
success=True
)
]
class EcommerceSpider(Spider):
@property
def config(self):
return EcommerceConfig()
def save_product_list(product):
print(f"保存商品列表: {product}")
# 如果需要处理详情页,需要创建专门的详情页爬虫
@dataclass
class ProductDetailConfig(Config):
spider = [
# 只处理详情页类型的种子
Download(
url="{url}",
condition=lambda ctx: ctx.seeds.get("type") == "detail"
),
Parse(
func="xpath",
kwargs={
"rules": {
"//div[@class='product-detail']": {
"description": ".//div[@class='description']//text()",
"specifications": ".//table[@class='specs']//text()",
"reviews_count": ".//span[@class='review-count']/text()"
}
}
}
),
Pipeline(
func=lambda ctx: save_product_detail(ctx.items[0], ctx.seeds["product_id"]),
success=True
)
]
class ProductDetailSpider(Spider):
@property
def config(self):
return ProductDetailConfig()
def save_product_detail(product_detail, product_id):
print(f"保存商品详情 {product_id}: {product_detail}")
高级特性
1. 条件执行
# 基于种子条件
Download(
url="https://api.example.com/data",
condition=lambda ctx: ctx.seeds.get("need_download", True)
)
# 基于响应条件
Parse(
func="json",
kwargs={"rules": {"data": "content"}},
condition=lambda ctx: ctx.response.status_code == 200
)
# 基于数据条件
Pipeline(
func=lambda ctx: process_data(ctx.items),
condition=lambda ctx: len(ctx.items) > 0,
success=True
)
2. 错误处理
def safe_parser(context):
try:
return context.response.json()
except Exception as e:
print(f"解析失败: {e}")
return []
Parse(func=safe_parser)
def error_handler(context):
if context.response.status_code >= 400:
print(f"请求失败: {context.response.status_code}")
return []
return context.response.json()
Parse(func=error_handler)
3. 数据传递
# 在种子中传递数据
Task(func=lambda ctx: ctx.submit(
{"url": "https://example.com", "parent_data": ctx.items[0]}
))
# 在上下文中传递数据
def set_context_data(context):
context.custom_data = {"processed_at": time.time()}
Task(func=set_context_data)
4. 动态配置
class DynamicSpider(Spider):
def __init__(self, config_type="basic", **kwargs):
self.config_type = config_type
super().__init__(**kwargs)
@property
def config(self):
if self.config_type == "advanced":
return AdvancedConfig()
else:
return BasicConfig()
重要说明
流程执行方式
Form Spider 的 spider
配置是一个线性执行流程,每个种子会按照配置的节点顺序依次执行:
- 单次执行:每个种子只会执行一遍完整的流程
- 顺序执行:按照
spider
列表中的顺序依次执行节点 - 条件跳过:可以通过
condition
参数跳过某些节点 - 新种子处理:通过
context.submit()
提交的新种子会重新进入队列
多阶段处理的正确方式
如果需要处理列表页和详情页,推荐的方式是:
方式一:使用不同的爬虫实例
# 列表页爬虫
class ListSpider(Spider):
# 处理列表页,提交详情页种子到队列
# 详情页爬虫
class DetailSpider(Spider):
# 专门处理详情页种子
方式二:在同一个爬虫中使用条件判断
@dataclass
class MultiStageConfig(Config):
spider = [
# 根据种子类型选择不同的URL
Download(url="{list_url}", condition=lambda ctx: ctx.seeds.get("type") == "list"),
Download(url="{detail_url}", condition=lambda ctx: ctx.seeds.get("type") == "detail"),
# 根据种子类型选择不同的解析方式
Parse(func="parse_list", condition=lambda ctx: ctx.seeds.get("type") == "list"),
Parse(func="parse_detail", condition=lambda ctx: ctx.seeds.get("type") == "detail"),
# 统一的数据处理
Pipeline(func=lambda ctx: save_data(ctx.items), success=True)
]
Form Spider 通过配置驱动的方式,大大简化了爬虫的开发复杂度,特别适合结构化的数据爬取任务。