2.7.5 Distributed Crawling
Distributed crawling is one of the core features of the Bricks framework. By sharing a Redis task queue, multiple machines cooperate on the same job, which greatly increases crawling throughput.
Distributed Architecture
1. Architecture Overview
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Master    │     │   Worker1   │     │   Worker2   │
│   (init)    │     │  (worker)   │     │  (worker)   │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                    ┌─────────────┐
                    │ Redis Queue │
                    │ (task queue)│
                    └─────────────┘
```
2. Role Division
| Role   | Responsibility                                            | Count            |
|--------|-----------------------------------------------------------|------------------|
| Master | Generates the initial seeds and monitors overall progress | 1                |
| Worker | Consumes tasks and executes the crawling logic            | Many             |
| Redis  | Stores the task queue and coordinates the nodes           | 1 instance/cluster |
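The Docker and Kubernetes deployments later in this section start the same script, `distributed_spider.py`, with a role argument. A minimal sketch of such an entry point is shown below; it assumes the `MasterSpider` and `WorkerSpider` classes from the next two examples are defined in that same file (the module layout is an assumption, not part of Bricks).

```python
# distributed_spider.py -- role selection sketch; assumes MasterSpider and
# WorkerSpider (defined in the following examples) live in this same file.
import sys


def main():
    mode = sys.argv[1] if len(sys.argv) > 1 else "worker"
    if mode == "master":
        # The Master only pushes the initial seeds into the shared Redis queue
        MasterSpider().run_init()
    else:
        worker_id = sys.argv[2] if len(sys.argv) > 2 else "worker_001"
        # Workers keep pulling tasks from that same shared queue
        WorkerSpider(worker_id=worker_id).run_spider()


if __name__ == "__main__":
    main()
```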
Basic Distributed Spider
1. Master Node
```python
from bricks.spider import air
from bricks.lib.queues import RedisQueue
from bricks import Request


class MasterSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            port=6379,
            database=0,
            password="your_password"
        ))
        kwargs.setdefault("queue_name", "distributed_news_spider")
        kwargs.setdefault("concurrency", 1)  # the Master only seeds the queue
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        """Generate a large batch of seeds"""
        print("Master: generating seeds...")
        # Paginated seeds for every category
        seeds = []
        for category in ["tech", "business", "sports", "entertainment"]:
            for page in range(1, 101):  # 100 pages per category
                seeds.append({
                    "category": category,
                    "page": page,
                    "url": f"https://news.example.com/{category}?page={page}"
                })
        print(f"Master: generated {len(seeds)} seeds")
        return seeds

    def make_request(self, context) -> Request:
        """The Master normally issues no requests; it only seeds the queue"""
        return None

    def parse(self, context):
        """The Master does not parse data"""
        return []

    def item_pipeline(self, context):
        """The Master does not process data"""
        context.success()


# Start the Master
if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "master":
        spider = MasterSpider()
        result = spider.run_init()  # run the initialization phase only
        print(f"Master initialization finished: {result}")
```
2. Worker Node
```python
from bricks.spider import air
from bricks.lib.queues import RedisQueue
from bricks import Request


class WorkerSpider(air.Spider):
    def __init__(self, worker_id="worker_001", **kwargs):
        self.worker_id = worker_id
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            port=6379,
            database=0,
            password="your_password"
        ))
        kwargs.setdefault("queue_name", "distributed_news_spider")
        kwargs.setdefault("concurrency", 10)  # Workers run with high concurrency
        kwargs.setdefault("forever", True)    # keep running and waiting for tasks
        super().__init__(**kwargs)

    def make_seeds(self, context, **kwargs):
        """Workers do not generate seeds"""
        return []

    def make_request(self, context) -> Request:
        """Build the request"""
        return Request(
            url=context.seeds["url"],
            headers={
                "User-Agent": f"Worker-{self.worker_id}",
                "X-Worker-ID": self.worker_id
            }
        )

    def parse(self, context):
        """Parse a news list page"""
        response = context.response
        # Extract the links to news articles
        news_links = response.xpath("//a[@class='news-link']/@href").getall()
        # Build detail-page seeds
        detail_seeds = []
        for link in news_links:
            detail_seeds.append({
                "type": "detail",
                "url": link,
                "category": context.seeds["category"],
                "source_page": context.seeds["page"]
            })
        # Submit the detail-page tasks back to the shared queue
        if detail_seeds:
            context.submit(*detail_seeds)
        return [{"type": "list", "count": len(news_links)}]

    def item_pipeline(self, context):
        """Process the parsed items"""
        print(f"Worker-{self.worker_id}: processed {context.items}")
        context.success()


# Start a Worker
if __name__ == "__main__":
    import sys

    worker_id = sys.argv[2] if len(sys.argv) > 2 else "worker_001"
    if len(sys.argv) > 1 and sys.argv[1] == "worker":
        spider = WorkerSpider(worker_id=worker_id)
        spider.run_spider()  # run only the crawling phase
```
Advanced Distributed Features
1. Smart Load Balancing
```python
import time

from bricks.spider import air
from bricks.lib.queues import RedisQueue


class SmartWorkerSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        super().__init__(**kwargs)

    def run_spider(self):
        """Override the run loop to add adaptive scheduling"""
        consecutive_empty = 0
        max_empty_attempts = 5
        while True:
            try:
                # Try to fetch a batch of tasks
                seeds = self.task_queue.get(self.queue_name, count=5)
                if seeds:
                    consecutive_empty = 0
                    # Process the batch
                    for seed in seeds:
                        context = self.Context(target=self, seeds=seed)
                        self.process_task(context)
                else:
                    consecutive_empty += 1
                    # Try to flip pending/failed tasks back into the main queue
                    if self.task_queue.smart_reverse(self.queue_name):
                        print("Queue reversed successfully, continuing")
                        consecutive_empty = 0
                        continue
                    # The queue has been empty several times in a row: back off
                    wait_time = min(consecutive_empty * 2, 30)
                    print(f"No tasks available, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    if consecutive_empty >= max_empty_attempts:
                        print("No tasks for a long time, Worker exiting")
                        break
            except KeyboardInterrupt:
                print("Interrupt received, Worker exiting")
                break
            except Exception as e:
                print(f"Worker error: {e}")
                time.sleep(5)
```
2. Distributed Status Monitoring
```python
import json
import threading
import time

from bricks.spider import air
from bricks.lib.queues import RedisQueue


class MonitoredDistributedSpider(air.Spider):
    def __init__(self, worker_id="worker_001", **kwargs):
        self.worker_id = worker_id
        self.redis_queue = RedisQueue(host="redis.cluster.com", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)

    def start_monitoring(self):
        """Start the monitoring threads"""
        # Periodic status reporting
        threading.Thread(target=self.report_status, daemon=True).start()
        # Pub/sub listener for control messages
        self.start_subscriber()

    def report_status(self):
        """Report this worker's status periodically"""
        while True:
            try:
                status = {
                    "worker_id": self.worker_id,
                    "running_tasks": self.dispatcher.running,
                    "processed_count": getattr(self, "processed_count", 0),
                    "timestamp": time.time()
                }
                # Write the status into a shared Redis hash
                self.redis_queue.redis_db.hset(
                    f"{self.queue_name}:status",
                    self.worker_id,
                    json.dumps(status)
                )
                # Let the hash expire if nobody refreshes it
                self.redis_queue.redis_db.expire(
                    f"{self.queue_name}:status", 60
                )
                time.sleep(30)  # report every 30 seconds
            except Exception as e:
                print(f"Status report failed: {e}")
                time.sleep(5)

    def start_subscriber(self):
        """Start the control-channel subscriber"""
        def handle_message(message):
            data = json.loads(message["data"])
            action = data.get("action")
            if action == "collect-status":
                # Answer a status-collection request
                status = self.get_current_status()
                key = data["key"]
                self.redis_queue.redis_db.hset(
                    key, self.worker_id, json.dumps(status)
                )
            elif action == "shutdown":
                # Answer a shutdown request
                print("Shutdown command received")
                self.shutdown_requested = True

        # Subscribe to the control channel
        pubsub = self.redis_queue.redis_db.pubsub()
        pubsub.subscribe(**{
            f"{self.queue_name}-control": handle_message
        })
        # Run the listener in a background thread
        thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True)
        return thread

    def get_current_status(self):
        """Collect the current status"""
        return {
            "running": self.dispatcher.running,
            "queue_size": self.task_queue.size(self.queue_name),
            "processed": getattr(self, "processed_count", 0)
        }
```
3. Distributed Coordinator
```python
import json
import threading
import time

from bricks.lib.queues import RedisQueue


class DistributedCoordinator:
    def __init__(self, redis_queue, spider_name):
        self.redis_queue = redis_queue
        self.spider_name = spider_name

    def get_cluster_status(self):
        """Aggregate the status of every worker in the cluster"""
        status_key = f"{self.spider_name}:status"
        worker_statuses = self.redis_queue.redis_db.hgetall(status_key)
        cluster_stats = {
            "total_workers": len(worker_statuses),
            "total_running_tasks": 0,
            "total_processed": 0,
            "workers": {}
        }
        for worker_id, status_json in worker_statuses.items():
            try:
                status = json.loads(status_json)
                cluster_stats["workers"][worker_id] = status
                cluster_stats["total_running_tasks"] += status.get("running_tasks", 0)
                cluster_stats["total_processed"] += status.get("processed_count", 0)
            except (TypeError, ValueError):
                # Skip entries that are not valid JSON
                continue
        return cluster_stats

    def broadcast_command(self, command, **kwargs):
        """Broadcast a command to all Workers"""
        message = {
            "action": command,
            "timestamp": time.time(),
            **kwargs
        }
        self.redis_queue.redis_db.publish(
            f"{self.spider_name}-control",
            json.dumps(message)
        )

    def scale_workers(self, target_count):
        """Adjust the number of Workers"""
        current_status = self.get_cluster_status()
        current_count = current_status["total_workers"]
        if target_count > current_count:
            # More Workers need to be started
            print(f"Need to start {target_count - current_count} more Workers")
            # Hook your container orchestration system in here
        elif target_count < current_count:
            # Some Workers need to be shut down
            excess_count = current_count - target_count
            print(f"Need to shut down {excess_count} Workers")
            self.broadcast_command("shutdown", count=excess_count)

    def auto_scale(self):
        """Automatic scaling"""
        queue_size = self.redis_queue.size(self.spider_name)
        cluster_stats = self.get_cluster_status()
        # Scale up or down based on backlog size and current worker count
        if queue_size > 1000 and cluster_stats["total_workers"] < 10:
            self.scale_workers(min(10, cluster_stats["total_workers"] + 2))
        elif queue_size < 100 and cluster_stats["total_workers"] > 2:
            self.scale_workers(max(2, cluster_stats["total_workers"] - 1))


# Using the coordinator
redis_queue = RedisQueue(host="redis.cluster.com", database=0)
coordinator = DistributedCoordinator(redis_queue, "distributed_spider")


# Monitoring and auto-scaling loop
def auto_scaling_loop():
    while True:
        try:
            coordinator.auto_scale()
            time.sleep(60)  # check once a minute
        except Exception as e:
            print(f"Auto-scaling error: {e}")
            time.sleep(10)


threading.Thread(target=auto_scaling_loop, daemon=True).start()
```
Deployment and Operations
1. Docker Deployment
```dockerfile
# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

CMD ["python", "distributed_spider.py"]
```
```yaml
# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  master:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - SPIDER_MODE=master
    command: python distributed_spider.py master

  worker:
    build: .
    depends_on:
      - redis
      - master
    environment:
      - REDIS_HOST=redis
      - SPIDER_MODE=worker
    command: python distributed_spider.py worker
    deploy:
      replicas: 3

volumes:
  redis_data:
```
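The compose file passes `REDIS_HOST` and `SPIDER_MODE` through the environment, while the earlier examples hardcode the Redis host. Below is a minimal sketch of reading that configuration at startup; the extra variable names (`REDIS_PORT`, `REDIS_DB`, `REDIS_PASSWORD`), the defaults, and the helper itself are assumptions for illustration, not part of Bricks.

```python
# Sketch: build the shared RedisQueue from container environment variables.
import os

from bricks.lib.queues import RedisQueue


def queue_from_env() -> RedisQueue:
    # REDIS_HOST / SPIDER_MODE come from docker-compose; the rest are assumed names
    kwargs = {
        "host": os.environ.get("REDIS_HOST", "localhost"),
        "port": int(os.environ.get("REDIS_PORT", "6379")),
        "database": int(os.environ.get("REDIS_DB", "0")),
    }
    if os.environ.get("REDIS_PASSWORD"):
        kwargs["password"] = os.environ["REDIS_PASSWORD"]
    return RedisQueue(**kwargs)


if __name__ == "__main__":
    mode = os.environ.get("SPIDER_MODE", "worker")
    print(f"Starting in {mode} mode against {os.environ.get('REDIS_HOST', 'localhost')}")
```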
2. Kubernetes Deployment
```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spider-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: spider-worker
  template:
    metadata:
      labels:
        app: spider-worker
    spec:
      containers:
        - name: worker
          image: your-registry/spider:latest
          env:
            - name: REDIS_HOST
              value: "redis-service"
            - name: WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          command: ["python", "distributed_spider.py", "worker"]
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
    - port: 6379
      targetPort: 6379
```
3. Monitoring and Alerting
```python
import threading
import time

from bricks.lib.queues import RedisQueue


class DistributedMonitor:
    def __init__(self, redis_queue, spider_name):
        self.redis_queue = redis_queue
        self.spider_name = spider_name

    def check_health(self):
        """Health checks"""
        issues = []
        # Check for queue backlog
        queue_size = self.redis_queue.size(self.spider_name)
        if queue_size > 10000:
            issues.append(f"Severe queue backlog: {queue_size}")
        # Check the failure rate
        failure_size = self.redis_queue.size(self.spider_name, qtypes=("failure",))
        if failure_size > queue_size * 0.1:
            issues.append(f"Failure rate too high: {failure_size}/{queue_size}")
        # Check that Workers are alive (they report into the status hash)
        worker_statuses = self.redis_queue.redis_db.hgetall(f"{self.spider_name}:status")
        if not worker_statuses:
            issues.append("No active Workers")
        return issues

    def send_alert(self, message):
        """Send an alert"""
        # Integrate your alerting system here (DingTalk, e-mail, etc.)
        print(f"🚨 Alert: {message}")

    def monitor_loop(self):
        """Monitoring loop"""
        while True:
            try:
                issues = self.check_health()
                if issues:
                    for issue in issues:
                        self.send_alert(issue)
                time.sleep(300)  # check every 5 minutes
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(60)


# Start the monitor
redis_queue = RedisQueue(host="redis.cluster.com", database=0)
monitor = DistributedMonitor(redis_queue, "distributed_spider")
threading.Thread(target=monitor.monitor_loop, daemon=True).start()
```
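The `send_alert` above is just a stub. A minimal sketch of a webhook-based implementation follows, assuming a DingTalk-style custom-robot webhook (the URL and token are placeholders); swap in whatever alerting channel you actually use.

```python
# Sketch: deliver alerts through an HTTP webhook (DingTalk custom robot assumed).
import requests

WEBHOOK_URL = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"  # placeholder


def send_alert(message: str) -> None:
    payload = {"msgtype": "text", "text": {"content": f"[spider] {message}"}}
    try:
        resp = requests.post(WEBHOOK_URL, json=payload, timeout=5)
        resp.raise_for_status()
    except requests.RequestException as e:
        # Never let alerting failures crash the monitor loop
        print(f"Failed to deliver alert: {e}")
```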
Best Practices
1. Fault-Tolerant Design
```python
from bricks.spider import air


class FaultTolerantWorker(air.Spider):
    def __init__(self, **kwargs):
        self.max_failures = 5
        self.failure_count = 0
        super().__init__(**kwargs)

    def item_pipeline(self, context):
        try:
            # Process the data (process_data is this spider's own persistence hook)
            self.process_data(context.items)
            context.success()
            self.failure_count = 0  # reset the failure counter
        except Exception as e:
            self.failure_count += 1
            print(f"Processing failed ({self.failure_count}/{self.max_failures}): {e}")
            # Put the seed back first so another Worker can retry it
            context.failure()
            if self.failure_count >= self.max_failures:
                print("Too many consecutive failures, Worker exiting")
                raise SystemExit
```
2. Resource Optimization
```python
import psutil

from bricks.spider import air


class OptimizedWorker(air.Spider):
    def __init__(self, **kwargs):
        # Size the concurrency to the machine: CPU cores and available memory
        cpu_count = psutil.cpu_count()
        memory_gb = psutil.virtual_memory().total // (1024 ** 3)
        # Derive the concurrency dynamically, capped at 20
        optimal_concurrency = min(cpu_count * 2, memory_gb, 20)
        kwargs.setdefault("concurrency", optimal_concurrency)
        super().__init__(**kwargs)
        print(f"Worker started with concurrency: {optimal_concurrency}")
```
3. Data Consistency
```python
from bricks.spider import air


class ConsistentWorker(air.Spider):
    def item_pipeline(self, context):
        # Wrap the write in a transaction so either all items land or none do
        # (self.database / self.save_data are this spider's own persistence layer)
        with self.database.transaction():
            try:
                self.save_data(context.items)
                context.success()
            except Exception as e:
                print(f"Saving data failed: {e}")
                context.failure()
                raise  # re-raise so the transaction rolls back
```
Distributed crawling is one of Bricks' most powerful capabilities: with a sound architecture and solid operational practices, it lets you build efficient, stable, large-scale crawling systems.