2.7.3 Redis Queue
RedisQueue is a Redis-backed distributed queue implementation. It supports cooperative crawling across multiple machines and is suited to large-scale production environments.
Features
1. Core Features
- Distributed: multiple machines can share the same queue
- Persistent storage: data lives in Redis and survives restarts
- High performance: backed by Redis's in-memory data store
- Atomic operations: Lua scripts guarantee operation atomicity
- Smart scheduling: supports smart queue reversal and load balancing
2. Supported Data Structures
- List: ordered queue, supports FIFO/LIFO
- Set: unordered collection with automatic deduplication
- ZSet: sorted set, supports priorities
Basic Configuration
1. Installation and Connection
from bricks.lib.queues import RedisQueue

# Basic connection
queue = RedisQueue(
    host="127.0.0.1",
    port=6379,
    password=None,
    database=0
)
# Advanced configuration
queue = RedisQueue(
    host="redis.example.com",
    port=6379,
    password="your_password",
    database=0,
    genre="set",                 # queue type: list/set/zset
    decode_responses=True,       # decode responses automatically
    socket_timeout=30,           # socket timeout
    socket_connect_timeout=30,   # connection-establishment timeout
    retry_on_timeout=True,       # retry on timeout
    health_check_interval=30,    # health-check interval
    max_connections=20           # maximum number of connections
)
2. Using It in a Spider
from bricks.spider import air
from bricks.lib.queues import RedisQueue

class DistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            port=6379,
            database=0,
            genre="set"  # use the set type for automatic deduplication
        ))
        kwargs.setdefault("queue_name", "distributed_spider")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        return [{"page": i} for i in range(1, 101)]
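Launching it works like any other Bricks spider; a minimal sketch, assuming the standard run() entry point:

if __name__ == "__main__":
    spider = DistributedSpider()
    spider.run()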
Queue Types in Detail
1. List Queue
# Use the list type (FIFO queue)
list_queue = RedisQueue(genre="list")

# Enqueue seeds (processed in order)
list_queue.put("spider", {"url": "page1"})
list_queue.put("spider", {"url": "page2"})
list_queue.put("spider", {"url": "page3"})

# Fetch a seed (first in, first out)
seed = list_queue.get("spider")  # returns page1
2. Set Queue
# Use the set type (automatic deduplication)
set_queue = RedisQueue(genre="set")

# Enqueue duplicate seeds
set_queue.put("spider", {"url": "page1"})
set_queue.put("spider", {"url": "page1"})  # duplicate, deduplicated away
set_queue.put("spider", {"url": "page2"})

# The queue now contains only page1 and page2
size = set_queue.size("spider")  # returns 2
3. ZSet Queue
# Use the zset type (supports priorities)
zset_queue = RedisQueue(genre="zset")

# Enqueue seeds with priorities
zset_queue.put("spider", {"url": "low_priority", "priority": 1})
zset_queue.put("spider", {"url": "high_priority", "priority": 10})
zset_queue.put("spider", {"url": "medium_priority", "priority": 5})

# Fetch seeds (in priority order)
seed = zset_queue.get("spider")  # returns high_priority
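Repeated calls pop seeds from highest to lowest priority. A quick sketch to observe the ordering, assuming (as in the examples above) that get() returns a falsy value once the queue is empty:

# Drain the queue to observe the ordering (assumption: get() returns
# a falsy value such as None when the queue is empty)
while True:
    seed = zset_queue.get("spider")
    if not seed:
        break
    print(seed)  # high_priority, then medium_priority, then low_priority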
Distributed Features
1. Multi-Machine Coordination
# Machine A: mainly responsible for initialization
class MasterSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        kwargs.setdefault("queue_name", "cluster_spider")
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        # Generate a large number of seeds
        return [{"page": i} for i in range(1, 10000)]

# Machines B, C, D: worker nodes
class WorkerSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        kwargs.setdefault("queue_name", "cluster_spider")
        # Do not run initialization; only process tasks
        kwargs.setdefault("forever", True)
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        # Worker nodes do not generate seeds
        return []
# How to start (see the launcher sketch below)
# Machine A: python spider.py --mode=master
# Machine B: python spider.py --mode=worker
# Machine C: python spider.py --mode=worker
# Machine D: python spider.py --mode=worker
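The spider.py entry point itself is not shown above; a minimal, hypothetical launcher that dispatches on --mode (and assumes the standard run() entry point) might look like this:

# spider.py -- hypothetical launcher matching the commands above
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["master", "worker"], default="worker")
    args = parser.parse_args()
    if args.mode == "master":
        # The master seeds the shared queue, then crawls as usual
        MasterSpider().run()
    else:
        # Workers skip seeding (make_seeds returns []) and keep consuming
        WorkerSpider().run()

if __name__ == "__main__":
    main()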
2. Smart Load Balancing
import time

class SmartDistributedSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        super().__init__(**kwargs)

    def run_spider(self):
        """Override the run method to add smart scheduling"""
        while True:
            try:
                # Try to fetch a seed
                seed = self.task_queue.get(self.queue_name)
                if seed:
                    self.process_seed(seed)  # placeholder for actual seed handling
                else:
                    # No seeds available; try a smart queue reversal
                    if self.task_queue.smart_reverse(self.queue_name):
                        print("Queue reversed successfully, continuing")
                        continue
                    else:
                        print("No tasks at the moment, waiting...")
                        time.sleep(5)
            except KeyboardInterrupt:
                break
Advanced Features
1. Publish/Subscribe Mechanism
import json
import time

class PubSubSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)

    def start_subscriber(self):
        """Start the subscriber"""
        def message_handler(message):
            """Handle a subscription message"""
            data = json.loads(message["data"])
            action = data.get("action")
            if action == "collect-status":
                # Report the current status
                status = self.get_current_status()
                key = data["key"]
                self.redis_queue.redis_db.hset(key, mapping={
                    "machine_id": "worker_001",
                    "status": json.dumps(status)  # hash values must be scalars
                })

        # Subscribe to the channel
        pubsub = self.redis_queue.redis_db.pubsub()
        pubsub.subscribe(**{f"{self.queue_name}-subscribe": message_handler})
        # Run the listener in a background thread
        thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True)
        return thread

    def get_current_status(self):
        """Get the current machine's status"""
        return {
            "running_tasks": self.dispatcher.running,
            "queue_size": self.task_queue.size(self.queue_name),
            "timestamp": time.time()
        }
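The other side of this channel is a controller that publishes a collect-status request and then reads the replies. A sketch using the plain redis-py client; the channel name and the "status:collect:001" key are illustrative, mirroring the subscriber's f"{queue_name}-subscribe" channel and data["key"] above:

import json
import redis

r = redis.Redis(host="localhost", db=0, decode_responses=True)

# Ask all subscribed workers to report their status
r.publish(
    "distributed_spider-subscribe",  # i.e. f"{queue_name}-subscribe"
    json.dumps({"action": "collect-status", "key": "status:collect:001"}),
)

# Give workers a moment, then read back what each one wrote into the hash
print(r.hgetall("status:collect:001"))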
2. Distributed Locks
import time
import uuid

class DistributedLockSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)

    def acquire_lock(self, lock_name, timeout=10):
        """Acquire a distributed lock"""
        identifier = str(uuid.uuid4())
        # Try to take the lock (NX: only if absent; EX: expiry in seconds)
        if self.redis_queue.redis_db.set(
            f"lock:{lock_name}",
            identifier,
            nx=True,
            ex=timeout
        ):
            return identifier
        return None

    def release_lock(self, lock_name, identifier):
        """Release a distributed lock"""
        # Delete the key only if we still own it (atomic via Lua)
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis_queue.redis_db.eval(
            lua_script, 1, f"lock:{lock_name}", identifier
        )

    def critical_section(self):
        """A code section that needs synchronization"""
        lock_id = self.acquire_lock("critical_operation")
        if lock_id:
            try:
                # Perform the critical operation
                print("Performing critical operation...")
                time.sleep(2)
            finally:
                self.release_lock("critical_operation", lock_id)
        else:
            print("Failed to acquire lock, skipping")
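For convenience, the acquire/release pair can be wrapped in a context manager; a small sketch built on the two methods above:

from contextlib import contextmanager

@contextmanager
def redis_lock(spider, lock_name, timeout=10):
    # Acquire on entry, always release on exit
    identifier = spider.acquire_lock(lock_name, timeout=timeout)
    if identifier is None:
        raise RuntimeError(f"could not acquire lock: {lock_name}")
    try:
        yield identifier
    finally:
        spider.release_lock(lock_name, identifier)

# Usage:
# with redis_lock(spider, "critical_operation"):
#     ...  # critical code here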
3. Queue Monitoring
import threading
import time

class MonitoredRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(host="localhost", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)

    def get_cluster_stats(self):
        """Collect cluster statistics"""
        stats = {}
        # Queue sizes per queue type
        stats["queue_sizes"] = {
            "current": self.redis_queue.size(self.queue_name, qtypes=("current",)),
            "temp": self.redis_queue.size(self.queue_name, qtypes=("temp",)),
            "failure": self.redis_queue.size(self.queue_name, qtypes=("failure",))
        }
        # Redis server info
        redis_info = self.redis_queue.redis_db.info()
        stats["redis_info"] = {
            "used_memory": redis_info.get("used_memory_human"),
            "connected_clients": redis_info.get("connected_clients"),
            "total_commands_processed": redis_info.get("total_commands_processed")
        }
        return stats

    def monitor_performance(self, interval=60):
        """Performance monitoring"""
        def monitor():
            while True:
                stats = self.get_cluster_stats()
                print(f"Cluster status: {stats}")
                # Check for anomalies
                if stats["queue_sizes"]["failure"] > 1000:
                    print("Warning: severe failure-queue backlog")
                if stats["redis_info"]["connected_clients"] > 100:
                    print("Warning: too many Redis connections")
                time.sleep(interval)

        threading.Thread(target=monitor, daemon=True).start()
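Starting the monitor before the crawl runs it alongside the spider in its daemon thread; a minimal sketch, again assuming the standard run() entry point:

spider = MonitoredRedisSpider()
spider.monitor_performance(interval=30)  # background daemon thread
spider.run()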
Performance Optimization
1. Connection Pool Tuning
# Tuned connection-pool configuration
optimized_queue = RedisQueue(
    host="localhost",
    port=6379,
    database=0,
    max_connections=50,  # raise the maximum number of connections
    connection_pool_kwargs={
        "max_connections": 50,
        "retry_on_timeout": True,
        "socket_keepalive": True,  # enable TCP keepalive
        "socket_keepalive_options": {},
        "health_check_interval": 30  # health-check interval
    }
)
2. Batch Operations
class BatchRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            database=0,
            genre="set"
        ))
        super().__init__(**kwargs)

    def batch_put_seeds(self, seeds, batch_size=1000):
        """Enqueue seeds in batches"""
        for i in range(0, len(seeds), batch_size):
            batch = seeds[i:i + batch_size]
            self.task_queue.put(self.queue_name, *batch)
            print(f"Enqueued batch {i // batch_size + 1}: {len(batch)} seeds")

    def batch_get_seeds(self, count=10):
        """Fetch seeds in batches"""
        return self.task_queue.get(self.queue_name, count=count)
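For example, pushing a few thousand seeds in batches of 500 (assuming queue_name falls back to a framework default when not set explicitly):

spider = BatchRedisSpider()
seeds = [{"page": i} for i in range(1, 5001)]
spider.batch_put_seeds(seeds, batch_size=500)  # 10 batches of 500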
3. Memory Optimization
import gzip
import json

class MemoryOptimizedRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="localhost",
            database=0
        ))
        super().__init__(**kwargs)

    def compress_large_data(self, data):
        """Compress large payloads"""
        json_str = json.dumps(data)
        if len(json_str) > 1024:  # compress payloads larger than 1 KB
            compressed = gzip.compress(json_str.encode())
            return {
                "compressed": True,
                "data": compressed.hex()
            }
        return data

    def decompress_data(self, data):
        """Decompress a payload"""
        if isinstance(data, dict) and data.get("compressed"):
            compressed_data = bytes.fromhex(data["data"])
            json_str = gzip.decompress(compressed_data).decode()
            return json.loads(json_str)
        return data
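A quick round-trip check of the two helpers:

spider = MemoryOptimizedRedisSpider()
big_seed = {"url": "https://example.com", "payload": "x" * 4096}
packed = spider.compress_large_data(big_seed)   # > 1 KB, so it gets compressed
assert spider.decompress_data(packed) == big_seed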
Fault Handling
1. Connection Retries
import time

class ResilientRedisSpider(air.Spider):
    def __init__(self, **kwargs):
        self.redis_queue = RedisQueue(
            host="localhost",
            database=0,
            retry_on_timeout=True,
            socket_timeout=30
        )
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)

    def safe_redis_operation(self, operation, *args, **kwargs):
        """Run a Redis operation safely, with retries"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                print(f"Redis operation failed (attempt {attempt + 1}/{max_retries}): {e}")
                if attempt == max_retries - 1:
                    raise
                time.sleep(2 ** attempt)  # exponential backoff
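Any queue call can then be routed through the wrapper, for example:

spider = ResilientRedisSpider()
# Retry the fetch up to three times with exponential backoff
seed = spider.safe_redis_operation(spider.task_queue.get, spider.queue_name)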
2. Data Backup
import json

def backup_redis_queue(source_queue, backup_file):
    """Back up Redis queue data"""
    backup_data = {}
    # Back up every queue type
    qtypes = ["current", "temp", "failure"]
    for qtype in qtypes:
        key = source_queue.name2key("spider_name", qtype)
        if source_queue.genre == "list":
            data = source_queue.redis_db.lrange(key, 0, -1)
        elif source_queue.genre == "set":
            data = list(source_queue.redis_db.smembers(key))
        elif source_queue.genre == "zset":
            data = source_queue.redis_db.zrange(key, 0, -1, withscores=True)
        else:
            data = []
        backup_data[qtype] = data
    # Save to a file
    with open(backup_file, 'w') as f:
        json.dump(backup_data, f, default=str)
    print(f"Queue data backed up to: {backup_file}")

def restore_redis_queue(target_queue, backup_file):
    """Restore Redis queue data"""
    with open(backup_file, 'r') as f:
        backup_data = json.load(f)
    for qtype, data in backup_data.items():
        if data:
            target_queue.put("spider_name", *data, qtypes=qtype)
    print(f"Queue data restored from: {backup_file}")
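Putting the two together (note that the queue name "spider_name" is hard-coded in the helpers above):

queue = RedisQueue(host="localhost", database=0, genre="set")
backup_redis_queue(queue, "queue_backup.json")
# ...later, against the same or a different Redis instance:
restore_redis_queue(queue, "queue_backup.json")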
RedisQueue is the most capable queue implementation in the Bricks framework and is especially well suited to large-scale distributed crawling.