2.7.1 队列概览
任务队列是 Bricks 框架的核心组件之一,负责管理爬虫的种子数据,支持单机和分布式爬取场景。
队列架构
1. 队列类型
Bricks 提供了多种队列实现:
队列类型 | 描述 | 适用场景 |
---|---|---|
LocalQueue | 本地内存队列 | 单机爬取、开发测试 |
RedisQueue | Redis分布式队列 | 分布式爬取、生产环境 |
SmartQueue | 智能队列(内部使用) | 队列底层实现 |
2. 队列结构
每个队列包含多个子队列:
队列名称
├── current # 当前待处理队列
├── temp # 临时处理队列
├── failure # 失败重试队列
├── record # 记录信息队列
└── lock # 锁定状态队列
队列工作流程
1. 种子生命周期
生成种子 → current队列 → 获取处理 → temp队列 → 处理完成 → 移除
↓
处理失败 → failure队列 → 重试
2. 队列状态转换
# 1. 种子投放到 current 队列
queue.put("my_spider", {"url": "https://example.com"})
# 2. 爬虫获取种子,移动到 temp 队列
seed = queue.get("my_spider")
# 3. 处理成功,从 temp 队列移除
queue.remove("my_spider", seed)
# 4. 处理失败,移动到 failure 队列
queue.remove("my_spider", seed, backup="failure")
队列接口
1. 基础操作
from bricks.lib.queues import LocalQueue, RedisQueue
# 创建队列
local_queue = LocalQueue()
redis_queue = RedisQueue(host="localhost", port=6379, database=0)
# 投放种子
queue.put("spider_name", {"url": "https://example.com"})
# 获取种子
seed = queue.get("spider_name")
# 移除种子
queue.remove("spider_name", seed)
# 获取队列大小
size = queue.size("spider_name")
# 清空队列
queue.clear("spider_name")
2. 高级操作
# 队列翻转(将失败队列合并到当前队列)
queue.reverse("spider_name")
# 智能翻转(根据条件自动翻转)
queue.smart_reverse("spider_name")
# 替换种子
queue.replace("spider_name", (old_seed, new_seed))
# 合并队列
queue.merge("dest_queue", "source_queue1", "source_queue2")
队列配置
1. 在爬虫中使用队列
from bricks.spider import air
from bricks.lib.queues import LocalQueue, RedisQueue
class MySpider(air.Spider):
def __init__(self, **kwargs):
# 使用本地队列
kwargs.setdefault("task_queue", LocalQueue())
kwargs.setdefault("queue_name", "my_spider")
super().__init__(**kwargs)
# 使用 Redis 队列
class DistributedSpider(air.Spider):
def __init__(self, **kwargs):
# 使用 Redis 队列
kwargs.setdefault("task_queue", RedisQueue(
host="localhost",
port=6379,
database=0
))
kwargs.setdefault("queue_name", "distributed_spider")
super().__init__(**kwargs)
2. 队列参数配置
# Redis 队列配置
redis_queue = RedisQueue(
host="127.0.0.1", # Redis 主机
port=6379, # Redis 端口
password=None, # Redis 密码
database=0, # Redis 数据库
genre="set", # 队列类型:list/set/zset
decode_responses=True, # 自动解码响应
socket_timeout=30, # 连接超时
socket_connect_timeout=30, # 连接建立超时
retry_on_timeout=True, # 超时重试
health_check_interval=30 # 健康检查间隔
)
# 本地队列配置
local_queue = LocalQueue() # 无需额外配置
队列监控
1. 队列状态查询
# 获取各子队列大小
current_size = queue.size("spider_name", qtypes=("current",))
temp_size = queue.size("spider_name", qtypes=("temp",))
failure_size = queue.size("spider_name", qtypes=("failure",))
print(f"待处理: {current_size}")
print(f"处理中: {temp_size}")
print(f"失败重试: {failure_size}")
# 获取总大小
total_size = queue.size("spider_name")
print(f"总计: {total_size}")
2. 队列健康检查
def check_queue_health(queue, spider_name):
"""检查队列健康状态"""
stats = {
"current": queue.size(spider_name, qtypes=("current",)),
"temp": queue.size(spider_name, qtypes=("temp",)),
"failure": queue.size(spider_name, qtypes=("failure",)),
"total": queue.size(spider_name)
}
# 检查是否有积压
if stats["temp"] > stats["current"] * 2:
print("警告: temp队列积压严重")
# 检查失败率
if stats["failure"] > stats["total"] * 0.1:
print("警告: 失败率过高")
return stats
队列最佳实践
1. 队列命名规范
# 推荐的队列命名方式
queue_name = f"{project_name}.{spider_name}"
# 例如
queue_name = "ecommerce.product_spider"
queue_name = "news.article_spider"
2. 错误处理
class RobustSpider(air.Spider):
def item_pipeline(self, context):
try:
# 处理数据
self.process_data(context.items)
# 标记成功
context.success()
except Exception as e:
# 记录错误
logger.error(f"处理失败: {e}")
# 标记失败,种子会进入 failure 队列
context.failure()
3. 队列清理
def cleanup_queue(queue, spider_name, max_failure_count=1000):
"""清理队列中的无效数据"""
failure_size = queue.size(spider_name, qtypes=("failure",))
if failure_size > max_failure_count:
print(f"清理失败队列,当前大小: {failure_size}")
queue.clear(spider_name, qtypes=("failure",))
4. 队列备份
def backup_queue(source_queue, backup_queue, spider_name):
"""备份队列数据"""
# 获取所有种子
seeds = []
while not source_queue.is_empty(spider_name):
seed = source_queue.get(spider_name)
if seed:
seeds.extend(seed)
# 备份到另一个队列
if seeds:
backup_queue.put(f"{spider_name}_backup", *seeds)
# 恢复原队列
if seeds:
source_queue.put(spider_name, *seeds)
队列性能优化
1. 批量操作
# 批量投放种子
seeds = [{"url": f"https://example.com/page{i}"} for i in range(1000)]
queue.put("spider_name", *seeds)
# 批量获取种子
seeds = queue.get("spider_name", count=10)
2. 连接池优化
# Redis 队列连接池配置
redis_queue = RedisQueue(
host="localhost",
port=6379,
database=0,
max_connections=20, # 最大连接数
connection_pool_kwargs={
"max_connections": 20,
"retry_on_timeout": True
}
)
3. 内存优化
# 对于大量小种子,使用压缩
import json
import gzip
def compress_seed(seed):
"""压缩种子数据"""
json_str = json.dumps(seed)
return gzip.compress(json_str.encode())
def decompress_seed(compressed_data):
"""解压种子数据"""
json_str = gzip.decompress(compressed_data).decode()
return json.loads(json_str)
队列是 Bricks 框架的核心基础设施,正确理解和使用队列对于构建高效、稳定的爬虫系统至关重要。