2.7.4 Queue Operations
This section describes the various operations of the Bricks queue system in detail, from basic usage through advanced features.
Basic Operations
1. Putting Seeds (put)
from bricks.lib.queues import LocalQueue, RedisQueue
queue = LocalQueue()  # or RedisQueue()
# Put a single seed
queue.put("spider_name", {"url": "https://example.com"})
# Put seeds in batch
seeds = [
{"url": "https://example.com/page1"},
{"url": "https://example.com/page2"},
{"url": "https://example.com/page3"}
]
queue.put("spider_name", *seeds)
# Put into a specific queue type
queue.put("spider_name", {"url": "page1"}, qtypes="current")
queue.put("spider_name", {"url": "page2"}, qtypes="temp")
queue.put("spider_name", {"url": "page3"}, qtypes="failure")
# Put into multiple queue types
queue.put("spider_name", {"url": "page4"}, qtypes=["current", "temp"])
2. Getting Seeds (get)
# Get a single seed
seed = queue.get("spider_name")
if seed:
    print(f"Got seed: {seed[0]}")
# Get multiple seeds
seeds = queue.get("spider_name", count=5)
if seeds:
    print(f"Got {len(seeds)} seeds")
# Get from a specific queue type
seed = queue.get("spider_name", qtypes="current")
# Extra parameters when getting (Redis queue)
if isinstance(queue, RedisQueue):
    # Get from the tail of the queue
    seed = queue.get("spider_name", tail=True)
3. Removing Seeds (remove)
# Remove seeds (mark them as done)
if seed:
    queue.remove("spider_name", *seed)
# Remove and back up to the failure queue
queue.remove("spider_name", seed, backup="failure")
# Remove from a specific queue type
queue.remove("spider_name", seed, qtypes="temp")
# Remove from multiple queue types
queue.remove("spider_name", seed, qtypes=["current", "temp"])
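In practice, get and remove are combined into a fetch-process-acknowledge loop: successfully handled seeds are removed, and failed ones are backed up to the failure queue for a later retry. The sketch below is a minimal, illustrative version of that pattern, not part of the Bricks API; process_seed is a hypothetical handler.
def consume_once(queue, name="spider_name", batch=10):
    """Fetch one batch, process it, and acknowledge each seed (illustrative sketch)."""
    seeds = queue.get(name, count=batch) or []
    for seed in seeds:
        try:
            process_seed(seed)  # hypothetical business logic
            queue.remove(name, seed)  # acknowledge success
        except Exception:
            queue.remove(name, seed, backup="failure")  # keep the seed for retry
    return len(seeds)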
4. Querying Queue Size (size)
# Get the total queue size
total_size = queue.size("spider_name")
# Get the size of a specific queue type
current_size = queue.size("spider_name", qtypes=("current",))
temp_size = queue.size("spider_name", qtypes=("temp",))
failure_size = queue.size("spider_name", qtypes=("failure",))
# Get the combined size of several queue types
active_size = queue.size("spider_name", qtypes=("current", "temp"))
# Get the size of multiple queues
total_size = queue.size("spider1", "spider2", "spider3")
5. Clearing Queues (clear)
# Clear all queue types
queue.clear("spider_name")
# Clear specific queue types
queue.clear("spider_name", qtypes=("temp",))
queue.clear("spider_name", qtypes=("failure",))
# Clear multiple queues
queue.clear("spider1", "spider2", qtypes=("current", "temp"))
# Clear everything, including records and locks
queue.clear("spider_name", qtypes=("current", "temp", "failure", "record", "lock"))
Advanced Operations
1. Queue Reversal (reverse)
# Manual reversal: merge the failure and temp queues back into the current queue
success = queue.reverse("spider_name")
if success:
    print("Queue reversed successfully")
# Reverse only specific queue types
queue.reverse("spider_name", qtypes=["failure"])  # reverse only the failure queue
queue.reverse("spider_name", qtypes=["temp"])     # reverse only the temp queue
# Smart reversal: decides automatically based on the queue state
success = queue.smart_reverse("spider_name")
if success:
    print("Smart reversal succeeded")
else:
    print("No reversal needed")
# Smart reversal with an explicit status
success = queue.smart_reverse("spider_name", status=0)  # status=0 means no task is running
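A common use of reversal is to recycle failed seeds between crawl rounds. The sketch below is an assumption about how such a retry loop might look, not the framework's built-in scheduler; max_rounds and the sleep interval are arbitrary choices.
import time

def drain_with_retries(queue, name="spider_name", max_rounds=3):
    """Consume the current queue, then recycle failed/temp seeds for another round (illustrative sketch)."""
    for _ in range(max_rounds):
        while True:
            seeds = queue.get(name, count=10)
            if not seeds:
                break
            queue.remove(name, *seeds)  # assume every seed was processed successfully
        # Nothing left in the current queue: pull failed/temp seeds back if there are any
        if not queue.smart_reverse(name):
            break  # nothing to recycle, we are done
        time.sleep(1)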
2. Replacing Seeds (replace)
# Replace a single seed
old_seed = {"url": "https://old.com", "page": 1}
new_seed = {"url": "https://new.com", "page": 1, "updated": True}
count = queue.replace("spider_name", (old_seed, new_seed))
print(f"替换了 {count} 个种子")
# 批量种子替换
replacements = [
({"url": "old1"}, {"url": "new1"}),
({"url": "old2"}, {"url": "new2"}),
({"url": "old3"}, {"url": "new3"})
]
count = queue.replace("spider_name", *replacements)
# Replace within a specific queue type
count = queue.replace("spider_name", (old_seed, new_seed), qtypes=["current"])
# Replace within multiple queue types
count = queue.replace("spider_name", (old_seed, new_seed), qtypes=["current", "temp", "failure"])
3. Merging Queues (merge)
# Merge source queues into a target queue
queue.put("source1", {"url": "page1"})
queue.put("source2", {"url": "page2"})
# Merge into the target queue
success = queue.merge("target", "source1", "source2")
if success:
    print("Queues merged successfully")
# Check the result of the merge
merged_size = queue.size("target")
print(f"Queue size after merge: {merged_size}")
# After the merge, the source queues are emptied
source1_size = queue.size("source1")  # should be 0
source2_size = queue.size("source2")  # should be 0
Queue State Management
1. Initialization Control
# Acquire the initialization permission
permission = queue.command("spider_name", {
    "action": queue.COMMANDS.GET_PERMISSION,
    "interval": 10  # heartbeat interval
})
if permission.get("state"):
    print("Initialization permission acquired")
    # Mark the queue as being initialized
    queue.command("spider_name", {
        "action": queue.COMMANDS.SET_INIT
    })
    # Run the initialization logic
    # ...
    # Release the initialization state
    queue.command("spider_name", {
        "action": queue.COMMANDS.RELEASE_INIT,
        "history_ttl": 3600,  # how long to keep history
        "record_ttl": 86400   # how long to keep records
    })
else:
    print(f"Could not acquire permission: {permission.get('msg')}")
2. Record Management
# Set record information
record = {
    "total": 1000,
    "success": 500,
    "start_time": "2024-01-01 10:00:00",
    "machine_id": "worker_001"
}
queue.command("spider_name", {
    "action": queue.COMMANDS.SET_RECORD,
    "record": record
})
# Get record information
saved_record = queue.command("spider_name", {
    "action": queue.COMMANDS.GET_RECORD
})
print(f"Saved record: {saved_record}")
# Continue from the previous record
queue.command("spider_name", {
    "action": queue.COMMANDS.CONTINUE_RECORD
})
3. Wait Mechanism
import threading
import time
def producer():
    """Producer thread"""
    time.sleep(3)  # simulate a delay
    # Put a seed
    queue.put("spider_name", {"url": "delayed_page"})
    # Mark initialization as done
    queue.command("spider_name", {
        "action": queue.COMMANDS.SET_INIT
    })
def consumer():
    """Consumer thread"""
    # Wait for initialization to finish
    queue.command("spider_name", {
        "action": queue.COMMANDS.WAIT_INIT,
        "time": int(time.time() * 1000)
    })
    # Get a seed
    seed = queue.get("spider_name")
    print(f"Consumer got: {seed}")
# Start the producer and the consumer
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()
time.sleep(5)
Utility Functions
1. Queue Monitoring Tool
def monitor_queue_status(queue, spider_name, interval=5):
    """Monitor queue status"""
    import time
    print(f"Start monitoring queue: {spider_name}")
    while True:
        stats = {
            "current": queue.size(spider_name, qtypes=("current",)),
            "temp": queue.size(spider_name, qtypes=("temp",)),
            "failure": queue.size(spider_name, qtypes=("failure",)),
            "total": queue.size(spider_name)
        }
        print(f"[{time.strftime('%H:%M:%S')}] "
              f"current: {stats['current']}, "
              f"in progress: {stats['temp']}, "
              f"failed: {stats['failure']}, "
              f"total: {stats['total']}")
        # Check for abnormal conditions
        if stats["failure"] > stats["total"] * 0.2:
            print("⚠️ Warning: failure rate is too high")
        if stats["temp"] > stats["current"] * 3:
            print("⚠️ Warning: heavy processing backlog")
        if stats["total"] == 0:
            print("✅ Queue is empty, monitoring stopped")
            break
        time.sleep(interval)
# Use the monitoring tool
import threading
threading.Thread(
target=monitor_queue_status,
args=(queue, "spider_name"),
daemon=True
).start()
2. Queue Health Check
def health_check(queue, spider_name):
    """Queue health check"""
    health_report = {
        "status": "healthy",
        "issues": [],
        "recommendations": []
    }
    # Check queue sizes
    stats = {
        "current": queue.size(spider_name, qtypes=("current",)),
        "temp": queue.size(spider_name, qtypes=("temp",)),
        "failure": queue.size(spider_name, qtypes=("failure",))
    }
    # Check the failure rate
    total = sum(stats.values())
    if total > 0:
        failure_rate = stats["failure"] / total
        if failure_rate > 0.1:
            health_report["status"] = "warning"
            health_report["issues"].append(f"Failure rate too high: {failure_rate:.2%}")
            health_report["recommendations"].append("Check the spider logic and network connection")
    # Check for backlog
    if stats["temp"] > stats["current"] * 2:
        health_report["status"] = "warning"
        health_report["issues"].append("Heavy processing backlog")
        health_report["recommendations"].append("Increase concurrency or optimize the processing logic")
    # Check for a possible deadlock
    if isinstance(queue, RedisQueue):
        is_init = queue.command(spider_name, {"action": queue.COMMANDS.IS_INIT})
        if not is_init and total == 0:
            health_report["issues"].append("A deadlock may exist")
            health_report["recommendations"].append("Try restarting the spider or clearing the queue")
    return health_report
# Use the health check
report = health_check(queue, "spider_name")
print(f"Health status: {report['status']}")
if report["issues"]:
    print("Issues found:")
    for issue in report["issues"]:
        print(f" - {issue}")
    print("Recommendations:")
    for rec in report["recommendations"]:
        print(f" - {rec}")
3. Queue Data Migration
def migrate_queue_data(source_queue, target_queue, spider_name, batch_size=100):
    """Migrate queue data"""
    print(f"Start migrating queue data: {spider_name}")
    total_migrated = 0
    qtypes = ["current", "temp", "failure"]
    for qtype in qtypes:
        print(f"Migrating the {qtype} queue...")
        while True:
            # Fetch data in batches
            if qtype == "current":
                seeds = source_queue.get(spider_name, count=batch_size)
            else:
                # The temp and failure queues need special handling
                size = source_queue.size(spider_name, qtypes=(qtype,))
                if size == 0:
                    break
                # Temporarily move them into the current queue so they can be fetched
                source_queue.reverse(spider_name, qtypes=[qtype])
                seeds = source_queue.get(spider_name, count=min(batch_size, size))
            if not seeds:
                break
            # Put them into the target queue
            target_queue.put(spider_name, *seeds, qtypes=qtype)
            total_migrated += len(seeds)
            print(f"Migrated {total_migrated} seeds so far...")
    print(f"Migration finished, total: {total_migrated} seeds")
    return total_migrated
# Use the migration tool
# Migrate from a local queue to a Redis queue
local_queue = LocalQueue()
redis_queue = RedisQueue(host="localhost", database=0)
migrate_queue_data(local_queue, redis_queue, "spider_name")
4. Queue Performance Benchmark
def benchmark_queue_operations(queue, spider_name, num_operations=1000):
    """Benchmark queue operations"""
    import time
    print(f"Start benchmarking: {num_operations} operations")
    # Measure put performance
    start_time = time.time()
    seeds = [{"id": i} for i in range(num_operations)]
    queue.put(spider_name, *seeds)
    put_time = time.time() - start_time
    # Measure get performance
    start_time = time.time()
    retrieved_seeds = []
    while len(retrieved_seeds) < num_operations:
        batch = queue.get(spider_name, count=100)
        if batch:
            retrieved_seeds.extend(batch)
        else:
            break
    get_time = time.time() - start_time
    # Measure remove performance
    start_time = time.time()
    for i in range(0, len(retrieved_seeds), 100):
        batch = retrieved_seeds[i:i+100]
        queue.remove(spider_name, *batch)
    remove_time = time.time() - start_time
    # Print the results
    print(f"put: {num_operations/put_time:.2f} ops/sec")
    print(f"get: {len(retrieved_seeds)/get_time:.2f} ops/sec")
    print(f"remove: {len(retrieved_seeds)/remove_time:.2f} ops/sec")
    return {
        "put_ops_per_sec": num_operations/put_time,
        "get_ops_per_sec": len(retrieved_seeds)/get_time,
        "remove_ops_per_sec": len(retrieved_seeds)/remove_time
    }
# Run the benchmark
benchmark_queue_operations(queue, "benchmark_test", 10000)
Best Practices
1. Error Handling
import time
def safe_queue_operation(queue, operation, *args, **kwargs):
    """Run a queue operation safely"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Queue operation failed (attempt {attempt + 1}/{max_retries}): {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff
# Use the safe wrapper
try:
    seeds = safe_queue_operation(queue, queue.get, "spider_name", count=10)
except Exception as e:
    print(f"Queue operation ultimately failed: {e}")
2. Resource Cleanup
def cleanup_expired_queues(queue, prefix="spider_", max_age_hours=24):
    """Clean up stale queues"""
    if isinstance(queue, RedisQueue):
        # Get all matching keys
        pattern = f"{prefix}*"
        keys = queue.redis_db.keys(pattern)
        cleaned_count = 0
        for key in keys:
            # Only consider keys without an expiry time
            ttl = queue.redis_db.ttl(key)
            if ttl == -1:
                # Check how long the key has been idle
                idle_seconds = queue.redis_db.object("idletime", key)
                if idle_seconds and idle_seconds > max_age_hours * 3600:
                    queue.redis_db.delete(key)
                    cleaned_count += 1
        print(f"Cleaned up {cleaned_count} stale queues")
        return cleaned_count
# Run the cleanup periodically
cleanup_expired_queues(queue)
Mastering these queue operations gives you finer control over the execution flow of your spiders and better ways to optimize it.