bricks
开发指南
2.6 爬虫基类
2.6.5 Spider 生命周期

2.6.5 Spider 生命周期

Spider 生命周期描述了爬虫从启动到结束的完整执行过程，理解生命周期有助于更好地控制爬虫行为和优化性能。

生命周期概览

1. 完整生命周期

启动 → 初始化 → 种子生成 → 任务循环 → 清理 → 结束
 ↓       ↓        ↓        ↓       ↓      ↓
run → run_init → make_seeds → 执行任务 → cleanup → exit

2. 任务循环详解

获取种子 → 构造请求 → 发送请求 → 解析响应 → 处理数据 → 标记完成
   ↓         ↓         ↓         ↓         ↓         ↓
get_seed → make_request → download → parse → pipeline → success/failure

生命周期阶段详解

1. 启动阶段 (run)

import json
import queue
import time

from bricks.spider import air
 
class MySpider(air.Spider):
    def run(self, **kwargs):
        """
        Main entry point of the spider.

        Runs the initialization phase, then the main spider loop, and
        bundles both results together with the current statistics.

        :param kwargs: runtime parameters, forwarded to ``run_init``
        :return: dict with the keys ``init``, ``spider`` and ``stats``
        """
        outcome = {}

        # Phase 1: initialization (receives the runtime parameters)
        outcome["init"] = self.run_init(**kwargs)

        # Phase 2: main spider logic (takes no runtime parameters)
        outcome["spider"] = self.run_spider()

        # Phase 3: statistics are read only after both phases have returned
        outcome["stats"] = self.get_stats()

        return outcome
 
# Start the spider, passing the runtime parameters as keyword arguments
spider = MySpider()
run_options = {"category": "news", "max_pages": 10}
result = spider.run(**run_options)

2. 初始化阶段 (run_init)

class MySpider(air.Spider):
    def run_init(self, **kwargs):
        """
        Initialization phase.

        Asks ``make_seeds`` for the initial seeds, puts each of them on
        ``self.task_queue`` and reports how many were queued.

        NOTE(review): the lifecycle describes BEFORE_INIT / AFTER_INIT
        events around this phase; this method does not fire them itself,
        so confirm where they are dispatched.

        :param kwargs: runtime parameters, forwarded to ``make_seeds``
        :return: ``{"seed_count": <number of seeds queued>}``
        """
        print("开始初始化...")

        # Seeds are generated against a fresh context bound to this spider
        init_context = self.Context(target=self)
        generated = self.make_seeds(init_context, **kwargs)

        # Queue every seed; the counter stays 0 when nothing was generated
        seed_count = 0
        for seed_count, seed in enumerate(generated, start=1):
            self.task_queue.put(seed)

        print(f"生成了 {seed_count} 个种子")
        return {"seed_count": seed_count}

    def make_seeds(self, context, **kwargs):
        """
        Build the initial seeds, one per page.

        :param context: context object supplied by ``run_init`` (unused here)
        :param kwargs: ``category`` (default ``"default"``) and
            ``max_pages`` (default ``5``)
        :return: list of ``{"page": n, "category": category}`` dicts for
            pages ``1`` to ``max_pages`` inclusive
        """
        category = kwargs.get("category", "default")
        max_pages = kwargs.get("max_pages", 5)

        seeds = []
        for page in range(1, max_pages + 1):
            seeds.append({"page": page, "category": category})
        return seeds

3. 任务循环阶段 (run_spider)

class MySpider(air.Spider):
    def run_spider(self):
        """
        Main loop of the spider.

        Repeatedly takes a seed from ``self.task_queue``, wraps it in a
        context and hands it to ``process_task``. The loop only ends once
        the queue is empty AND ``should_stop()`` is true, so a stop request
        still lets queued seeds drain first.

        :return: ``{"processed_count": <tasks that finished without raising>}``
        """
        processed_count = 0

        while not (self.task_queue.empty() and self.should_stop()):
            try:
                # Wait at most one second for the next seed
                seed = self.task_queue.get(timeout=1)
                self.process_task(self.Context(target=self, seeds=seed))
            except queue.Empty:
                # Nothing arrived in time: leave only if a stop was requested
                if self.should_stop():
                    break
            except Exception as e:
                # A failing task is reported and the loop keeps going
                print(f"任务处理失败: {e}")
            else:
                processed_count += 1

        return {"processed_count": processed_count}

    def process_task(self, context):
        """
        Process a single task: request -> download -> parse -> pipeline.

        Any exception raised along the way is passed to
        ``context.failure`` instead of propagating.

        :param context: task context; ``request``, ``response`` and
            ``items`` are stored on it as each step completes
        """
        try:
            request = self.make_request(context)

            # A falsy request means there is nothing to do for this seed
            if request:
                context.request = request

                # Download, then parse, then push the items down the pipeline
                context.response = self.downloader.download(request)
                context.items = self.parse(context)
                self.item_pipeline(context)

        except Exception as e:
            context.failure(e)

事件驱动的生命周期

1. 生命周期事件

from bricks import const
from bricks.core import events
 
class EventDrivenSpider(air.Spider):
    @events.on(const.BEFORE_INIT)
    def on_before_init(self, context):
        """Handler for BEFORE_INIT: announce and record the start timestamp."""
        print("准备开始初始化...")
        self.start_time = time.time()

    @events.on(const.AFTER_INIT)
    def on_after_init(self, context):
        """Handler for AFTER_INIT: report the time elapsed since ``start_time``."""
        init_duration = time.time() - self.start_time
        print(f"初始化完成,耗时: {init_duration:.2f}s")

    @events.on(const.BEFORE_REQUEST)
    def on_before_request(self, context):
        """Handler for BEFORE_REQUEST: log the URL and stamp the request start."""
        print(f"准备请求: {context.request.url}")
        # Stored on the context so the AFTER_REQUEST handler can compute a duration
        context.custom_data["request_start"] = time.time()

    @events.on(const.AFTER_REQUEST)
    def on_after_request(self, context):
        """Handler for AFTER_REQUEST: log the status code and request duration."""
        started_at = context.custom_data["request_start"]
        duration = time.time() - started_at
        print(f"请求完成: {context.response.status_code}, 耗时: {duration:.2f}s")

    @events.on(const.BEFORE_PARSE)
    def on_before_parse(self, context):
        """Handler for BEFORE_PARSE: announce that parsing starts."""
        print("开始解析响应...")

    @events.on(const.AFTER_PARSE)
    def on_after_parse(self, context):
        """Handler for AFTER_PARSE: report how many items were extracted."""
        print(f"解析完成,提取了 {len(context.items)} 条数据")

    @events.on(const.BEFORE_PIPELINE)
    def on_before_pipeline(self, context):
        """Handler for BEFORE_PIPELINE: announce that item processing starts."""
        print("开始处理数据...")

    @events.on(const.AFTER_PIPELINE)
    def on_after_pipeline(self, context):
        """Handler for AFTER_PIPELINE: report success or failure from ``context.is_success``."""
        print("数据处理成功" if context.is_success else "数据处理失败")

2. 错误处理事件

class ErrorHandlingSpider(air.Spider):
    @events.on(const.ON_ERROR)
    def on_error(self, context):
        """
        Handler for ON_ERROR.

        Logs the exception found on the context, then retries the task
        while ``retry_times`` is below ``max_retry`` and marks it as failed
        otherwise.
        """
        exception = context.exception
        print(f"发生错误: {exception}")

        # Record the error before deciding what to do with the task
        self.log_error(context, exception)

        can_retry = context.retry_times < context.max_retry
        if not can_retry:
            print("重试次数已达上限,标记失败")
            context.failure()
            return

        print("准备重试...")
        context.retry()

    @events.on(const.ON_RETRY)
    def on_retry(self, context):
        """Handler for ON_RETRY: log the attempt, then sleep ``2 ** retry_times`` seconds."""
        print(f"第 {context.retry_times} 次重试")

        # Exponential back-off: the delay doubles with every retry
        delay = 2 ** context.retry_times
        time.sleep(delay)

    def log_error(self, context, exception):
        """
        Print an error record for a failed task.

        :param context: task context; its request URL (``"unknown"`` when
            the request has no ``url`` attribute) and ``retry_times`` are logged
        :param exception: the error to record, stored as ``str(exception)``
        """
        request_url = getattr(context.request, "url", "unknown")
        error_info = dict(
            url=request_url,
            error=str(exception),
            retry_times=context.retry_times,
            timestamp=time.time(),
        )
        print(f"错误日志: {error_info}")

生命周期控制

1. 优雅停止

class GracefulSpider(air.Spider):
    def __init__(self, **kwargs):
        """
        Create the spider and install SIGINT / SIGTERM handlers.

        NOTE: ``signal.signal`` may only be called from the main thread of
        the main interpreter, so this class must be instantiated there.

        :param kwargs: forwarded unchanged to the base class constructor
        """
        super().__init__(**kwargs)
        self.stop_requested = False

        # Register the signal handlers
        import signal
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """
        Signal handler: only raises the stop flag.

        The main loop notices the flag on its next iteration, so the task
        currently being processed is allowed to finish.

        :param signum: number of the received signal
        :param frame: current stack frame (unused)
        """
        print(f"收到信号 {signum},准备优雅停止...")
        self.stop_requested = True

    def should_stop(self):
        """Return True when a stop was requested or the base class wants to stop."""
        return self.stop_requested or super().should_stop()

    def run_spider(self):
        """
        Main loop with graceful-stop support.

        Processes seeds from ``self.task_queue`` until ``should_stop()``
        is true. A ``KeyboardInterrupt`` triggers ``cleanup()`` and ends
        the loop.

        :return: ``{"processed_count": <tasks processed>}``, the same shape
            the other ``run_spider`` implementations return, so ``run()``
            can report it instead of ``None``
        """
        processed_count = 0

        while not self.should_stop():
            try:
                # The 1 s timeout bounds how long a stop request can go unnoticed
                seed = self.task_queue.get(timeout=1)
                context = self.Context(target=self, seeds=seed)
                self.process_task(context)
                processed_count += 1
            except queue.Empty:
                continue
            except KeyboardInterrupt:
                print("收到中断信号,开始清理...")
                self.cleanup()
                break

        print("爬虫已停止")
        return {"processed_count": processed_count}

2. 生命周期钩子

class HookedSpider(air.Spider):
    def before_start(self):
        """Hook run before the spider starts: sets up logging and the database."""
        print("爬虫即将启动...")
        self.setup_logging()
        self.setup_database()

    def after_start(self):
        """Hook invoked by ``run`` once the base ``run`` call has returned."""
        print("爬虫已启动")

    def before_stop(self):
        """Hook run before the spider stops: saves the current progress."""
        print("爬虫即将停止...")
        self.save_progress()

    def after_stop(self):
        """Hook run after the spider stopped: releases resources."""
        print("爬虫已停止")
        self.cleanup_resources()

    def run(self, **kwargs):
        """
        Run the spider wrapped in the lifecycle hooks.

        ``before_stop`` and ``after_stop`` run on success and on failure.
        ``after_stop`` is additionally guaranteed to run when
        ``before_stop`` itself raises, so resource cleanup is never skipped
        because saving progress failed.

        :param kwargs: runtime parameters, forwarded to the base ``run``
        :return: whatever the base ``run`` returns
        """
        try:
            self.before_start()
            result = super().run(**kwargs)
            # NOTE(review): this fires after the base run() has returned,
            # not right after start-up; confirm that is the intended moment.
            self.after_start()
            return result
        finally:
            try:
                self.before_stop()
            finally:
                # Must not depend on before_stop succeeding
                self.after_stop()

    def setup_logging(self):
        """Configure the root logger at INFO level."""
        import logging
        logging.basicConfig(level=logging.INFO)

    def setup_database(self):
        """Placeholder for opening the database connection; only prints a message."""
        print("连接数据库...")

    def save_progress(self):
        """Placeholder for persisting progress; only prints a message."""
        print("保存爬虫进度...")

    def cleanup_resources(self):
        """Placeholder for releasing resources; only prints a message."""
        print("清理资源...")

3. 状态监控

class MonitoredSpider(air.Spider):
    def __init__(self, **kwargs):
        """
        Create the spider with an ``"initialized"`` status and empty metrics.

        :param kwargs: forwarded unchanged to the base class constructor
        """
        super().__init__(**kwargs)
        self.status = "initialized"
        # Timestamps stay None until the matching phase has run
        self.metrics = dict(
            start_time=None,
            end_time=None,
            total_tasks=0,
            completed_tasks=0,
            failed_tasks=0,
        )

    def run_init(self, **kwargs):
        """
        Initialization with status tracking.

        Sets the status to ``"initializing"``, stamps ``start_time``, runs
        the base initialization, then switches to ``"running"`` and records
        the reported ``seed_count`` as ``total_tasks``.

        :param kwargs: runtime parameters, forwarded to the base ``run_init``
        :return: the base ``run_init`` result, unchanged
        """
        self.status = "initializing"
        self.metrics["start_time"] = time.time()

        init_result = super().run_init(**kwargs)

        self.status = "running"
        self.metrics["total_tasks"] = init_result.get("seed_count", 0)

        return init_result

    def process_task(self, context):
        """
        Task processing with metric counting.

        Increments ``completed_tasks`` or ``failed_tasks`` according to
        ``context.is_success``. An exception counts as a failure and is
        re-raised.

        :param context: task context handed to the base ``process_task``
        """
        try:
            super().process_task(context)

            counter = "completed_tasks" if context.is_success else "failed_tasks"
            self.metrics[counter] += 1

        except Exception:
            self.metrics["failed_tasks"] += 1
            raise

    def run_spider(self):
        """
        Main loop with status updates.

        The status becomes ``"completed"`` on a normal return and
        ``"failed"`` when an exception escapes; ``end_time`` is stamped in
        both cases.

        :return: the base ``run_spider`` result, unchanged
        """
        try:
            loop_result = super().run_spider()
        except Exception:
            self.status = "failed"
            raise
        else:
            self.status = "completed"
            return loop_result
        finally:
            self.metrics["end_time"] = time.time()

    def get_status(self):
        """
        Return a snapshot of the current state.

        :return: dict with ``status``, a shallow copy of ``metrics`` and
            ``progress`` (see ``get_progress``)
        """
        snapshot = {"status": self.status}
        snapshot["metrics"] = self.metrics.copy()
        snapshot["progress"] = self.get_progress()
        return snapshot

    def get_progress(self):
        """
        Compute the progress as a percentage.

        :return: ``(completed + failed) / total * 100``, or ``0`` when no
            tasks are registered
        """
        total, completed, failed = (
            self.metrics[key]
            for key in ("total_tasks", "completed_tasks", "failed_tasks")
        )

        # Guard against dividing by zero before any seed was counted
        if total == 0:
            return 0

        return (completed + failed) / total * 100

生命周期最佳实践

1. 资源管理

class ResourceManagedSpider(air.Spider):
    def __init__(self, **kwargs):
        """
        Create the spider with an empty list of tracked resources.

        :param kwargs: forwarded unchanged to the base class constructor
        """
        super().__init__(**kwargs)
        self.resources = []

    def acquire_resource(self, resource):
        """
        Register a resource so that it is released with the spider.

        :param resource: object exposing ``close()`` or ``cleanup()``
        :return: the same resource, to allow inline use
        """
        self.resources.append(resource)
        return resource

    def release_resources(self):
        """
        Release every tracked resource.

        Calls ``close()`` when the resource has it, otherwise ``cleanup()``
        when available. A failure on one resource is reported and does not
        stop the others from being released. Safe to call on an instance
        whose ``__init__`` never got as far as creating ``self.resources``.
        """
        # __del__ can run on a half-built instance: if the base constructor
        # raised, ``self.resources`` was never assigned.
        resources = getattr(self, "resources", None)
        if not resources:
            return

        for resource in resources:
            try:
                if hasattr(resource, "close"):
                    resource.close()
                elif hasattr(resource, "cleanup"):
                    resource.cleanup()
            except Exception as e:
                print(f"释放资源失败: {e}")

        resources.clear()

    def __del__(self):
        """Finalizer: release the tracked resources when the spider is collected."""
        self.release_resources()

2. 异常恢复

class RecoverableSpider(air.Spider):
    def save_state(self):
        """
        Persist a small snapshot of the spider state to ``spider_state.json``.

        The snapshot holds the current queue size, the number of completed
        tasks and a timestamp. ``self.metrics`` is optional: this class does
        not define it, so a missing attribute counts as zero completed tasks
        instead of raising ``AttributeError``.
        """
        metrics = getattr(self, "metrics", None) or {}
        state = {
            "queue_size": self.task_queue.qsize(),
            "processed_count": metrics.get("completed_tasks", 0),
            "timestamp": time.time()
        }

        with open("spider_state.json", "w", encoding="utf-8") as f:
            json.dump(state, f)

    def load_state(self):
        """
        Load the snapshot written by ``save_state``.

        :return: the saved state as a dict, or ``None`` when the file does
            not exist or does not contain valid JSON (for example after an
            interrupted write)
        """
        try:
            with open("spider_state.json", "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return None

    def run(self, **kwargs):
        """
        Run the spider with state recovery support.

        A previously saved state is loaded and reported before the run.
        When the run fails, the current state is saved and the original
        exception is re-raised.

        :param kwargs: runtime parameters, forwarded to the base ``run``
        :return: whatever the base ``run`` returns
        """
        # Try to restore a previous state
        saved_state = self.load_state()
        if saved_state:
            print(f"发现保存的状态: {saved_state}")
            # Adjust the run parameters according to the saved state here

        try:
            return super().run(**kwargs)
        except Exception:
            # Saving is best effort: a failure here must not replace the
            # error that actually stopped the run.
            try:
                self.save_state()
            except Exception as save_error:
                print(f"保存状态失败: {save_error}")
            raise

理解 Spider 生命周期有助于更好地控制爬虫行为，实现高效、稳定的数据爬取。