2.7.2 本地队列
LocalQueue
是基于内存的本地队列实现,适用于单机爬取和开发测试场景。
特性
1. 核心特性
- 内存存储:所有数据存储在内存中,访问速度快
- 线程安全:支持多线程并发访问
- 轻量级:无需外部依赖,开箱即用
- 状态持久化:支持基于环境变量的简单状态持久化
2. 适用场景
- 单机爬虫开发和测试
- 小规模数据爬取
- 快速原型开发
- 不需要分布式的场景
基本使用
1. 创建和配置
from bricks.lib.queues import LocalQueue
from bricks.spider import air

# Build a standalone in-memory queue
queue = LocalQueue()


# Wire the queue into a spider
class MySpider(air.Spider):
    def __init__(self, **kwargs):
        # Provide sane defaults; callers may still override both settings.
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "my_local_spider")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        # Seed pages 1 through 5
        return [{"page": i} for i in range(1, 6)]
2. 基础操作
from bricks.lib.queues import LocalQueue

queue = LocalQueue()

# Push individual seeds
queue.put("test_spider", {"url": "https://example.com/page1"})
queue.put("test_spider", {"url": "https://example.com/page2"})

# Push a whole batch in one call
seeds = [{"url": f"https://example.com/page{i}"} for i in range(3, 6)]
queue.put("test_spider", *seeds)

# Pop a single seed
seed = queue.get("test_spider")
print(f"获取到种子: {seed}")

# Pop several seeds at once
seeds = queue.get("test_spider", count=3)
print(f"获取到 {len(seeds)} 个种子")

# Inspect the queue depth
size = queue.size("test_spider")
print(f"队列大小: {size}")

# Acknowledge the fetched seed (mark it done)
# NOTE(review): `*seed` assumes get() returns an iterable of seeds;
# if it returns a bare dict this unpacks its keys — confirm against the API.
if seed:
    queue.remove("test_spider", *seed)
高级功能
1. 队列类型操作
queue = LocalQueue()

# Route seeds into distinct sub-queues via qtypes
queue.put("spider", {"url": "page1"}, qtypes="current")  # pending work
queue.put("spider", {"url": "page2"}, qtypes="temp")     # in-flight work
queue.put("spider", {"url": "page3"}, qtypes="failure")  # failed work

# Read each sub-queue's depth separately
current_size = queue.size("spider", qtypes=("current",))
temp_size = queue.size("spider", qtypes=("temp",))
failure_size = queue.size("spider", qtypes=("failure",))

print(f"当前队列: {current_size}")
print(f"临时队列: {temp_size}")
print(f"失败队列: {failure_size}")
2. 队列翻转
# Manual flip: fold the temp and failure sub-queues back into the current one
queue.reverse("spider", qtypes=["temp", "failure"])

# Smart flip: only performs the merge when the queue state warrants it
success = queue.smart_reverse("spider", status=0)
if success:
    print("队列翻转成功")
else:
    print("无需翻转")
3. 种子替换
# Swap an existing seed for an updated version
old_seed = {"url": "https://old.com", "page": 1}
new_seed = {"url": "https://new.com", "page": 1, "updated": True}

# Search every queue type and replace matches
count = queue.replace("spider", (old_seed, new_seed))
print(f"替换了 {count} 个种子")

# Limit the replacement to specific queue types
count = queue.replace("spider", (old_seed, new_seed), qtypes=["current"])
4. 队列合并
# Combine several source queues into one destination queue
queue.put("spider1", {"url": "page1"})
queue.put("spider2", {"url": "page2"})

# Fold spider1 and spider2 into merged_spider
queue.merge("merged_spider", "spider1", "spider2")

# Confirm the result
merged_size = queue.size("merged_spider")
print(f"合并后队列大小: {merged_size}")
状态管理
1. 初始化状态
# LocalQueue keeps its bookkeeping state in environment variables
import os

queue = LocalQueue()

# Walk through a spider's initialisation handshake by hand
spider_name = "test_spider"

# Request permission to initialise
permission = queue.command(spider_name, {"action": queue.COMMANDS.GET_PERMISSION})
print(f"初始化权限: {permission}")

# Flag the queue as initialised
queue.command(spider_name, {"action": queue.COMMANDS.SET_INIT})

# Query the initialisation flag
is_init = queue.command(spider_name, {"action": queue.COMMANDS.IS_INIT})
print(f"是否已初始化: {is_init}")

# Persist a progress record
record = {
    "total": 100,
    "success": 50,
    "start_time": "2024-01-01 10:00:00",
}
queue.command(spider_name, {
    "action": queue.COMMANDS.SET_RECORD,
    "record": record,
})

# Read the record back out
saved_record = queue.command(spider_name, {"action": queue.COMMANDS.GET_RECORD})
print(f"保存的记录: {saved_record}")
2. 等待机制
import threading
import time


def producer():
    """Producer: push a seed after a delay, then signal that init is done."""
    time.sleep(2)  # simulate startup latency
    queue.put("test_spider", {"url": "delayed_page"})
    queue.command("test_spider", {"action": queue.COMMANDS.SET_INIT})


def consumer():
    """Consumer: block until init completes, then pull one seed."""
    # Wait for the producer's SET_INIT signal
    queue.command("test_spider", {"action": queue.COMMANDS.WAIT_INIT})
    seed = queue.get("test_spider")
    print(f"消费者获取到: {seed}")


# Run both sides on daemon threads
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()
time.sleep(5)  # allow both threads to finish
实际应用示例
1. 简单爬虫
from bricks.spider import air
from bricks.lib.queues import LocalQueue
from bricks import Request


class SimpleLocalSpider(air.Spider):
    """Minimal spider backed by an in-memory LocalQueue."""

    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "simple_local")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        """Produce the initial seeds: pages 1 through 5."""
        return [{"page": i} for i in range(1, 6)]

    def make_request(self, context) -> Request:
        """Build the HTTP request for the current seed."""
        page = context.seeds["page"]
        return Request(url=f"https://httpbin.org/json?page={page}")

    def parse(self, context):
        """Decode the JSON body into items."""
        data = context.response.json()
        return [{"page": context.seeds["page"], "data": data}]

    def item_pipeline(self, context):
        """Consume parsed items and acknowledge the seed."""
        print(f"处理页面 {context.seeds['page']}: {len(context.items)} 条数据")
        context.success()


# Run the spider
if __name__ == "__main__":
    spider = SimpleLocalSpider()
    result = spider.run()
    print(f"爬取完成: {result}")
2. 多线程爬虫
class ConcurrentLocalSpider(air.Spider):
    """Spider demonstrating multi-threaded crawling over a LocalQueue."""

    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "concurrent_local")
        kwargs.setdefault("concurrency", 5)  # five worker threads
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        """Produce a larger seed set to keep the workers busy."""
        return [{"url": f"https://httpbin.org/delay/{i%3}"} for i in range(20)]

    def make_request(self, context) -> Request:
        return Request(url=context.seeds["url"])

    def parse(self, context):
        return [{"url": context.seeds["url"], "status": "success"}]

    def item_pipeline(self, context):
        print(f"线程处理: {context.seeds['url']}")
        context.success()


# Launch the concurrent spider
spider = ConcurrentLocalSpider()
spider.run()
3. 错误处理和重试
class RobustLocalSpider(air.Spider):
    """Spider demonstrating retry handling and the failure sub-queue."""

    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "robust_local")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        # One seed that succeeds, two that fail with HTTP errors
        return [
            {"url": "https://httpbin.org/status/200"},  # 成功
            {"url": "https://httpbin.org/status/404"},  # 失败
            {"url": "https://httpbin.org/status/500"},  # 服务器错误
        ]

    def make_request(self, context) -> Request:
        return Request(
            url=context.seeds["url"],
            max_retry=3,  # retry at most three times
        )

    def parse(self, context):
        if context.response.status_code == 200:
            return [{"url": context.seeds["url"], "status": "success"}]
        else:
            # Raising here triggers the retry machinery
            raise Exception(f"HTTP {context.response.status_code}")

    def item_pipeline(self, context):
        print(f"成功处理: {context.items}")
        context.success()


# Run and observe the retry behaviour
spider = RobustLocalSpider()
spider.run()

# Seeds that exhausted their retries land in the failure sub-queue
failure_size = spider.task_queue.size(spider.queue_name, qtypes=("failure",))
print(f"失败队列大小: {failure_size}")
性能优化
1. 内存管理
class MemoryOptimizedSpider(air.Spider):
    """Spider that frees item memory eagerly and can purge its temp queue."""

    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        super().__init__(**kwargs)

    def item_pipeline(self, context):
        # Drop item references as soon as they have been handled
        context.items.clear()
        context.success()

    def cleanup_queue(self):
        """Purge leftover in-flight data from the temp sub-queue."""
        self.task_queue.clear(self.queue_name, qtypes=("temp",))
2. 批量处理
def batch_process_seeds():
    """Push 1000 seeds, then drain the queue in batches of 50."""
    queue = LocalQueue()

    # Bulk insert
    seeds = [{"id": i} for i in range(1000)]
    queue.put("batch_spider", *seeds)

    # Drain in fixed-size chunks until the queue is empty
    while not queue.is_empty("batch_spider"):
        batch = queue.get("batch_spider", count=50)
        if batch:
            print(f"处理批次: {len(batch)} 个种子")
            # Acknowledge the whole batch once processed
            queue.remove("batch_spider", *batch)


batch_process_seeds()
调试和监控
1. 队列状态监控
def monitor_queue(queue, spider_name, interval=5):
    """Poll all sub-queue sizes every `interval` seconds until they drain."""
    import time

    while True:
        stats = {
            "current": queue.size(spider_name, qtypes=("current",)),
            "temp": queue.size(spider_name, qtypes=("temp",)),
            "failure": queue.size(spider_name, qtypes=("failure",)),
        }
        print(f"队列状态 - 当前: {stats['current']}, "
              f"处理中: {stats['temp']}, 失败: {stats['failure']}")
        # Stop once every sub-queue is empty
        if sum(stats.values()) == 0:
            print("队列已空,监控结束")
            break
        time.sleep(interval)


# Run the monitor on a background thread
import threading

queue = LocalQueue()
threading.Thread(target=monitor_queue, args=(queue, "test_spider"), daemon=True).start()
2. 调试工具
def debug_queue_state(queue, spider_name):
    """Print each sub-queue's size plus any persisted record for spider_name."""
    print(f"=== 队列调试信息: {spider_name} ===")

    # Per-sub-queue sizes
    qtypes = ["current", "temp", "failure"]
    for qtype in qtypes:
        size = queue.size(spider_name, qtypes=(qtype,))
        print(f"{qtype} 队列大小: {size}")

    # LocalQueue persists state in environment variables; look up the record key
    import os
    record_key = queue.name2key(spider_name, "record")
    record = os.environ.get(record_key)
    if record:
        print(f"记录信息: {record}")
    print("=" * 40)


# Invoke the debug helper
debug_queue_state(queue, "test_spider")
LocalQueue 是 Bricks 框架中最简单易用的队列实现,非常适合单机开发和小规模爬取任务。