2.6.5 Spider Lifecycle
The spider lifecycle describes the full execution flow of a spider from startup to shutdown. Understanding it helps you control spider behavior and tune performance.
Lifecycle Overview
1. Full Lifecycle
Start (run) → Initialize (run_init) → Generate seeds (make_seeds) → Task loop → Cleanup (cleanup) → Exit (exit)
2. Task Loop in Detail
Get seed (get_seed) → Build request (make_request) → Send request (download) → Parse response (parse) → Process data (pipeline) → Mark result (success/failure)
Lifecycle Phases in Detail
1. Startup Phase (run)
from bricks.spider import air


class MySpider(air.Spider):
    def run(self, **kwargs):
        """
        Main entry point of the spider.

        :param kwargs: runtime parameters
        :return: execution result
        """
        # 1. Run initialization
        init_result = self.run_init(**kwargs)
        # 2. Run the main spider loop
        spider_result = self.run_spider()
        # 3. Return the combined result
        return {
            "init": init_result,
            "spider": spider_result,
            "stats": self.get_stats(),
        }


# Start the spider
spider = MySpider()
result = spider.run(category="news", max_pages=10)
2. Initialization Phase (run_init)
class MySpider(air.Spider):
    def run_init(self, **kwargs):
        """
        Initialization phase:
        1. Fire the BEFORE_INIT event
        2. Call make_seeds to generate seeds
        3. Put the seeds into the task queue
        4. Fire the AFTER_INIT event
        """
        print("Starting initialization...")
        # Generate seeds
        seeds = self.make_seeds(self.Context(target=self), **kwargs)
        # Count the seeds while enqueuing them
        seed_count = 0
        for seed in seeds:
            self.task_queue.put(seed)
            seed_count += 1
        print(f"Generated {seed_count} seeds")
        return {"seed_count": seed_count}

    def make_seeds(self, context, **kwargs):
        """Generate the initial seeds."""
        category = kwargs.get("category", "default")
        max_pages = kwargs.get("max_pages", 5)
        return [
            {"page": i, "category": category}
            for i in range(1, max_pages + 1)
        ]
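For large seed sets, make_seeds does not have to build the whole list in memory: run_init above simply iterates over its return value, so a generator works just as well. A minimal sketch, building on the MySpider above:

class LazySeedSpider(MySpider):
    def make_seeds(self, context, **kwargs):
        """Yield seeds one at a time instead of materializing a full list."""
        category = kwargs.get("category", "default")
        max_pages = kwargs.get("max_pages", 5)
        for page in range(1, max_pages + 1):
            yield {"page": page, "category": category}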
3. Task Loop Phase (run_spider)
import queue


class MySpider(air.Spider):
    def run_spider(self):
        """
        Main spider loop:
        1. Take a seed from the queue
        2. Build a request
        3. Send the request
        4. Parse the response
        5. Process the data
        6. Repeat until the queue is empty
        """
        processed_count = 0
        while not self.task_queue.empty() or not self.should_stop():
            try:
                # Take a seed
                seed = self.task_queue.get(timeout=1)
                # Create the context
                context = self.Context(target=self, seeds=seed)
                # Execute the task
                self.process_task(context)
                processed_count += 1
            except queue.Empty:
                if self.should_stop():
                    break
            except Exception as e:
                print(f"Task failed: {e}")
        return {"processed_count": processed_count}

    def process_task(self, context):
        """Process a single task."""
        try:
            # 1. Build the request
            request = self.make_request(context)
            if not request:
                return
            context.request = request
            # 2. Send the request
            response = self.downloader.download(request)
            context.response = response
            # 3. Parse the response
            items = self.parse(context)
            context.items = items
            # 4. Process the data
            self.item_pipeline(context)
        except Exception as e:
            context.failure(e)
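The loop above calls make_request without showing it. Below is a minimal sketch of what such a method could look like; the Request import path and the endpoint URL are assumptions made here, not part of the original example, so adjust them to your bricks version:

from bricks import Request  # assumption: the import path may differ by version
from bricks.spider import air


class MySpider(air.Spider):
    def make_request(self, context):
        """Turn the current seed into a request object."""
        seed = context.seeds
        return Request(
            url="https://example.com/api/list",  # hypothetical endpoint
            params={"page": seed["page"], "category": seed["category"]},
        )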
Event-Driven Lifecycle
1. Lifecycle Events
import time

from bricks import const
from bricks.core import events
from bricks.spider import air


class EventDrivenSpider(air.Spider):
    @events.on(const.BEFORE_INIT)
    def on_before_init(self, context):
        """Fired before initialization."""
        print("About to initialize...")
        self.start_time = time.time()

    @events.on(const.AFTER_INIT)
    def on_after_init(self, context):
        """Fired after initialization."""
        init_duration = time.time() - self.start_time
        print(f"Initialization finished in {init_duration:.2f}s")

    @events.on(const.BEFORE_REQUEST)
    def on_before_request(self, context):
        """Fired before each request."""
        print(f"About to request: {context.request.url}")
        context.custom_data["request_start"] = time.time()

    @events.on(const.AFTER_REQUEST)
    def on_after_request(self, context):
        """Fired after each request."""
        duration = time.time() - context.custom_data["request_start"]
        print(f"Request finished: {context.response.status_code}, took {duration:.2f}s")

    @events.on(const.BEFORE_PARSE)
    def on_before_parse(self, context):
        """Fired before parsing."""
        print("Parsing response...")

    @events.on(const.AFTER_PARSE)
    def on_after_parse(self, context):
        """Fired after parsing."""
        print(f"Parsing finished, extracted {len(context.items)} items")

    @events.on(const.BEFORE_PIPELINE)
    def on_before_pipeline(self, context):
        """Fired before the pipeline runs."""
        print("Processing data...")

    @events.on(const.AFTER_PIPELINE)
    def on_after_pipeline(self, context):
        """Fired after the pipeline runs."""
        if context.is_success:
            print("Data processed successfully")
        else:
            print("Data processing failed")
2. Error-Handling Events
class ErrorHandlingSpider(air.Spider):
    @events.on(const.ON_ERROR)
    def on_error(self, context):
        """Fired when an error occurs."""
        exception = context.exception
        print(f"Error occurred: {exception}")
        # Log the error
        self.log_error(context, exception)
        # Decide whether to retry
        if context.retry_times < context.max_retry:
            print("Scheduling a retry...")
            context.retry()
        else:
            print("Retry limit reached, marking as failed")
            context.failure()

    @events.on(const.ON_RETRY)
    def on_retry(self, context):
        """Fired on each retry."""
        print(f"Retry #{context.retry_times}")
        # Back off exponentially between retries
        time.sleep(2 ** context.retry_times)

    def log_error(self, context, exception):
        """Log error details."""
        error_info = {
            "url": getattr(context.request, "url", "unknown"),
            "error": str(exception),
            "retry_times": context.retry_times,
            "timestamp": time.time(),
        }
        print(f"Error log: {error_info}")
Lifecycle Control
1. Graceful Shutdown
import queue
import signal


class GracefulSpider(air.Spider):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.stop_requested = False
        # Register signal handlers (must happen in the main thread)
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signum, frame):
        """Handle termination signals."""
        print(f"Received signal {signum}, stopping gracefully...")
        self.stop_requested = True

    def should_stop(self):
        """Check whether the spider should stop."""
        return self.stop_requested or super().should_stop()

    def run_spider(self):
        """Override the main loop to support graceful shutdown."""
        while not self.should_stop():
            try:
                seed = self.task_queue.get(timeout=1)
                context = self.Context(target=self, seeds=seed)
                self.process_task(context)
            except queue.Empty:
                continue
            except KeyboardInterrupt:
                print("Interrupted, cleaning up...")
                self.cleanup()
                break
        print("Spider stopped")
2. Lifecycle Hooks
class HookedSpider(air.Spider):
    def before_start(self):
        """Hook: before the spider starts."""
        print("Spider is about to start...")
        self.setup_logging()
        self.setup_database()

    def after_start(self):
        """Hook: right after the spider has started."""
        print("Spider started")

    def before_stop(self):
        """Hook: before the spider stops."""
        print("Spider is about to stop...")
        self.save_progress()

    def after_stop(self):
        """Hook: after the spider has stopped."""
        print("Spider stopped")
        self.cleanup_resources()

    def run(self, **kwargs):
        """Override run() to wire in the hooks."""
        try:
            self.before_start()
            # super().run() blocks until the spider finishes,
            # so fire the after-start hook before entering it
            self.after_start()
            return super().run(**kwargs)
        finally:
            self.before_stop()
            self.after_stop()

    def setup_logging(self):
        """Configure logging."""
        import logging
        logging.basicConfig(level=logging.INFO)

    def setup_database(self):
        """Set up the database connection."""
        print("Connecting to database...")

    def save_progress(self):
        """Persist crawl progress."""
        print("Saving spider progress...")

    def cleanup_resources(self):
        """Release resources."""
        print("Cleaning up resources...")
3. Status Monitoring
import time


class MonitoredSpider(air.Spider):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.status = "initialized"
        self.metrics = {
            "start_time": None,
            "end_time": None,
            "total_tasks": 0,
            "completed_tasks": 0,
            "failed_tasks": 0,
        }

    def run_init(self, **kwargs):
        """Override initialization to track status."""
        self.status = "initializing"
        self.metrics["start_time"] = time.time()
        result = super().run_init(**kwargs)
        self.status = "running"
        self.metrics["total_tasks"] = result.get("seed_count", 0)
        return result

    def process_task(self, context):
        """Override task processing to collect metrics."""
        try:
            super().process_task(context)
            if context.is_success:
                self.metrics["completed_tasks"] += 1
            else:
                self.metrics["failed_tasks"] += 1
        except Exception:
            self.metrics["failed_tasks"] += 1
            raise

    def run_spider(self):
        """Override the main loop to track status transitions."""
        try:
            result = super().run_spider()
            self.status = "completed"
            return result
        except Exception:
            self.status = "failed"
            raise
        finally:
            self.metrics["end_time"] = time.time()

    def get_status(self):
        """Return a snapshot of the current status."""
        return {
            "status": self.status,
            "metrics": self.metrics.copy(),
            "progress": self.get_progress(),
        }

    def get_progress(self):
        """Compute progress as a percentage."""
        total = self.metrics["total_tasks"]
        completed = self.metrics["completed_tasks"]
        failed = self.metrics["failed_tasks"]
        if total == 0:
            return 0
        return (completed + failed) / total * 100
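get_status can be polled from outside the spider, for example from a daemon thread that prints progress every few seconds. A minimal sketch (the interval and the set of "live" statuses are choices made here, not framework constants):

import threading
import time


def watch(spider, interval=5):
    """Print the spider's status periodically from a background thread."""
    def loop():
        while spider.status in ("initialized", "initializing", "running"):
            snapshot = spider.get_status()
            print(f"[monitor] {snapshot['status']} "
                  f"progress={snapshot['progress']:.1f}%")
            time.sleep(interval)
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


spider = MonitoredSpider()
watch(spider)
spider.run()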
Lifecycle Best Practices
1. Resource Management
class ResourceManagedSpider(air.Spider):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.resources = []

    def acquire_resource(self, resource):
        """Track a resource so it can be released later."""
        self.resources.append(resource)
        return resource

    def release_resources(self):
        """Release all tracked resources."""
        for resource in self.resources:
            try:
                if hasattr(resource, "close"):
                    resource.close()
                elif hasattr(resource, "cleanup"):
                    resource.cleanup()
            except Exception as e:
                print(f"Failed to release resource: {e}")
        self.resources.clear()

    def __del__(self):
        """Destructor: last-resort resource release (not guaranteed to run)."""
        self.release_resources()
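Since __del__ is not guaranteed to run (notably at interpreter shutdown), it is safer to release explicitly. A hypothetical usage with an output file as the tracked resource:

spider = ResourceManagedSpider()
# Track an output file so it is closed even if the crawl fails.
out = spider.acquire_resource(open("output.jsonl", "a", encoding="utf-8"))
try:
    spider.run()
finally:
    spider.release_resources()  # do not rely on __del__ alone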
2. Crash Recovery
import json
import time


class RecoverableSpider(air.Spider):
    def save_state(self):
        """Persist the spider's state to disk."""
        state = {
            "queue_size": self.task_queue.qsize(),
            # assumes a metrics dict like MonitoredSpider's above
            "processed_count": self.metrics.get("completed_tasks", 0),
            "timestamp": time.time(),
        }
        with open("spider_state.json", "w") as f:
            json.dump(state, f)

    def load_state(self):
        """Load a previously saved state, if any."""
        try:
            with open("spider_state.json", "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def run(self, **kwargs):
        """Run with state-recovery support."""
        # Try to restore a previous state
        saved_state = self.load_state()
        if saved_state:
            print(f"Found saved state: {saved_state}")
            # Adjust runtime parameters based on the saved state
        try:
            return super().run(**kwargs)
        except Exception:
            # Persist the current state before propagating the error
            self.save_state()
            raise
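The example above loads the saved state but never applies it. One way to use it, assuming page-numbered seeds as in the earlier make_seeds example (the skip logic is a sketch, not framework behavior):

class ResumingSpider(RecoverableSpider):
    def make_seeds(self, context, **kwargs):
        """Skip pages that a previous run already completed."""
        state = self.load_state() or {}
        done = state.get("processed_count", 0)
        max_pages = kwargs.get("max_pages", 5)
        # Resume after the last completed page
        # (assumes one task per page, processed in order)
        return [{"page": i} for i in range(done + 1, max_pages + 1)]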
Understanding the spider lifecycle makes it easier to control spider behavior and build efficient, stable crawls.