
2.7.2 Local Queue

LocalQueue is an in-memory local queue implementation, suited to single-machine crawling, development, and testing.

Features

1. Core Features

  • In-memory storage: all data is kept in memory, so access is fast
  • Thread-safe: supports concurrent access from multiple threads (see the sketch after these lists)
  • Lightweight: no external dependencies; works out of the box
  • State persistence: simple state persistence backed by environment variables

2. Use Cases

  • Single-machine spider development and testing
  • Small-scale data crawling
  • Rapid prototyping
  • Scenarios that do not need a distributed setup
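
As a quick check of the thread-safety claim above, here is a minimal sketch (the queue name demo_spider and the seed shape are made up for illustration) that puts seeds from several threads at once and then verifies the total:

import threading
from bricks.lib.queues import LocalQueue

queue = LocalQueue()

def worker(n):
    # Each thread puts its own seeds; LocalQueue handles the locking internally
    for i in range(100):
        queue.put("demo_spider", {"worker": n, "seq": i})

threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# If puts are thread-safe and none are lost, this should print 400
print(queue.size("demo_spider"))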

Basic Usage

1. Creation and Configuration

from bricks.lib.queues import LocalQueue
from bricks.spider import air
 
# Create a local queue
queue = LocalQueue()

# Use it in a spider
class MySpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "my_local_spider")
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        return [{"page": i} for i in range(1, 6)]

2. Basic Operations

from bricks.lib.queues import LocalQueue
 
queue = LocalQueue()
 
# Put seeds
queue.put("test_spider", {"url": "https://example.com/page1"})
queue.put("test_spider", {"url": "https://example.com/page2"})

# Put seeds in bulk
seeds = [{"url": f"https://example.com/page{i}"} for i in range(3, 6)]
queue.put("test_spider", *seeds)

# Get a seed
seed = queue.get("test_spider")
print(f"Got seed: {seed}")

# Get multiple seeds
seeds = queue.get("test_spider", count=3)
print(f"Got {len(seeds)} seeds")

# Check the queue size
size = queue.size("test_spider")
print(f"Queue size: {size}")

# Remove seeds (mark them as done)
if seed:
    queue.remove("test_spider", *seed)

Advanced Features

1. Queue Type Operations

queue = LocalQueue()
 
# Put seeds into different sub-queues
queue.put("spider", {"url": "page1"}, qtypes="current")    # current queue
queue.put("spider", {"url": "page2"}, qtypes="temp")       # temp queue
queue.put("spider", {"url": "page3"}, qtypes="failure")    # failure queue

# Check the size of each sub-queue
current_size = queue.size("spider", qtypes=("current",))
temp_size = queue.size("spider", qtypes=("temp",))
failure_size = queue.size("spider", qtypes=("failure",))

print(f"current queue: {current_size}")
print(f"temp queue: {temp_size}")
print(f"failure queue: {failure_size}")

2. Queue Reversal

# Manual reversal: merge the failure and temp queues back into the current queue
queue.reverse("spider", qtypes=["temp", "failure"])

# Smart reversal: decide automatically, based on queue state, whether to reverse
success = queue.smart_reverse("spider", status=0)
if success:
    print("Queue reversal succeeded")
else:
    print("No reversal needed")

3. Seed Replacement

# Replace seed data
old_seed = {"url": "https://old.com", "page": 1}
new_seed = {"url": "https://new.com", "page": 1, "updated": True}

# Find and replace across all queue types
count = queue.replace("spider", (old_seed, new_seed))
print(f"Replaced {count} seeds")

# Replace only within specific queue types
count = queue.replace("spider", (old_seed, new_seed), qtypes=["current"])

4. Queue Merging

# Merge multiple queues into a target queue
queue.put("spider1", {"url": "page1"})
queue.put("spider2", {"url": "page2"})

# Merge spider1 and spider2 into merged_spider
queue.merge("merged_spider", "spider1", "spider2")

# Check the merge result
merged_size = queue.size("merged_spider")
print(f"Queue size after merge: {merged_size}")

State Management

1. Initialization State

# LocalQueue stores its state in environment variables
import os

queue = LocalQueue()

# Simulate a spider's initialization flow
spider_name = "test_spider"

# Acquire the initialization permission
permission = queue.command(spider_name, {"action": queue.COMMANDS.GET_PERMISSION})
print(f"Init permission: {permission}")

# Mark initialization as done
queue.command(spider_name, {"action": queue.COMMANDS.SET_INIT})

# Check the initialization state
is_init = queue.command(spider_name, {"action": queue.COMMANDS.IS_INIT})
print(f"Initialized: {is_init}")

# Store a record
record = {
    "total": 100,
    "success": 50,
    "start_time": "2024-01-01 10:00:00"
}
queue.command(spider_name, {
    "action": queue.COMMANDS.SET_RECORD,
    "record": record
})

# Read the record back
saved_record = queue.command(spider_name, {"action": queue.COMMANDS.GET_RECORD})
print(f"Saved record: {saved_record}")

2. Wait Mechanism

import threading
import time

from bricks.lib.queues import LocalQueue

queue = LocalQueue()

def producer():
    """Producer: put a seed after a delay"""
    time.sleep(2)  # simulate a delay
    queue.put("test_spider", {"url": "delayed_page"})
    queue.command("test_spider", {"action": queue.COMMANDS.SET_INIT})

def consumer():
    """Consumer: wait for initialization, then get a seed"""
    # Block until initialization completes
    queue.command("test_spider", {"action": queue.COMMANDS.WAIT_INIT})

    # Get a seed
    seed = queue.get("test_spider")
    print(f"Consumer got: {seed}")

# Start the producer and the consumer
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()

time.sleep(5)  # wait for both threads to finish

Practical Examples

1. A Simple Spider

from bricks.spider import air
from bricks.lib.queues import LocalQueue
from bricks import Request
 
class SimpleLocalSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "simple_local")
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        """Generate seeds"""
        return [{"page": i} for i in range(1, 6)]

    def make_request(self, context) -> Request:
        """Build the request"""
        page = context.seeds["page"]
        return Request(url=f"https://httpbin.org/json?page={page}")

    def parse(self, context):
        """Parse the response"""
        data = context.response.json()
        return [{"page": context.seeds["page"], "data": data}]

    def item_pipeline(self, context):
        """Process the items"""
        print(f"Page {context.seeds['page']}: {len(context.items)} items")
        context.success()
 
# Run the spider
if __name__ == "__main__":
    spider = SimpleLocalSpider()
    result = spider.run()
    print(f"Crawl finished: {result}")

2. A Concurrent Spider

class ConcurrentLocalSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "concurrent_local")
        kwargs.setdefault("concurrency", 5)  # 5 concurrent workers
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        """Generate a larger batch of seeds"""
        return [{"url": f"https://httpbin.org/delay/{i%3}"} for i in range(20)]

    def make_request(self, context) -> Request:
        return Request(url=context.seeds["url"])

    def parse(self, context):
        return [{"url": context.seeds["url"], "status": "success"}]

    def item_pipeline(self, context):
        print(f"Worker handled: {context.seeds['url']}")
        context.success()

# Run the concurrent spider
spider = ConcurrentLocalSpider()
spider.run()

3. Error Handling and Retries

class RobustLocalSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        kwargs.setdefault("queue_name", "robust_local")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        return [
            {"url": "https://httpbin.org/status/200"},  # success
            {"url": "https://httpbin.org/status/404"},  # failure
            {"url": "https://httpbin.org/status/500"},  # server error
        ]

    def make_request(self, context) -> Request:
        return Request(
            url=context.seeds["url"],
            max_retry=3  # retry up to 3 times
        )

    def parse(self, context):
        if context.response.status_code == 200:
            return [{"url": context.seeds["url"], "status": "success"}]
        else:
            # Trigger a retry
            raise Exception(f"HTTP {context.response.status_code}")

    def item_pipeline(self, context):
        print(f"Processed successfully: {context.items}")
        context.success()

# Run and observe the retry mechanism
spider = RobustLocalSpider()
spider.run()

# Inspect the failure queue
failure_size = spider.task_queue.size(spider.queue_name, qtypes=("failure",))
print(f"Failure queue size: {failure_size}")

Performance Optimization

1. Memory Management

class MemoryOptimizedSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", LocalQueue())
        super().__init__(**kwargs)

    def item_pipeline(self, context):
        # Clear items as soon as they have been processed
        context.items.clear()
        context.success()

    def cleanup_queue(self):
        """Periodically clean up the queue"""
        # Clear finished temp data
        self.task_queue.clear(self.queue_name, qtypes=("temp",))

2. Batch Processing

def batch_process_seeds():
    """Process seeds in batches"""
    queue = LocalQueue()

    # Put seeds in bulk
    seeds = [{"id": i} for i in range(1000)]
    queue.put("batch_spider", *seeds)

    # Get and process in batches
    while not queue.is_empty("batch_spider"):
        batch = queue.get("batch_spider", count=50)  # 50 seeds per batch
        if batch:
            print(f"Processing batch of {len(batch)} seeds")
            # Remove them once processed
            queue.remove("batch_spider", *batch)

batch_process_seeds()

Debugging and Monitoring

1. Queue Status Monitoring

def monitor_queue(queue, spider_name, interval=5):
    """Monitor queue status until the queue drains"""
    import time

    while True:
        stats = {
            "current": queue.size(spider_name, qtypes=("current",)),
            "temp": queue.size(spider_name, qtypes=("temp",)),
            "failure": queue.size(spider_name, qtypes=("failure",)),
        }

        print(f"Queue status - current: {stats['current']}, "
              f"in progress: {stats['temp']}, failed: {stats['failure']}")

        if sum(stats.values()) == 0:
            print("Queue is empty, monitoring finished")
            break

        time.sleep(interval)

# Monitor from another thread
import threading
queue = LocalQueue()
threading.Thread(target=monitor_queue, args=(queue, "test_spider"), daemon=True).start()

2. Debugging Tools

def debug_queue_state(queue, spider_name):
    """Dump queue state for debugging"""
    print(f"=== Queue debug info: {spider_name} ===")

    # Inspect each sub-queue
    qtypes = ["current", "temp", "failure"]
    for qtype in qtypes:
        size = queue.size(spider_name, qtypes=(qtype,))
        print(f"{qtype} queue size: {size}")

    # Inspect the state stored in environment variables
    import os
    record_key = queue.name2key(spider_name, "record")
    record = os.environ.get(record_key)
    if record:
        print(f"Record: {record}")

    print("=" * 40)

# Use the debug helper
debug_queue_state(queue, "test_spider")

LocalQueue is the simplest queue implementation in the Bricks framework to pick up and use, making it a great fit for single-machine development and small-scale crawling tasks.