
2.7.5 Distributed Crawling

Distributed crawling is one of the core features of the Bricks framework: a Redis queue coordinates work across multiple machines, which greatly improves crawl throughput.

Distributed Architecture

1. Architecture Overview

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Master    │    │   Worker1   │    │   Worker2   │
│(initializer)│    │(worker node)│    │(worker node)│
└─────────────┘    └─────────────┘    └─────────────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                  ┌─────────────┐
                  │ Redis Queue │
                  │ (task queue)│
                  └─────────────┘

2. Division of Roles

Role     Responsibility                                               Count
Master   Generates the initial seeds and monitors overall progress    1
Worker   Processes tasks and executes the crawling logic              Multiple
Redis    Stores the task queue and coordinates all nodes              1 cluster
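
The only coupling between these roles is that every node points at the same Redis database and uses the same queue name. A minimal sketch of that shared configuration (host, password and queue name are placeholder values, not framework defaults):

from bricks.lib.queues import RedisQueue

# Settings every node (Master and Workers) must agree on -- placeholder values.
REDIS_HOST = "redis.cluster.com"
QUEUE_NAME = "distributed_news_spider"

def make_task_queue() -> RedisQueue:
    # Each node opens its own connection, but all of them read from and
    # write to the same Redis-backed task queue.
    return RedisQueue(
        host=REDIS_HOST,
        port=6379,
        database=0,
        password="your_password",
    )

The Master and Worker examples below embed the same settings directly in their constructors; what matters is simply that every node agrees on them.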

Basic Distributed Spider

1. Master Node

from bricks.spider import air
from bricks.lib.queues import RedisQueue
from bricks import Request
import time
 
class MasterSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            port=6379,
            database=0,
            password="your_password"
        ))
        kwargs.setdefault("queue_name", "distributed_news_spider")
        kwargs.setdefault("concurrency", 1)  # Master 只负责初始化
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        """Generate a large batch of seeds"""
        print("Master: generating seeds...")
        
        # Build paginated seeds
        seeds = []
        for category in ["tech", "business", "sports", "entertainment"]:
            for page in range(1, 101):  # 100 pages per category
                seeds.append({
                    "category": category,
                    "page": page,
                    "url": f"https://news.example.com/{category}?page={page}"
                })
        
        print(f"Master: generated {len(seeds)} seeds")
        return seeds
    
    def make_request(self, context) -> Request:
        """The Master normally issues no requests; it only initializes"""
        return None
    
    def parse(self, context):
        """The Master does not parse data"""
        return []
    
    def item_pipeline(self, context):
        """The Master does not process data"""
        context.success()
 
# Start the Master
if __name__ == "__main__":
    import sys
    if len(sys.argv) > 1 and sys.argv[1] == "master":
        spider = MasterSpider()
        result = spider.run_init()  # run the initialization step only
        print(f"Master initialization finished: {result}")

2. Worker Node

class WorkerSpider(air.Spider):
    def __init__(self, worker_id="worker_001", **kwargs):
        self.worker_id = worker_id
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            port=6379,
            database=0,
            password="your_password"
        ))
        kwargs.setdefault("queue_name", "distributed_news_spider")
        kwargs.setdefault("concurrency", 10)  # Worker 高并发处理
        kwargs.setdefault("forever", True)    # 持续运行
        super().__init__(**kwargs)
    
    def make_seeds(self, context, **kwargs):
        """Workers do not generate seeds"""
        return []
    
    def make_request(self, context) -> Request:
        """Build the request"""
        return Request(
            url=context.seeds["url"],
            headers={
                "User-Agent": f"Worker-{self.worker_id}",
                "X-Worker-ID": self.worker_id
            }
        )
    
    def parse(self, context):
        """Parse the news list page"""
        response = context.response
        
        # Extract the news links
        news_links = response.xpath("//a[@class='news-link']/@href").getall()
        
        # Build seeds for the detail pages
        detail_seeds = []
        for link in news_links:
            detail_seeds.append({
                "type": "detail",
                "url": link,
                "category": context.seeds["category"],
                "source_page": context.seeds["page"]
            })
        
        # Submit the detail-page tasks
        if detail_seeds:
            context.submit(*detail_seeds)
        
        return [{"type": "list", "count": len(news_links)}]
    
    def item_pipeline(self, context):
        """Process the extracted items"""
        print(f"Worker-{self.worker_id}: processed {context.items}")
        context.success()
 
# Start a Worker
if __name__ == "__main__":
    import sys
    worker_id = sys.argv[2] if len(sys.argv) > 2 else "worker_001"
    
    if len(sys.argv) > 1 and sys.argv[1] == "worker":
        spider = WorkerSpider(worker_id=worker_id)
        spider.run_spider()  # run only the crawling loop, no seed initialization

Advanced Distributed Features

1. Smart Load Balancing

class SmartWorkerSpider(air.Spider):
    def __init__(self, **kwargs):
        kwargs.setdefault("task_queue", RedisQueue(
            host="redis.cluster.com",
            database=0
        ))
        super().__init__(**kwargs)
    
    def run_spider(self):
        """Override the run loop to add smarter scheduling"""
        consecutive_empty = 0
        max_empty_attempts = 5
        
        while True:
            try:
                # Try to fetch a batch of tasks
                seeds = self.task_queue.get(self.queue_name, count=5)
                
                if seeds:
                    consecutive_empty = 0
                    # Process the tasks
                    for seed in seeds:
                        context = self.Context(target=self, seeds=seed)
                        self.process_task(context)
                else:
                    consecutive_empty += 1
                    
                    # Try a smart queue reversal first
                    if self.task_queue.smart_reverse(self.queue_name):
                        print("Queue reversed successfully, continuing")
                        consecutive_empty = 0
                        continue
                    
                    # The queue has been empty several times in a row; back off
                    wait_time = min(consecutive_empty * 2, 30)
                    print(f"No tasks available, waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    
                    if consecutive_empty >= max_empty_attempts:
                        print("No tasks for a long time, Worker exiting")
                        break
                        
            except KeyboardInterrupt:
                print("Interrupt received, Worker exiting")
                break
            except Exception as e:
                print(f"Worker error: {e}")
                time.sleep(5)

2. Distributed Status Monitoring

import json  # used below to (de)serialize status payloads

class MonitoredDistributedSpider(air.Spider):
    def __init__(self, worker_id="worker_001", **kwargs):
        self.worker_id = worker_id
        self.redis_queue = RedisQueue(host="redis.cluster.com", database=0)
        kwargs.setdefault("task_queue", self.redis_queue)
        super().__init__(**kwargs)
    
    def start_monitoring(self):
        """Start the monitoring threads"""
        import threading
        
        # Start periodic status reporting
        threading.Thread(target=self.report_status, daemon=True).start()
        
        # Start the pub/sub control listener
        self.start_subscriber()
    
    def report_status(self):
        """Report this worker's status periodically"""
        while True:
            try:
                status = {
                    "worker_id": self.worker_id,
                    "running_tasks": self.dispatcher.running,
                    "processed_count": getattr(self, "processed_count", 0),
                    "timestamp": time.time()
                }
                
                # Write the status to Redis
                self.redis_queue.redis_db.hset(
                    f"{self.queue_name}:status",
                    self.worker_id,
                    json.dumps(status)
                )
                
                # Expire the hash so stale entries disappear if workers die
                self.redis_queue.redis_db.expire(
                    f"{self.queue_name}:status", 60
                )
                
                time.sleep(30)  # report every 30 seconds
                
            except Exception as e:
                print(f"Status report failed: {e}")
                time.sleep(5)
    
    def start_subscriber(self):
        """Start the pub/sub control listener"""
        def handle_message(message):
            data = json.loads(message["data"])
            action = data.get("action")
            
            if action == "collect-status":
                # Respond to a status-collection request
                status = self.get_current_status()
                key = data["key"]
                self.redis_queue.redis_db.hset(
                    key, self.worker_id, json.dumps(status)
                )
            elif action == "shutdown":
                # Respond to a shutdown request
                print("Shutdown command received")
                self.shutdown_requested = True
        
        # Subscribe to the control channel
        pubsub = self.redis_queue.redis_db.pubsub()
        pubsub.subscribe(**{
            f"{self.queue_name}-control": handle_message
        })
        
        # Run the listener in the background
        thread = pubsub.run_in_thread(sleep_time=0.001, daemon=True)
        return thread
    
    def get_current_status(self):
        """Return the current worker status"""
        return {
            "running": self.dispatcher.running,
            "queue_size": self.task_queue.size(self.queue_name),
            "processed": getattr(self, "processed_count", 0)
        }
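
A minimal usage sketch, assuming the queue name and worker id used earlier in this section; the request/parse hooks still have to be implemented, exactly as in the WorkerSpider example:

# Hypothetical wiring: report status in the background while consuming tasks.
spider = MonitoredDistributedSpider(
    worker_id="worker_001",
    queue_name="distributed_news_spider",
)
spider.start_monitoring()  # status-reporting thread + control-channel listener
spider.run_spider()        # consume tasks from the shared queue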

3. Distributed Coordinator

class DistributedCoordinator:
    def __init__(self, redis_queue, spider_name):
        self.redis_queue = redis_queue
        self.spider_name = spider_name
    
    def get_cluster_status(self):
        """Collect the status of the whole cluster"""
        status_key = f"{self.spider_name}:status"
        worker_statuses = self.redis_queue.redis_db.hgetall(status_key)
        
        cluster_stats = {
            "total_workers": len(worker_statuses),
            "total_running_tasks": 0,
            "total_processed": 0,
            "workers": {}
        }
        
        for worker_id, status_json in worker_statuses.items():
            try:
                status = json.loads(status_json)
                cluster_stats["workers"][worker_id] = status
                cluster_stats["total_running_tasks"] += status.get("running_tasks", 0)
                cluster_stats["total_processed"] += status.get("processed_count", 0)
            except (TypeError, ValueError):
                pass  # skip malformed status entries
        
        return cluster_stats
    
    def broadcast_command(self, command, **kwargs):
        """Broadcast a command to all Workers"""
        message = {
            "action": command,
            "timestamp": time.time(),
            **kwargs
        }
        
        self.redis_queue.redis_db.publish(
            f"{self.spider_name}-control",
            json.dumps(message)
        )
    
    def scale_workers(self, target_count):
        """Adjust the number of Workers dynamically"""
        current_status = self.get_cluster_status()
        current_count = current_status["total_workers"]
        
        if target_count > current_count:
            # More Workers need to be started
            print(f"Need to start {target_count - current_count} more Workers")
            # A container orchestration system can be integrated here
        elif target_count < current_count:
            # Some Workers need to be shut down
            excess_count = current_count - target_count
            print(f"Need to shut down {excess_count} Workers")
            self.broadcast_command("shutdown", count=excess_count)
    
    def auto_scale(self):
        """Automatic scale-up / scale-down"""
        queue_size = self.redis_queue.size(self.spider_name)
        cluster_stats = self.get_cluster_status()
        
        # Decide on scaling based on queue size and current load
        if queue_size > 1000 and cluster_stats["total_workers"] < 10:
            self.scale_workers(min(10, cluster_stats["total_workers"] + 2))
        elif queue_size < 100 and cluster_stats["total_workers"] > 2:
            self.scale_workers(max(2, cluster_stats["total_workers"] - 1))
 
# Using the coordinator
# redis_queue is the shared RedisQueue instance created earlier
import threading

coordinator = DistributedCoordinator(redis_queue, "distributed_spider")
 
# Monitoring and automatic scaling
def auto_scaling_loop():
    while True:
        try:
            coordinator.auto_scale()
            time.sleep(60)  # check once per minute
        except Exception as e:
            print(f"Auto-scaling error: {e}")
            time.sleep(10)
 
threading.Thread(target=auto_scaling_loop, daemon=True).start()

Deployment and Operations

1. Docker Deployment

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["python", "distributed_spider.py"]

# docker-compose.yml
version: '3.8'
 
services:
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes
 
  master:
    build: .
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - SPIDER_MODE=master
    command: python distributed_spider.py master
 
  worker:
    build: .
    depends_on:
      - redis
      - master
    environment:
      - REDIS_HOST=redis
      - SPIDER_MODE=worker
    command: python distributed_spider.py worker
    deploy:
      replicas: 3
 
volumes:
  redis_data:
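
The compose file configures the containers through the REDIS_HOST and SPIDER_MODE environment variables (the Kubernetes manifest below adds WORKER_ID), while the spider classes above hardcode their connection details. A sketch of how distributed_spider.py could bridge the two; apart from those variable names and the classes defined earlier, everything here is an assumption:

# distributed_spider.py -- sketch of the entrypoint the container commands assume.
# MasterSpider and WorkerSpider are the classes defined earlier in this section.
import os
import sys

from bricks.lib.queues import RedisQueue

def build_queue() -> RedisQueue:
    # REDIS_HOST comes from the container environment; the other values are assumed defaults.
    return RedisQueue(
        host=os.environ.get("REDIS_HOST", "127.0.0.1"),
        port=int(os.environ.get("REDIS_PORT", "6379")),
        database=0,
    )

if __name__ == "__main__":
    # Mode comes from the command line ("master" / "worker"), falling back to SPIDER_MODE.
    mode = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("SPIDER_MODE", "worker")
    if mode == "master":
        MasterSpider(task_queue=build_queue()).run_init()
    else:
        worker_id = os.environ.get("WORKER_ID", "worker_001")
        WorkerSpider(worker_id=worker_id, task_queue=build_queue()).run_spider()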

2. Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spider-worker
spec:
  replicas: 5
  selector:
    matchLabels:
      app: spider-worker
  template:
    metadata:
      labels:
        app: spider-worker
    spec:
      containers:
      - name: worker
        image: your-registry/spider:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: WORKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        command: ["python", "distributed_spider.py", "worker"]
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
 
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379

3. Monitoring and Alerting

class DistributedMonitor:
    def __init__(self, redis_queue, spider_name):
        self.redis_queue = redis_queue
        self.spider_name = spider_name
    
    def check_health(self):
        """Health check"""
        issues = []
        
        # Check for queue backlog
        queue_size = self.redis_queue.size(self.spider_name)
        if queue_size > 10000:
            issues.append(f"Severe queue backlog: {queue_size}")
        
        # Check the failure rate
        failure_size = self.redis_queue.size(self.spider_name, qtypes=("failure",))
        if failure_size > queue_size * 0.1:
            issues.append(f"Failure rate too high: {failure_size}/{queue_size}")
        
        # Check Worker status (reads the status hash the workers report into)
        worker_statuses = self.redis_queue.redis_db.hgetall(f"{self.spider_name}:status")
        if len(worker_statuses) == 0:
            issues.append("No active Workers")
        
        return issues
    
    def send_alert(self, message):
        """Send an alert"""
        # Integrate with an alerting system (DingTalk, email, etc.)
        print(f"🚨 Alert: {message}")
    
    def monitor_loop(self):
        """Monitoring loop"""
        while True:
            try:
                issues = self.check_health()
                if issues:
                    for issue in issues:
                        self.send_alert(issue)
                
                time.sleep(300)  # check every 5 minutes
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(60)
 
# Start monitoring
monitor = DistributedMonitor(redis_queue, "distributed_spider")
threading.Thread(target=monitor.monitor_loop, daemon=True).start()

Best Practices

1. Fault-Tolerant Design

class FaultTolerantWorker(air.Spider):
    def __init__(self, **kwargs):
        self.max_failures = 5
        self.failure_count = 0
        super().__init__(**kwargs)
    
    def item_pipeline(self, context):
        try:
            # Process the data
            self.process_data(context.items)
            context.success()
            self.failure_count = 0  # reset the failure counter
        except Exception as e:
            self.failure_count += 1
            print(f"Processing failed ({self.failure_count}/{self.max_failures}): {e}")
            
            if self.failure_count >= self.max_failures:
                print("Too many consecutive failures, Worker exiting")
                raise SystemExit
            
            context.failure()

2. Resource Optimization

class OptimizedWorker(air.Spider):
    def __init__(self, **kwargs):
        # Tune concurrency to the machine's resources
        import psutil
        cpu_count = psutil.cpu_count()
        memory_gb = psutil.virtual_memory().total // (1024**3)
        
        # Derive a reasonable concurrency level
        optimal_concurrency = min(cpu_count * 2, memory_gb, 20)
        kwargs.setdefault("concurrency", optimal_concurrency)
        
        super().__init__(**kwargs)
        print(f"Worker started, concurrency: {optimal_concurrency}")

3. Data Consistency

class ConsistentWorker(air.Spider):
    def item_pipeline(self, context):
        # Use a transaction to keep writes atomic and consistent
        with self.database.transaction():
            try:
                self.save_data(context.items)
                context.success()
            except Exception as e:
                print(f"Failed to save data: {e}")
                context.failure()
                raise

Distributed crawling is a powerful feature of the Bricks framework: with a sound architecture and solid operational practices, it can support efficient, stable, large-scale crawler systems.