bricks
开发指南
2.6 爬虫基类
2.6.1 Air Spider 基础爬虫

2.6.1 Air Spider 基础爬虫

Air SpiderBricks 框架的核心基础爬虫类,提供了完整的爬虫生命周期管理和事件驱动的架构。它是所有其他爬虫类型的基础。

核心特性

1. 事件驱动架构

Air Spider 基于事件驱动架构,整个爬虫流程通过事件串联:

from bricks.spider import air
from bricks import Request
 
class MySpider(air.Spider):
    def make_seeds(self, context, **kwargs):
        """生成初始种子"""
        return [{"page": i} for i in range(1, 6)]
    
    def make_request(self, context) -> Request:
        """根据种子生成请求"""
        page = context.seeds["page"]
        return Request(url=f"https://httpbin.org/json?page={page}")
    
    def parse(self, context):
        """解析响应"""
        return context.response.json()
    
    def item_pipeline(self, context):
        """处理解析结果"""
        print(f"处理数据: {context.items}")
        context.success()  # 标记种子处理完成

2. 完整的生命周期

Air Spider 提供了完整的爬虫生命周期管理:

初始化 → 生成种子 → 构造请求 → 发送请求 → 解析响应 → 数据处理 → 完成
  ↓         ↓         ↓         ↓         ↓         ↓         ↓
run_init → make_seeds → make_request → on_request → parse → item_pipeline → success

核心方法详解

1. make_seeds 方法

生成爬虫的初始种子数据:

def make_seeds(self, context, **kwargs):
    """
    生成种子数据
    
    :param context: 初始化上下文
    :param kwargs: 额外参数,包含记录信息等
    :return: 种子列表或生成器
    """
    # 可以返回列表
    return [
        {"url": "https://example.com/page1"},
        {"url": "https://example.com/page2"}
    ]
    
    # 也可以返回生成器(节省内存)
    for i in range(1000):
        yield {"page": i, "category": "news"}

2. make_request 方法

根据种子数据构造请求:

def make_request(self, context) -> Request:
    """
    构造请求对象
    
    :param context: 爬虫上下文,包含种子数据
    :return: Request 对象
    """
    seeds = context.seeds
    
    return Request(
        url=seeds["url"],
        headers={
            "User-Agent": "My Spider 1.0"
        },
        params={"page": seeds.get("page", 1)},
        proxy=seeds.get("proxy")
    )

3. parse 方法

解析响应数据:

def parse(self, context):
    """
    解析响应
    
    :param context: 爬虫上下文,包含响应对象
    :return: 解析结果
    """
    response = context.response
    
    # JSON 响应解析
    if "application/json" in response.headers.get("content-type", ""):
        return response.json()
    
    # HTML 响应解析
    else:
        return {
            "title": response.xpath("//title/text()").get(""),
            "links": response.xpath("//a/@href").getall()
        }

4. item_pipeline 方法

处理解析后的数据:

def item_pipeline(self, context):
    """
    数据处理管道
    
    :param context: 爬虫上下文,包含解析结果
    """
    items = context.items
    
    # 数据验证
    if not items:
        print("没有提取到数据")
        context.success()
        return
    
    # 数据处理
    for item in items:
        # 数据清洗
        item["title"] = item.get("title", "").strip()
        
        # 数据存储
        self.save_to_database(item)
    
    # 标记处理完成
    context.success()

高级特性

1. 并发控制

class ConcurrentSpider(air.Spider):
    def __init__(self, **kwargs):
        # 设置并发数
        kwargs.setdefault("concurrency", 10)
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        return [{"url": f"https://httpbin.org/delay/{i}"} for i in range(100)]

2. 代理支持

class ProxySpider(air.Spider):
    def __init__(self, **kwargs):
        # 设置代理
        kwargs.setdefault("proxy", [
            "http://proxy1:8080",
            "http://proxy2:8080"
        ])
        super().__init__(**kwargs)

3. 自定义下载器

from bricks.downloader import cffi
 
class CustomSpider(air.Spider):
    def __init__(self, **kwargs):
        # 使用自定义下载器
        kwargs.setdefault("downloader", cffi.Downloader())
        super().__init__(**kwargs)

4. 任务队列配置

from bricks.lib.queues import RedisQueue
 
class DistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        # 使用 Redis 队列实现分布式
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            port=6379,
            db=0
        ))
        kwargs.setdefault("queue_name", "my_spider_queue")
        super().__init__(**kwargs)

内置事件处理

Air Spider 内置了一些常用的事件处理器:

1. 请求前处理

# 自动设置 User-Agent
on_request.Before.fake_ua
 
# 自动设置代理
on_request.Before.set_proxy

2. 请求后处理

# 显示响应信息
on_request.After.show_response
 
# 条件脚本处理
on_request.After.conditional_scripts
 
# 绕过检测
on_request.After.bypass

统计信息

Air Spider 提供了详细的统计信息:

spider = MySpider()
result = spider.run()
 
print(f"获取种子数量: {spider.number_of_seeds_obtained.value}")
print(f"新增种子数量: {spider.number_of_new_seeds.value}")
print(f"总请求数量: {spider.number_of_total_requests.value}")
print(f"失败请求数量: {spider.number_of_failure_requests.value}")
print(f"成功率: {(spider.number_of_total_requests.value - spider.number_of_failure_requests.value) / spider.number_of_total_requests.value * 100:.2f}%")

运行模式

1. 完整运行

spider = MySpider()
result = spider.run()  # 包含初始化和爬取

2. 分步运行

spider = MySpider()
 
# 只运行初始化
init_result = spider.run_init()
 
# 只运行爬虫
spider_result = spider.run_spider()

3. 持续运行

spider = MySpider(forever=True)  # 持续运行,不会因为队列为空而退出
spider.run()

调试和测试

1. survey 方法

用于快速测试和调试:

# 测试特定种子
results = MySpider.survey(
    {"url": "https://httpbin.org/json"},
    {"url": "https://httpbin.org/html"}
)
 
for context in results:
    print(f"URL: {context.request.url}")
    print(f"状态码: {context.response.status_code}")
    print(f"结果: {context.items}")

2. fetch 方法

发送单个请求:

spider = MySpider()
 
# 发送单个请求
response = spider.fetch("https://httpbin.org/json")
print(response.json())
 
# 带参数的请求
response = spider.fetch({
    "url": "https://httpbin.org/post",
    "method": "POST",
    "body": {"key": "value"}
})

Air Spider 是 Bricks 框架的核心,提供了强大而灵活的爬虫基础功能,是构建各种复杂爬虫的基石。