2.7.4 Queue Operations

This section walks through the operations of the Bricks queue system in detail, from the basic calls to advanced usage.

Basic Operations

1. Putting Seeds (put)

from bricks.lib.queues import LocalQueue, RedisQueue

queue = LocalQueue()  # or RedisQueue()

# Put a single seed
queue.put("spider_name", {"url": "https://example.com"})

# Put seeds in bulk
seeds = [
    {"url": "https://example.com/page1"},
    {"url": "https://example.com/page2"},
    {"url": "https://example.com/page3"}
]
queue.put("spider_name", *seeds)

# Put into a specific queue type
queue.put("spider_name", {"url": "page1"}, qtypes="current")
queue.put("spider_name", {"url": "page2"}, qtypes="temp")
queue.put("spider_name", {"url": "page3"}, qtypes="failure")

# Put into multiple queue types
queue.put("spider_name", {"url": "page4"}, qtypes=["current", "temp"])

2. Getting Seeds (get)

# Get a single seed
seed = queue.get("spider_name")
if seed:
    print(f"Got seed: {seed[0]}")

# Get multiple seeds
seeds = queue.get("spider_name", count=5)
if seeds:
    print(f"Got {len(seeds)} seeds")

# Get from a specific queue type
seed = queue.get("spider_name", qtypes="current")

# Extra parameters when getting (Redis queue)
if isinstance(queue, RedisQueue):
    # Get from the tail of the queue
    seed = queue.get("spider_name", tail=True)

3. Removing Seeds (remove)

# Remove seeds (mark them as done); seed is the list returned by get, so it is unpacked
if seed:
    queue.remove("spider_name", *seed)

# Remove and back up to the failure queue
queue.remove("spider_name", *seed, backup="failure")

# Remove from a specific queue type
queue.remove("spider_name", *seed, qtypes="temp")

# Remove from multiple queue types
queue.remove("spider_name", *seed, qtypes=["current", "temp"])

4. Querying Queue Size (size)

# Get the total queue size
total_size = queue.size("spider_name")

# Get the size of a specific queue type
current_size = queue.size("spider_name", qtypes=("current",))
temp_size = queue.size("spider_name", qtypes=("temp",))
failure_size = queue.size("spider_name", qtypes=("failure",))

# Get the combined size of multiple queue types
active_size = queue.size("spider_name", qtypes=("current", "temp"))

# Get the combined size of multiple queues
total_size = queue.size("spider1", "spider2", "spider3")

5. Clearing Queues (clear)

# Clear all queue types
queue.clear("spider_name")

# Clear specific queue types
queue.clear("spider_name", qtypes=("temp",))
queue.clear("spider_name", qtypes=("failure",))

# Clear multiple queues
queue.clear("spider1", "spider2", qtypes=("current", "temp"))

# Clear everything, including records and locks
queue.clear("spider_name", qtypes=("current", "temp", "failure", "record", "lock"))

Advanced Operations

1. Queue Reversal (reverse)

# Manual reversal: merge the failure and temp queues back into the current queue
success = queue.reverse("spider_name")
if success:
    print("Queue reversed successfully")

# Reverse only specific queue types
queue.reverse("spider_name", qtypes=["failure"])  # reverse only the failure queue
queue.reverse("spider_name", qtypes=["temp"])     # reverse only the temp queue

# Smart reversal: decide automatically based on the queue state
success = queue.smart_reverse("spider_name")
if success:
    print("Smart reversal performed")
else:
    print("No reversal needed")

# Smart reversal with a status hint
success = queue.smart_reverse("spider_name", status=0)  # status=0 means no task is running

2. Replacing Seeds (replace)

# Replace a single seed
old_seed = {"url": "https://old.com", "page": 1}
new_seed = {"url": "https://new.com", "page": 1, "updated": True}

count = queue.replace("spider_name", (old_seed, new_seed))
print(f"Replaced {count} seeds")

# Replace seeds in bulk
replacements = [
    ({"url": "old1"}, {"url": "new1"}),
    ({"url": "old2"}, {"url": "new2"}),
    ({"url": "old3"}, {"url": "new3"})
]
count = queue.replace("spider_name", *replacements)

# Replace within a specific queue type
count = queue.replace("spider_name", (old_seed, new_seed), qtypes=["current"])

# Replace within multiple queue types
count = queue.replace("spider_name", (old_seed, new_seed), qtypes=["current", "temp", "failure"])

3. Merging Queues (merge)

# Merge source queues into a target queue
queue.put("source1", {"url": "page1"})
queue.put("source2", {"url": "page2"})

# Merge into the target queue
success = queue.merge("target", "source1", "source2")
if success:
    print("Queues merged successfully")

# Check the result of the merge
merged_size = queue.size("target")
print(f"Queue size after merge: {merged_size}")

# The source queues are emptied after the merge
source1_size = queue.size("source1")  # should be 0
source2_size = queue.size("source2")  # should be 0

Queue State Management

1. Initialization Control

# Acquire the initialization permission
permission = queue.command("spider_name", {
    "action": queue.COMMANDS.GET_PERMISSION,
    "interval": 10  # heartbeat interval
})

if permission.get("state"):
    print("Initialization permission acquired")

    # Set the initialization state
    queue.command("spider_name", {
        "action": queue.COMMANDS.SET_INIT
    })

    # Run the initialization logic
    # ...

    # Release the initialization state
    queue.command("spider_name", {
        "action": queue.COMMANDS.RELEASE_INIT,
        "history_ttl": 3600,  # how long to keep the history
        "record_ttl": 86400   # how long to keep the record
    })
else:
    print(f"Could not acquire permission: {permission.get('msg')}")

2. Record Management

# Store record information
record = {
    "total": 1000,
    "success": 500,
    "start_time": "2024-01-01 10:00:00",
    "machine_id": "worker_001"
}

queue.command("spider_name", {
    "action": queue.COMMANDS.SET_RECORD,
    "record": record
})

# Fetch the record information
saved_record = queue.command("spider_name", {
    "action": queue.COMMANDS.GET_RECORD
})
print(f"Saved record: {saved_record}")

# Continue from the previous record
queue.command("spider_name", {
    "action": queue.COMMANDS.CONTINUE_RECORD
})

3. Wait Mechanism

import threading
import time

def producer():
    """Producer thread"""
    time.sleep(3)  # simulate a delay

    # Put a seed
    queue.put("spider_name", {"url": "delayed_page"})

    # Mark initialization as complete
    queue.command("spider_name", {
        "action": queue.COMMANDS.SET_INIT
    })

def consumer():
    """Consumer thread"""
    # Wait for initialization to complete
    queue.command("spider_name", {
        "action": queue.COMMANDS.WAIT_INIT,
        "time": int(time.time() * 1000)
    })

    # Get a seed
    seed = queue.get("spider_name")
    print(f"Consumer got: {seed}")

# Start the producer and the consumer
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()

time.sleep(5)

Utility Functions

1. Queue Monitoring Tool

def monitor_queue_status(queue, spider_name, interval=5):
    """Monitor the queue status"""
    import time

    print(f"Started monitoring queue: {spider_name}")

    while True:
        stats = {
            "current": queue.size(spider_name, qtypes=("current",)),
            "temp": queue.size(spider_name, qtypes=("temp",)),
            "failure": queue.size(spider_name, qtypes=("failure",)),
            "total": queue.size(spider_name)
        }

        print(f"[{time.strftime('%H:%M:%S')}] "
              f"current: {stats['current']}, "
              f"in progress: {stats['temp']}, "
              f"failed: {stats['failure']}, "
              f"total: {stats['total']}")

        # Check for abnormal situations
        if stats["failure"] > stats["total"] * 0.2:
            print("⚠️  Warning: failure rate is too high")

        if stats["temp"] > stats["current"] * 3:
            print("⚠️  Warning: serious processing backlog")

        if stats["total"] == 0:
            print("✅ Queue is empty, monitoring finished")
            break

        time.sleep(interval)

# Use the monitoring tool
import threading
threading.Thread(
    target=monitor_queue_status,
    args=(queue, "spider_name"),
    daemon=True
).start()

2. Queue Health Check

def health_check(queue, spider_name):
    """Queue health check"""
    health_report = {
        "status": "healthy",
        "issues": [],
        "recommendations": []
    }

    # Check queue sizes
    stats = {
        "current": queue.size(spider_name, qtypes=("current",)),
        "temp": queue.size(spider_name, qtypes=("temp",)),
        "failure": queue.size(spider_name, qtypes=("failure",))
    }

    # Check the failure rate
    total = sum(stats.values())
    if total > 0:
        failure_rate = stats["failure"] / total
        if failure_rate > 0.1:
            health_report["status"] = "warning"
            health_report["issues"].append(f"Failure rate too high: {failure_rate:.2%}")
            health_report["recommendations"].append("Check the spider logic and network connectivity")

    # Check for a processing backlog
    if stats["temp"] > stats["current"] * 2:
        health_report["status"] = "warning"
        health_report["issues"].append("Serious processing backlog")
        health_report["recommendations"].append("Increase concurrency or optimize the processing logic")

    # Check for a possible deadlock (not initialized and nothing queued)
    if isinstance(queue, RedisQueue):
        is_init = queue.command(spider_name, {"action": queue.COMMANDS.IS_INIT})
        if not is_init and total == 0:
            health_report["issues"].append("A deadlock may exist")
            health_report["recommendations"].append("Try restarting the spider or clearing the queue")

    return health_report

# Run the health check
report = health_check(queue, "spider_name")
print(f"Health status: {report['status']}")
if report["issues"]:
    print("Issues found:")
    for issue in report["issues"]:
        print(f"  - {issue}")
    print("Recommendations:")
    for rec in report["recommendations"]:
        print(f"  - {rec}")

3. Queue Data Migration

def migrate_queue_data(source_queue, target_queue, spider_name, batch_size=100):
    """Migrate queue data"""
    print(f"Starting queue data migration: {spider_name}")

    total_migrated = 0
    qtypes = ["current", "temp", "failure"]

    for qtype in qtypes:
        print(f"Migrating the {qtype} queue...")

        while True:
            # Fetch a batch of seeds
            if qtype == "current":
                seeds = source_queue.get(spider_name, count=batch_size)
            else:
                # The temp and failure queues need special handling
                size = source_queue.size(spider_name, qtypes=(qtype,))
                if size == 0:
                    break

                # Temporarily move them into the current queue so they can be fetched
                source_queue.reverse(spider_name, qtypes=[qtype])
                seeds = source_queue.get(spider_name, count=min(batch_size, size))

            if not seeds:
                break

            # Put them into the target queue
            target_queue.put(spider_name, *seeds, qtypes=qtype)
            total_migrated += len(seeds)

            print(f"Migrated {total_migrated} seeds so far...")

    print(f"Migration finished, total: {total_migrated} seeds")
    return total_migrated

# Use the migration tool
# Migrate from a local queue to a Redis queue
local_queue = LocalQueue()
redis_queue = RedisQueue(host="localhost", database=0)

migrate_queue_data(local_queue, redis_queue, "spider_name")

4. Queue Performance Benchmark

def benchmark_queue_operations(queue, spider_name, num_operations=1000):
    """Benchmark basic queue operations"""
    import time

    print(f"Starting benchmark: {num_operations} operations")

    # Measure put throughput
    start_time = time.time()
    seeds = [{"id": i} for i in range(num_operations)]
    queue.put(spider_name, *seeds)
    put_time = time.time() - start_time

    # Measure get throughput
    start_time = time.time()
    retrieved_seeds = []
    while len(retrieved_seeds) < num_operations:
        batch = queue.get(spider_name, count=100)
        if batch:
            retrieved_seeds.extend(batch)
        else:
            break
    get_time = time.time() - start_time

    # Measure remove throughput
    start_time = time.time()
    for i in range(0, len(retrieved_seeds), 100):
        batch = retrieved_seeds[i:i+100]
        queue.remove(spider_name, *batch)
    remove_time = time.time() - start_time

    # Print the results
    print(f"put:    {num_operations/put_time:.2f} ops/sec")
    print(f"get:    {len(retrieved_seeds)/get_time:.2f} ops/sec")
    print(f"remove: {len(retrieved_seeds)/remove_time:.2f} ops/sec")

    return {
        "put_ops_per_sec": num_operations/put_time,
        "get_ops_per_sec": len(retrieved_seeds)/get_time,
        "remove_ops_per_sec": len(retrieved_seeds)/remove_time
    }

# Run the benchmark
benchmark_queue_operations(queue, "benchmark_test", 10000)

Best Practices

1. Error Handling

import time

def safe_queue_operation(operation, *args, **kwargs):
    """Run a queue operation with retries"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Queue operation failed (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff

# Use the safe wrapper
try:
    seeds = safe_queue_operation(queue.get, "spider_name", count=10)
except Exception as e:
    print(f"Queue operation ultimately failed: {e}")

2. Resource Cleanup

def cleanup_expired_queues(queue, prefix="spider_", max_age_hours=24):
    """Clean up expired queues"""
    if isinstance(queue, RedisQueue):
        # Find all matching keys
        pattern = f"{prefix}*"
        keys = queue.redis_db.keys(pattern)

        cleaned_count = 0

        for key in keys:
            # Only consider keys that have no expiry set
            ttl = queue.redis_db.ttl(key)
            if ttl == -1:  # key without an expiry
                # Check how long the key has been idle (seconds since last access)
                idle_time = queue.redis_db.object("idletime", key)
                if idle_time and idle_time > max_age_hours * 3600:
                    queue.redis_db.delete(key)
                    cleaned_count += 1

        print(f"Cleaned up {cleaned_count} expired queues")
        return cleaned_count

# Run the cleanup periodically
cleanup_expired_queues(queue)

Mastering these queue operations gives you tighter control over a crawler's execution flow and more ways to optimize it.