bricks
开发指南
2.7 任务队列
2.7.1 队列概览

2.7.1 队列概览

任务队列是 Bricks 框架的核心组件之一,负责管理爬虫的种子数据,支持单机和分布式爬取场景。

队列架构

1. 队列类型

Bricks 提供了多种队列实现:

队列类型描述适用场景
LocalQueue本地内存队列单机爬取、开发测试
RedisQueueRedis分布式队列分布式爬取、生产环境
SmartQueue智能队列(内部使用)队列底层实现

2. 队列结构

每个队列包含多个子队列:

队列名称
├── current   # 当前待处理队列
├── temp      # 临时处理队列
├── failure   # 失败重试队列
├── record    # 记录信息队列
└── lock      # 锁定状态队列

队列工作流程

1. 种子生命周期

生成种子 → current队列 → 获取处理 → temp队列 → 处理完成 → 移除

                              处理失败 → failure队列 → 重试

2. 队列状态转换

# 1. 种子投放到 current 队列
queue.put("my_spider", {"url": "https://example.com"})
 
# 2. 爬虫获取种子,移动到 temp 队列
seed = queue.get("my_spider")
 
# 3. 处理成功,从 temp 队列移除
queue.remove("my_spider", seed)
 
# 4. 处理失败,移动到 failure 队列
queue.remove("my_spider", seed, backup="failure")

队列接口

1. 基础操作

from bricks.lib.queues import LocalQueue, RedisQueue
 
# 创建队列
local_queue = LocalQueue()
redis_queue = RedisQueue(host="localhost", port=6379, database=0)
 
# 投放种子
queue.put("spider_name", {"url": "https://example.com"})
 
# 获取种子
seed = queue.get("spider_name")
 
# 移除种子
queue.remove("spider_name", seed)
 
# 获取队列大小
size = queue.size("spider_name")
 
# 清空队列
queue.clear("spider_name")

2. 高级操作

# 队列翻转(将失败队列合并到当前队列)
queue.reverse("spider_name")
 
# 智能翻转(根据条件自动翻转)
queue.smart_reverse("spider_name")
 
# 替换种子
queue.replace("spider_name", (old_seed, new_seed))
 
# 合并队列
queue.merge("dest_queue", "source_queue1", "source_queue2")

队列配置

1. 在爬虫中使用队列

from bricks.spider import air
from bricks.lib.queues import LocalQueue, RedisQueue
 
class MySpider(air.Spider):
    def __init__(self, **kwargs):
        # 使用本地队列
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "my_spider")
        
        super().__init__(**kwargs)
 
# 使用 Redis 队列
class DistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        # 使用 Redis 队列
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            port=6379,
            database=0
        ))
        kwargs.setdefault("queue_name", "distributed_spider")
        
        super().__init__(**kwargs)

2. 队列参数配置

# Redis 队列配置
redis_queue = RedisQueue(
    host="127.0.0.1",           # Redis 主机
    port=6379,                  # Redis 端口
    password=None,              # Redis 密码
    database=0,                 # Redis 数据库
    genre="set",                # 队列类型:list/set/zset
    decode_responses=True,      # 自动解码响应
    socket_timeout=30,          # 连接超时
    socket_connect_timeout=30,  # 连接建立超时
    retry_on_timeout=True,      # 超时重试
    health_check_interval=30    # 健康检查间隔
)
 
# 本地队列配置
local_queue = LocalQueue()  # 无需额外配置

队列监控

1. 队列状态查询

# 获取各子队列大小
current_size = queue.size("spider_name", qtypes=("current",))
temp_size = queue.size("spider_name", qtypes=("temp",))
failure_size = queue.size("spider_name", qtypes=("failure",))
 
print(f"待处理: {current_size}")
print(f"处理中: {temp_size}")
print(f"失败重试: {failure_size}")
 
# 获取总大小
total_size = queue.size("spider_name")
print(f"总计: {total_size}")

2. 队列健康检查

def check_queue_health(queue, spider_name):
    """检查队列健康状态"""
    stats = {
        "current": queue.size(spider_name, qtypes=("current",)),
        "temp": queue.size(spider_name, qtypes=("temp",)),
        "failure": queue.size(spider_name, qtypes=("failure",)),
        "total": queue.size(spider_name)
    }
    
    # 检查是否有积压
    if stats["temp"] > stats["current"] * 2:
        print("警告: temp队列积压严重")
    
    # 检查失败率
    if stats["failure"] > stats["total"] * 0.1:
        print("警告: 失败率过高")
    
    return stats

队列最佳实践

1. 队列命名规范

# 推荐的队列命名方式
queue_name = f"{project_name}.{spider_name}"
 
# 例如
queue_name = "ecommerce.product_spider"
queue_name = "news.article_spider"

2. 错误处理

class RobustSpider(air.Spider):
    def item_pipeline(self, context):
        try:
            # 处理数据
            self.process_data(context.items)
            # 标记成功
            context.success()
        except Exception as e:
            # 记录错误
            logger.error(f"处理失败: {e}")
            # 标记失败,种子会进入 failure 队列
            context.failure()

3. 队列清理

def cleanup_queue(queue, spider_name, max_failure_count=1000):
    """清理队列中的无效数据"""
    failure_size = queue.size(spider_name, qtypes=("failure",))
    
    if failure_size > max_failure_count:
        print(f"清理失败队列,当前大小: {failure_size}")
        queue.clear(spider_name, qtypes=("failure",))

4. 队列备份

def backup_queue(source_queue, backup_queue, spider_name):
    """备份队列数据"""
    # 获取所有种子
    seeds = []
    while not source_queue.is_empty(spider_name):
        seed = source_queue.get(spider_name)
        if seed:
            seeds.extend(seed)
    
    # 备份到另一个队列
    if seeds:
        backup_queue.put(f"{spider_name}_backup", *seeds)
    
    # 恢复原队列
    if seeds:
        source_queue.put(spider_name, *seeds)

队列性能优化

1. 批量操作

# 批量投放种子
seeds = [{"url": f"https://example.com/page{i}"} for i in range(1000)]
queue.put("spider_name", *seeds)
 
# 批量获取种子
seeds = queue.get("spider_name", count=10)

2. 连接池优化

# Redis 队列连接池配置
redis_queue = RedisQueue(
    host="localhost",
    port=6379,
    database=0,
    max_connections=20,         # 最大连接数
    connection_pool_kwargs={
        "max_connections": 20,
        "retry_on_timeout": True
    }
)

3. 内存优化

# 对于大量小种子,使用压缩
import json
import gzip
 
def compress_seed(seed):
    """压缩种子数据"""
    json_str = json.dumps(seed)
    return gzip.compress(json_str.encode())
 
def decompress_seed(compressed_data):
    """解压种子数据"""
    json_str = gzip.decompress(compressed_data).decode()
    return json.loads(json_str)

队列是 Bricks 框架的核心基础设施,正确理解和使用队列对于构建高效、稳定的爬虫系统至关重要。