2.7.3 Redis Queue

RedisQueue is a Redis-backed distributed queue implementation. It lets multiple machines crawl cooperatively, which makes it a good fit for large-scale production environments.

Features

1. Core features

  • Distributed support: multiple machines can share the same queue
  • Persistent storage: data lives in Redis and survives restarts
  • High performance: built on Redis's fast in-memory store
  • Atomic operations: Lua scripts keep queue operations atomic
  • Smart scheduling: supports smart queue reversal and load balancing

2. Supported data structures

  • List: ordered queue, supports FIFO/LIFO
  • Set: unordered collection with automatic deduplication
  • ZSet: sorted set, supports priorities

Basic configuration

1. Installation and connection

from bricks.lib.queues import RedisQueue
 
# Basic connection
queue = RedisQueue(
    host="127.0.0.1",
    port=6379,
    password=None,
    database=0
)
 
# Advanced configuration
queue = RedisQueue(
    host="redis.example.com",
    port=6379,
    password="your_password",
    database=0,
    genre="set",                    # Queue type: list/set/zset
    decode_responses=True,          # Decode responses automatically
    socket_timeout=30,              # Socket timeout
    socket_connect_timeout=30,      # Connection-establishment timeout
    retry_on_timeout=True,          # Retry on timeout
    health_check_interval=30,       # Health-check interval
    max_connections=20              # Maximum connections
)
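
Before launching a crawl it is worth checking that the connection parameters are right. A minimal sketch, assuming (as later examples in this section do) that RedisQueue exposes the underlying redis-py client as redis_db:

# Connectivity check; redis_db is assumed to be the underlying redis-py client
try:
    queue.redis_db.ping()
    print("Redis connection OK")
except Exception as e:
    print(f"Redis connection failed: {e}")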

2. Using it in a spider

from bricks.spider import air
from bricks.lib.queues import RedisQueue
 
class DistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            port=6379,
            database=0,
            genre="set"  # 使用 set 类型自动去重
        ))
        kwargs.setdefault("queue_name", "distributed_spider")
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        return [{"page": i} for i in range(1, 101)]
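
To launch it, instantiate the spider and start it; a sketch, assuming the conventional spider.run() entry point:

# Launch sketch; spider.run() is assumed to be the standard entry point
if __name__ == "__main__":
    spider = DistributedSpider()
    spider.run()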

Queue types in detail

1. List queue

# Use the list type (FIFO queue)
list_queue = RedisQueue(genre="list")
 
# Put seeds (processed in order)
list_queue.put("spider", {"url": "page1"})
list_queue.put("spider", {"url": "page2"})
list_queue.put("spider", {"url": "page3"})
 
# Get a seed (first in, first out)
seed = list_queue.get("spider")  # returns page1

2. Set queue

# Use the set type (automatic deduplication)
set_queue = RedisQueue(genre="set")
 
# Put duplicate seeds
set_queue.put("spider", {"url": "page1"})
set_queue.put("spider", {"url": "page1"})  # duplicate, will be dropped
set_queue.put("spider", {"url": "page2"})
 
# The queue now holds only page1 and page2
size = set_queue.size("spider")  # returns 2

3. ZSet queue

# Use the zset type (supports priorities)
zset_queue = RedisQueue(genre="zset")
 
# Put seeds with priorities
zset_queue.put("spider", {"url": "low_priority", "priority": 1})
zset_queue.put("spider", {"url": "high_priority", "priority": 10})
zset_queue.put("spider", {"url": "medium_priority", "priority": 5})
 
# Get a seed (in priority order)
seed = zset_queue.get("spider")  # returns high_priority
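
To see how priorities map onto scores, you can read the underlying sorted set directly. A sketch, assuming name2key maps a queue name and sub-queue type to a Redis key (the backup example at the end of this section relies on the same method) and that pending seeds live in the "current" sub-queue:

# Inspect the raw zset; name2key and the "current" sub-queue name are
# assumptions borrowed from the backup example later in this section
key = zset_queue.name2key("spider", "current")
for member, score in zset_queue.redis_db.zrange(key, 0, -1, withscores=True):
    print(score, member)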

Distributed features

1. Multi-machine cooperation

# Machine A: mainly responsible for initialization
class MasterSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        kwargs.setdefault("queue_name", "cluster_spider")
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        # Generate a large batch of seeds
        return [{"page": i} for i in range(1, 10000)]
 
# Machines B, C, D: worker nodes
class WorkerSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        kwargs.setdefault("queue_name", "cluster_spider")
        # Skip initialization; only process tasks
        kwargs.setdefault("forever", True)
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        # Worker nodes do not generate seeds
        return []
 
# How to launch
# Machine A: python spider.py --mode=master
# Machine B: python spider.py --mode=worker
# Machine C: python spider.py --mode=worker
# Machine D: python spider.py --mode=worker
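
The --mode flag above is not provided by bricks itself; a minimal launcher sketch that picks the role from the command line (flag name and choices are illustrative):

import argparse
 
# Hypothetical launcher for the master/worker split shown above
parser = argparse.ArgumentParser()
parser.add_argument("--mode", choices=["master", "worker"], default="worker")
args = parser.parse_args()
 
spider = MasterSpider() if args.mode == "master" else WorkerSpider()
spider.run()  # assumes the conventional spider.run() entry point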

2. Smart load balancing

import time
 
class SmartDistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        super().__init__(**kwargs)
    
    def run_spider(self):
        """Override the run loop to add smart scheduling"""
        while True:
            try:
                # Try to fetch a seed
                seed = self.task_queue.get(self.queue_name)
                if seed:
                    self.process_seed(seed)
                else:
                    # No seeds left; try a smart queue reversal
                    if self.task_queue.smart_reverse(self.queue_name):
                        print("Queue reversed, continuing")
                        continue
                    else:
                        print("No tasks for now, waiting...")
                        time.sleep(5)
            except KeyboardInterrupt:
                break

Advanced features

1. Publish/subscribe mechanism

import json
import threading
import time
 
class PubSubSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)
    
    def start_subscriber(self):
        """Start the subscriber"""
        def message_handler(message):
            """Handle a subscription message"""
            data = json.loads(message["data"])
            action = data.get("action")
            
            if action == "collect-status":
                # Report this machine's current status
                status = self.get_current_status()
                key = data["key"]
                self.redis_queue.redis_db.hset(key, mapping={
                    "machine_id": "worker_001",
                    "status": json.dumps(status)  # hash values must be strings
                })
        
        # Subscribe to the channel
        pubsub = self.redis_queue.redis_db.pubsub()
        pubsub.subscribe(**{f"{self.queue_name}-subscribe": message_handler})
        
        # Run it in a background thread
        thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True)
        return thread
    
    def get_current_status(self):
        """Get the status of the current machine"""
        return {
            "running_tasks": self.dispatcher.running,
            "queue_size": self.task_queue.size(self.queue_name),
            "timestamp": time.time()
        }
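
The other half of this exchange is a publisher that asks every worker to report in. A sketch mirroring the handler above (the channel name follows the f"{queue_name}-subscribe" convention; the result key is illustrative):

import json
import time
 
# Hypothetical status-collection publisher, mirroring the handler above
queue = RedisQueue(host="localhost", database=0)
request = {"action": "collect-status", "key": "status:cluster_spider"}
queue.redis_db.publish("cluster_spider-subscribe", json.dumps(request))
 
# Give workers a moment to respond, then read back the collected hash
time.sleep(1)
print(queue.redis_db.hgetall("status:cluster_spider"))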

2. Distributed locks

import time
import uuid
 
class DistributedLockSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)
    
    def acquire_lock(self, lock_name, timeout=10):
        """Acquire a distributed lock"""
        identifier = str(uuid.uuid4())
        
        # Try to take the lock (set only if absent, with an expiry)
        if self.redis_queue.redis_db.set(
            f"lock:{lock_name}", 
            identifier, 
            nx=True, 
            ex=timeout
        ):
            return identifier
        return None
    
    def release_lock(self, lock_name, identifier):
        """Release a distributed lock (only if we still own it)"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis_queue.redis_db.eval(
            lua_script, 1, f"lock:{lock_name}", identifier
        )
    
    def critical_section(self):
        """A critical section that must not run concurrently"""
        lock_id = self.acquire_lock("critical_operation")
        if lock_id:
            try:
                # Perform the critical operation
                print("Performing critical operation...")
                time.sleep(2)
            finally:
                self.release_lock("critical_operation", lock_id)
        else:
            print("Failed to acquire lock, skipping")

3. Queue monitoring

import threading
import time
 
class MonitoredRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)
    
    def get_cluster_stats(self):
        """Collect cluster statistics"""
        stats = {}
        
        # Queue sizes per sub-queue
        stats["queue_sizes"] = {
            "current": self.redis_queue.size(self.queue_name, qtypes=("current",)),
            "temp": self.redis_queue.size(self.queue_name, qtypes=("temp",)),
            "failure": self.redis_queue.size(self.queue_name, qtypes=("failure",))
        }
        
        # Redis server info
        redis_info = self.redis_queue.redis_db.info()
        stats["redis_info"] = {
            "used_memory": redis_info.get("used_memory_human"),
            "connected_clients": redis_info.get("connected_clients"),
            "total_commands_processed": redis_info.get("total_commands_processed")
        }
        
        return stats
    
    def monitor_performance(self, interval=60):
        """Performance monitoring in a background thread"""
        def monitor():
            while True:
                stats = self.get_cluster_stats()
                print(f"Cluster status: {stats}")
                
                # Check for abnormal conditions
                if stats["queue_sizes"]["failure"] > 1000:
                    print("Warning: failure queue is heavily backlogged")
                
                if stats["redis_info"]["connected_clients"] > 100:
                    print("Warning: too many Redis connections")
                
                time.sleep(interval)
        
        threading.Thread(target=monitor, daemon=True).start()
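
Wiring it in is then a couple of lines (a sketch; spider.run() is assumed to be the standard entry point):

# Start background monitoring every 30 seconds, then launch the spider
spider = MonitoredRedisSpider()
spider.monitor_performance(interval=30)
spider.run()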

Performance optimization

1. Connection pool tuning

# Tuned connection pool configuration
optimized_queue = RedisQueue(
    host="localhost",
    port=6379,
    database=0,
    max_connections=50,                    # Raise the maximum number of connections
    connection_pool_kwargs={
        "max_connections": 50,
        "retry_on_timeout": True,
        "socket_keepalive": True,          # Enable TCP keepalive
        "socket_keepalive_options": {},
        "health_check_interval": 30        # Health-check interval
    }
)

2. Batch operations

class BatchRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            database=0,
            genre="set"
        ))
        super().__init__(**kwargs)
    
    def batch_put_seeds(self, seeds, batch_size=1000):
        """Put seeds in batches"""
        for i in range(0, len(seeds), batch_size):
            batch = seeds[i:i + batch_size]
            self.task_queue.put(self.queue_name, *batch)
            print(f"Put batch {i//batch_size + 1}: {len(batch)} seeds")
    
    def batch_get_seeds(self, count=10):
        """Get seeds in batches"""
        return self.task_queue.get(self.queue_name, count=count)
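
For example, seeding ten thousand pages in chunks of one thousand (the numbers are illustrative):

# Illustrative usage: seed 10,000 pages in chunks of 1,000
spider = BatchRedisSpider()
seeds = [{"page": i} for i in range(1, 10001)]
spider.batch_put_seeds(seeds, batch_size=1000)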

3. Memory optimization

class MemoryOptimizedRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            database=0
        ))
        super().__init__(**kwargs)
    
    def compress_large_data(self, data):
        """压缩大数据"""
        import gzip
        import json
        
        json_str = json.dumps(data)
        if len(json_str) > 1024:  # Compress payloads larger than 1 KB
            compressed = gzip.compress(json_str.encode())
            return {
                "compressed": True,
                "data": compressed.hex()
            }
        return data
    
    def decompress_data(self, data):
        """解压数据"""
        if isinstance(data, dict) and data.get("compressed"):
            import gzip
            import json
            
            compressed_data = bytes.fromhex(data["data"])
            json_str = gzip.decompress(compressed_data).decode()
            return json.loads(json_str)
        return data
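
A round trip through the two helpers shows the intended usage; whether you compress before put or inside a custom queue subclass is up to you:

# Illustrative round trip through the helpers above
spider = MemoryOptimizedRedisSpider()
seed = {"url": "https://example.com", "payload": "x" * 4096}
packed = spider.compress_large_data(seed)   # compressed: payload is > 1 KB
restored = spider.decompress_data(packed)
assert restored == seed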

Failure handling

1. Connection retries

import time
 
class ResilientRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(
            host="localhost",
            database=0,
            retry_on_timeout=True,
            socket_timeout=30
        )
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)
    
    def safe_redis_operation(self, operation, *args, **kwargs):
        """Run a Redis operation with retries"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                print(f"Redis operation failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential backoff
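
Any queue call can then be routed through the wrapper, for example a put:

# Illustrative usage: wrap a put in the retry helper
spider = ResilientRedisSpider()
spider.safe_redis_operation(
    spider.task_queue.put, spider.queue_name, {"page": 1}
)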

2. Data backup

def backup_redis_queue(source_queue, backup_file):
    """Back up Redis queue data"""
    import json
    
    backup_data = {}
    
    # Back up every sub-queue type
    qtypes = ["current", "temp", "failure"]
    for qtype in qtypes:
        key = source_queue.name2key("spider_name", qtype)
        
        if source_queue.genre == "list":
            data = source_queue.redis_db.lrange(key, 0, -1)
        elif source_queue.genre == "set":
            data = list(source_queue.redis_db.smembers(key))
        elif source_queue.genre == "zset":
            data = source_queue.redis_db.zrange(key, 0, -1, withscores=True)
        else:
            data = []
        
        backup_data[qtype] = data
    
    # Save to file
    with open(backup_file, 'w') as f:
        json.dump(backup_data, f, default=str)
    
    print(f"Queue data backed up to: {backup_file}")
 
def restore_redis_queue(target_queue, backup_file):
    """Restore Redis queue data"""
    import json
    
    with open(backup_file, 'r') as f:
        backup_data = json.load(f)
    
    for qtype, data in backup_data.items():
        if data:
            target_queue.put("spider_name", *data, qtypes=qtype)
    
    print(f"Queue data restored from {backup_file}")

RedisQueue is the most capable queue implementation in the Bricks framework, and it is particularly well suited to large-scale distributed crawling.