
2.5.5 Turning a Spider into an API

Exposing a spider as an API service is a signature feature of the Bricks framework. With a small amount of configuration, any spider can be turned into a remotely callable RPC service, so it can be deployed and invoked as a service.

Core Concepts

What It Means to Turn a Spider into an API

Turning a spider into an API means taking a spider that traditionally runs locally and exposing it as an API service that can be called over the network. The benefits include:

  • Service-style deployment: the spider runs as an independent service, which is easier to manage and scale
  • Remote invocation: other systems can trigger the spider through an API call
  • Resource isolation: the spider runs in its own environment and does not affect the caller
  • Load balancing: multiple spider instances can be deployed to share the load
  • Unified interface: callers get a standardized invocation interface

The Rpc.wrap Method

Bricks provides the Rpc.wrap method, which wraps any spider class into an RPC service:

from bricks.spider.addon import Rpc
 
# Wrap the spider as an RPC service
rpc = Rpc.wrap(MySpider)

# Start the RPC service
rpc.serve(concurrency=10, mode="http", ident=8080)
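Once the service is running, two calls are used throughout this chapter: execute, which runs the spider and forwards the argument dict to make_seeds as keyword arguments, and PING, which acts as a health check. A minimal call sketch with the bundled HTTP client (shown in full in the client examples below):

from bricks.rpc.http_.service import Client

client = Client("localhost:8080")
client.rpc("PING")                        # health check; the examples below expect "PONG" back
client.rpc("execute", {"start_page": 1})  # run the spider; the dict arrives in make_seeds as kwargs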

Basic Spider API

1. Define the spider class

First, we need a standard spider class:

# spider_api.py
from loguru import logger
from bricks import Request, const
from bricks.core import events, signals
from bricks.spider import air
from bricks.spider.air import Context

class ApiSpider(air.Spider):
    """Example spider exposed as an API"""

    def make_seeds(self, context: Context, **kwargs):
        """
        Generate seed data.
        kwargs holds the parameters passed in with the RPC call.
        """
        # Page range taken from the RPC call parameters
        start_page = kwargs.get("start_page", 1)
        end_page = kwargs.get("end_page", 5)

        # Build one seed per page
        seeds = []
        for page in range(start_page, end_page + 1):
            seeds.append({
                "page": page,
                "category": kwargs.get("category", "default")
            })

        logger.info(f"Generated {len(seeds)} seeds")
        return seeds

    def make_request(self, context: Context) -> Request:
        """Build the request"""
        seeds = context.seeds
        page = seeds["page"]
        category = seeds.get("category", "default")

        # Build different requests for different categories
        if category == "music":
            return Request(
                url="https://fx1.service.kugou.com/mfanxing-home/h5/cdn/room/index/list_v2",
                params={"page": page, "cid": 6000},
                headers={
                    'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
                }
            )
        else:
            return Request(
                url=f"https://httpbin.org/json?page={page}&category={category}",
                headers={
                    'User-Agent': "Bricks Spider API 1.0"
                }
            )

    def parse(self, context: Context):
        """Parse the response"""
        response = context.response
        seeds = context.seeds
        category = seeds.get("category", "default")

        if category == "music":
            # Extract the music data
            return response.extract(
                engine="json",
                rules={
                    "data.list": {
                        "userId": "userId",
                        "roomId": "roomId",
                        "score": "score",
                        "startTime": "startTime",
                        "status": "status",
                    }
                }
            )
        else:
            # Extract generic JSON data
            return response.extract(
                engine="json",
                rules={
                    "args": {
                        "page": "page",
                        "category": "category"
                    },
                    "headers": {
                        "user_agent": "User-Agent"
                    }
                }
            )

    def item_pipeline(self, context: Context):
        """Handle the extracted items"""
        items = context.items
        logger.info(f"Processed {len(items)} items: {items}")

        # Mark the seed as successfully processed
        context.success()

    @staticmethod
    @events.on(const.AFTER_REQUEST)
    def validate_response(context: Context):
        """Check whether the response is usable"""
        response = context.response

        # Check the HTTP status code
        if response.status_code >= 400:
            logger.warning(f"HTTP error: {response.status_code}")
            raise signals.Retry

        # Check the response body
        if context.seeds.get("category") == "music":
            try:
                data = response.json()
            except ValueError:
                # Keep the except clause narrow so the Retry raised below
                # is not swallowed and mislabelled as a JSON error
                logger.warning("Response is not valid JSON")
                raise signals.Retry

            if data.get("code") != 0:
                logger.warning(f"Business error: {data.get('message')}")
                raise signals.Retry

2. Start the API service

# api_server.py
from loguru import logger

from bricks.spider.addon import Rpc
from spider_api import ApiSpider

def on_crawl_finish(ctx: ApiSpider.Context, err: Exception):
    """Callback fired when a crawl finishes"""
    if err:
        logger.error(f"Spider run failed: {err}")
    else:
        logger.info(f"Spider run finished, processed seeds: {ctx.seeds}")

if __name__ == '__main__':
    # Wrap the spider as an RPC service
    rpc = Rpc.wrap(ApiSpider)

    # Register the completion callback
    rpc.with_callback(on_crawl_finish)

    # Start the HTTP RPC service
    rpc.serve(
        concurrency=10,     # number of concurrent workers
        mode="http",        # protocol mode
        ident=8080          # port
    )

3. Call from a client

# api_client.py
from bricks.rpc.http_.service import Client
import json

def test_spider_api():
    """Exercise the spider API"""
    client = Client("localhost:8080")

    # Invoke the spider's execute method
    response = client.rpc("execute", {
        "start_page": 1,
        "end_page": 3,
        "category": "music"
    })

    if response.code == 0:
        result = json.loads(response.data)
        print(f"Spider run succeeded: {result}")
    else:
        print(f"Spider run failed: {response.message}")

def test_custom_category():
    """Exercise a custom category"""
    client = Client("localhost:8080")

    response = client.rpc("execute", {
        "start_page": 1,
        "end_page": 2,
        "category": "test",
        "custom_param": "value"
    })

    print(f"Response: {response.data}")

if __name__ == '__main__':
    test_spider_api()
    test_custom_category()
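
For callers that cannot use the bundled Client (other languages, plain HTTP tools), the HTTP mode can presumably be reached with a direct POST to the /rpc endpoint. The payload shape below mirrors the aiohttp example in the asynchronous-client section further down and should be treated as an assumption about the wire format, not a guarantee:

# raw_http_client.py -- hypothetical raw call; the endpoint and payload shape
# are inferred from the aiohttp example later in this section
import json
import requests

payload = {
    "method": "execute",
    "data": json.dumps({"args": [], "kwargs": {"start_page": 1, "end_page": 2}}),
    "request_id": "req-demo-1",
}
resp = requests.post("http://localhost:8080/rpc", json=payload, timeout=60)
print(resp.json())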

Advanced API Features

1. Dynamic parameter configuration

from loguru import logger

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc
from bricks.spider.air import Context

class ConfigurableSpider(air.Spider):
    """Spider that accepts dynamic configuration"""

    def make_seeds(self, context: Context, **kwargs):
        """Generate seeds from the supplied configuration"""
        # Use an explicit URL list if one was given
        urls = kwargs.get("urls", [])
        if urls:
            return [{"url": url} for url in urls]

        # Otherwise build URLs from the other parameters
        base_url = kwargs.get("base_url", "https://httpbin.org")
        pages = kwargs.get("pages", [1, 2, 3])

        return [{"url": f"{base_url}/json?page={page}"} for page in pages]

    def make_request(self, context: Context) -> Request:
        """Build the request"""
        seeds = context.seeds
        url = seeds["url"]

        # Request options taken from the seed
        headers = seeds.get("headers", {})
        params = seeds.get("params", {})
        proxy = seeds.get("proxy")

        return Request(
            url=url,
            headers=headers,
            params=params,
            proxy=proxy,
            timeout=seeds.get("timeout", 30)
        )

    def parse(self, context: Context):
        """Parse the response"""
        response = context.response

        # Pick a parsing strategy based on the URL
        if "json" in response.url:
            return response.json()
        elif "html" in response.url:
            return {
                "title": response.xpath("//title/text()").get(""),
                "content_length": len(response.text)
            }
        else:
            return {"content": response.text[:100]}

    def item_pipeline(self, context: Context):
        """Handle the extracted items"""
        items = context.items
        logger.info(f"Processed items: {items}")
        context.success()

# Start the configurable spider API
if __name__ == '__main__':
    rpc = Rpc.wrap(ConfigurableSpider)
    rpc.serve(concurrency=10, mode="http", ident=8081)

2. Batch URL processing

# batch_client.py
import json

from bricks.rpc.http_.service import Client

def batch_crawl_urls():
    """Crawl a batch of URLs"""
    client = Client("localhost:8081")

    # Batch URL configuration
    config = {
        "urls": [
            "https://httpbin.org/json",
            "https://httpbin.org/html",
            "https://httpbin.org/xml"
        ],
        "headers": {
            "User-Agent": "Batch Crawler 1.0"
        },
        "timeout": 30
    }

    response = client.rpc("execute", config)

    if response.code == 0:
        print("Batch crawl succeeded")
        result = json.loads(response.data)
        print(f"Result: {result}")
    else:
        print(f"Batch crawl failed: {response.message}")

if __name__ == '__main__':
    batch_crawl_urls()

3. Asynchronous API calls

# async_client.py
import asyncio
import aiohttp
import json

class AsyncSpiderClient:
    """Asynchronous client for the spider API"""

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip('/')

    async def execute_spider(self, session, config):
        """Run the spider asynchronously"""
        request_data = {
            "method": "execute",
            "data": json.dumps({"args": [], "kwargs": config}),
            "request_id": f"req_{id(config)}"
        }

        async with session.post(
            f"{self.base_url}/rpc",
            json=request_data
        ) as response:
            result = await response.json()
            return result

    async def batch_execute(self, configs):
        """Run several configurations concurrently"""
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.execute_spider(session, config)
                for config in configs
            ]
            results = await asyncio.gather(*tasks)
            return results

async def test_async_batch():
    """Exercise the asynchronous batch call"""
    client = AsyncSpiderClient("http://localhost:8081")

    # Several configurations
    configs = [
        {"urls": ["https://httpbin.org/json"], "category": "json"},
        {"urls": ["https://httpbin.org/html"], "category": "html"},
        {"urls": ["https://httpbin.org/xml"], "category": "xml"}
    ]

    results = await client.batch_execute(configs)

    for i, result in enumerate(results):
        print(f"Task {i + 1} result: {result}")

if __name__ == '__main__':
    asyncio.run(test_async_batch())

Multi-Protocol API Services

1. Serve multiple protocols at once

# multi_protocol_server.py
import asyncio
from bricks.rpc.common import start_rpc_server
from spider_api import ApiSpider

async def start_multi_protocol():
    """Start the API service on several protocols"""
    spider_class = ApiSpider

    # Start one service per protocol
    tasks = [
        start_rpc_server(spider_class, mode="http", ident=8080),
        start_rpc_server(spider_class, mode="websocket", ident=8081),
        start_rpc_server(spider_class, mode="grpc", ident=50051),
    ]

    print("Starting multi-protocol spider API services...")
    print("HTTP: http://localhost:8080")
    print("WebSocket: ws://localhost:8081")
    print("gRPC: localhost:50051")

    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(start_multi_protocol())

2. Protocol-specific clients

# protocol_clients.py
import json

def test_http_api():
    """Exercise the HTTP API"""
    from bricks.rpc.http_.service import Client

    client = Client("localhost:8080")
    response = client.rpc("execute", {"start_page": 1, "end_page": 2})
    print(f"HTTP response: {response.data}")

def test_websocket_api():
    """Exercise the WebSocket API"""
    from bricks.rpc.websocket_.service import Client

    client = Client("localhost:8081")
    try:
        response = client.rpc("execute", {"start_page": 1, "end_page": 2})
        print(f"WebSocket response: {response.data}")
    finally:
        client.close()

def test_grpc_api():
    """Exercise the gRPC API"""
    from bricks.rpc.grpc_.service import Client

    client = Client("localhost:50051")
    response = client.rpc("execute", {"start_page": 1, "end_page": 2})
    print(f"gRPC response: {response.data}")

if __name__ == '__main__':
    print("Testing the spider API over different protocols...")
    test_http_api()
    test_websocket_api()
    test_grpc_api()

Production Deployment

1. Docker-based deployment

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy the application code
COPY . .

# Expose the service port
EXPOSE 8080

# Start command
CMD ["python", "api_server.py"]

2. Configuration management

# config.py
import os
from dataclasses import dataclass

@dataclass
class SpiderConfig:
    """Spider configuration"""
    concurrency: int = int(os.getenv("SPIDER_CONCURRENCY", "10"))
    mode: str = os.getenv("SPIDER_MODE", "http")
    port: int = int(os.getenv("SPIDER_PORT", "8080"))
    proxy: str = os.getenv("SPIDER_PROXY", "")
    timeout: int = int(os.getenv("SPIDER_TIMEOUT", "30"))
    max_retry: int = int(os.getenv("SPIDER_MAX_RETRY", "3"))

# production_server.py
from bricks.spider.addon import Rpc
from spider_api import ApiSpider
from config import SpiderConfig

def create_production_spider():
    """Build the spider class used in production"""
    config = SpiderConfig()

    # Derive a subclass carrying the production settings as class attributes
    spider_class = type('ProductionSpider', (ApiSpider,), {
        'proxy': config.proxy,
        'timeout': config.timeout,
        'max_retry': config.max_retry
    })

    return spider_class

if __name__ == '__main__':
    config = SpiderConfig()
    spider_class = create_production_spider()

    rpc = Rpc.wrap(spider_class)
    rpc.serve(
        concurrency=config.concurrency,
        mode=config.mode,
        ident=config.port
    )

3. Health checks and monitoring

# health_check.py
from bricks.rpc.http_.service import Client
import time
import json

def health_check(api_url: str):
    """Run a health check against the API"""
    try:
        client = Client(api_url)

        # Send a PING request
        response = client.rpc("PING")

        if response.code == 0 and response.data == "PONG":
            return {"status": "healthy", "timestamp": time.time()}
        else:
            return {"status": "unhealthy", "error": response.message}

    except Exception as e:
        return {"status": "error", "error": str(e)}

def monitor_spider_api():
    """Keep monitoring the spider API"""
    api_url = "localhost:8080"

    while True:
        health = health_check(api_url)
        print(f"Health check: {json.dumps(health, indent=2)}")

        if health["status"] != "healthy":
            # Raise an alert
            print("⚠️ Spider API is unhealthy!")

        time.sleep(30)  # check every 30 seconds

if __name__ == '__main__':
    monitor_spider_api()

API Parameter Passing

1. Parameter flow

When a spider is invoked over RPC, the parameters travel as follows:

RPC call parameters → make_seeds(**kwargs) → seed data → spider execution flow

2. Parameter types and usage

from bricks import Request
from bricks.rpc.http_.service import Client
from bricks.spider import air
from bricks.spider.air import Context

class ParameterSpider(air.Spider):
    """Spider that demonstrates parameter passing"""

    def make_seeds(self, context: Context, **kwargs):
        """
        kwargs contains every parameter passed in with the RPC call
        """
        # Basic parameters
        urls = kwargs.get("urls", [])
        pages = kwargs.get("pages", [1])

        # Request options
        proxy = kwargs.get("proxy")
        timeout = kwargs.get("timeout", 30)
        headers = kwargs.get("headers", {})

        # Business parameters
        category = kwargs.get("category", "default")
        filters = kwargs.get("filters", {})

        # Carry the parameters along inside each seed
        seeds = []
        for page in pages:
            seed = {
                "page": page,
                "category": category,
                "proxy": proxy,
                "timeout": timeout,
                "headers": headers,
                "filters": filters
            }
            seeds.append(seed)

        return seeds

    def make_request(self, context: Context) -> Request:
        """Build the request from the parameters carried in the seed"""
        seeds = context.seeds

        return Request(
            url=f"https://api.example.com/data?page={seeds['page']}&category={seeds['category']}",
            headers=seeds.get("headers", {}),
            proxy=seeds.get("proxy"),
            timeout=seeds.get("timeout", 30)
        )

# Client-side call example
client = Client("localhost:8080")
response = client.rpc("execute", {
    "pages": [1, 2, 3],
    "category": "electronics",
    "proxy": "http://proxy.example.com:8080",
    "timeout": 60,
    "headers": {
        "User-Agent": "Custom Spider 1.0",
        "Authorization": "Bearer token123"
    },
    "filters": {
        "min_price": 100,
        "max_price": 1000
    }
})

Error Handling and Retries

1. API-level error handling

import json
import time

from loguru import logger
from bricks import Request, const
from bricks.core import events, signals
from bricks.spider import air
from bricks.spider.addon import Rpc
from bricks.spider.air import Context

class RobustSpider(air.Spider):
    """Spider with robust error handling"""

    def make_seeds(self, context: Context, **kwargs):
        """Validate the parameters and generate seeds"""
        # Parameter validation
        required_params = kwargs.get("required_params", [])
        for param in required_params:
            if param not in kwargs:
                raise ValueError(f"Missing required parameter: {param}")

        urls = kwargs.get("urls", [])
        if not urls:
            raise ValueError("The urls parameter must not be empty")

        return [{"url": url, "retry_count": 0} for url in urls]

    def make_request(self, context: Context) -> Request:
        """Build the request"""
        seeds = context.seeds
        return Request(
            url=seeds["url"],
            retry=seeds.get("retry_count", 0),
            max_retry=3
        )

    def parse(self, context: Context):
        """Parse the response"""
        response = context.response

        # Check the response status
        if response.status_code >= 400:
            raise Exception(f"HTTP error: {response.status_code}")

        try:
            data = response.json()
            if not data:
                raise Exception("The response body is empty")
            return data
        except json.JSONDecodeError:
            raise Exception("The response is not valid JSON")

    def item_pipeline(self, context: Context):
        """Handle the extracted items"""
        items = context.items

        # Data validation
        if not items:
            logger.warning("No items were extracted")

        logger.info(f"Successfully processed {len(items)} items")
        context.success()

    @staticmethod
    @events.on(const.AFTER_REQUEST)
    def handle_errors(context: Context):
        """Centralized error handling"""
        response = context.response

        # Decide whether to retry based on the status code
        if response.status_code == 429:  # rate limited
            logger.warning("Rate limited, waiting before retrying")
            time.sleep(5)
            raise signals.Retry
        elif response.status_code >= 500:  # server error
            logger.warning("Server error, retrying")
            raise signals.Retry
        elif response.status_code == 403:  # forbidden
            logger.error("Access forbidden, dropping the seed")
            raise signals.Drop

# API service with error handling
def create_robust_api():
    """Create a robust API service"""
    def error_callback(ctx: RobustSpider.Context, err: Exception):
        """Completion callback"""
        if err:
            logger.error(f"Spider run failed: {err}")
            # Alerts, logging, etc. can go here
        else:
            logger.info("Spider run succeeded")

    rpc = Rpc.wrap(RobustSpider)
    rpc.with_callback(error_callback)
    return rpc

if __name__ == '__main__':
    api = create_robust_api()
    api.serve(concurrency=10, mode="http", ident=8080)

2. Client-side error handling

# robust_client.py
from bricks.rpc.http_.service import Client
from loguru import logger
import json
import time

class RobustApiClient:
    """Robust API client"""

    def __init__(self, api_url: str, max_retries: int = 3):
        self.client = Client(api_url)
        self.max_retries = max_retries

    def execute_with_retry(self, config: dict):
        """Execute with retries"""
        last_error = None

        for attempt in range(self.max_retries + 1):
            try:
                response = self.client.rpc("execute", config)

                if response.code == 0:
                    return json.loads(response.data)
                else:
                    last_error = response.message
                    logger.warning(f"Attempt {attempt + 1} failed: {last_error}")

            except Exception as e:
                last_error = str(e)
                logger.warning(f"Attempt {attempt + 1} raised: {last_error}")

            # Wait and retry unless this was the final attempt
            if attempt < self.max_retries:
                wait_time = 2 ** attempt  # exponential backoff
                logger.info(f"Waiting {wait_time} seconds before retrying...")
                time.sleep(wait_time)

        raise Exception(f"Execution failed after {self.max_retries} retries: {last_error}")

    def validate_config(self, config: dict):
        """Validate the configuration"""
        required_fields = ["urls"]
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Configuration is missing a required field: {field}")

        if not isinstance(config["urls"], list):
            raise ValueError("urls must be a list")

        if not config["urls"]:
            raise ValueError("urls must not be empty")

# Usage example
def safe_api_call():
    """Make a safe API call"""
    client = RobustApiClient("localhost:8080", max_retries=3)

    config = {
        "urls": ["https://httpbin.org/json", "https://httpbin.org/html"],
        "timeout": 30,
        "headers": {"User-Agent": "Robust Client 1.0"}
    }

    try:
        # Validate the configuration
        client.validate_config(config)

        # Run the spider
        result = client.execute_with_retry(config)
        print(f"Execution succeeded: {result}")

    except Exception as e:
        print(f"Execution failed: {e}")

if __name__ == '__main__':
    safe_api_call()

Performance Optimization and Monitoring

1. Performance monitoring

# performance_monitor.py
import time
from collections import defaultdict
from bricks import Request
from bricks.spider.addon import Rpc
from bricks.spider import air

class MonitoredSpider(air.Spider):
    """Spider with performance monitoring"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = defaultdict(list)
        self.start_time = None

    def make_seeds(self, context, **kwargs):
        """Record the start time"""
        self.start_time = time.time()
        urls = kwargs.get("urls", ["https://httpbin.org/json"])
        return [{"url": url} for url in urls]

    def make_request(self, context):
        """Record request metrics"""
        seeds = context.seeds
        request_start = time.time()
        seeds["request_start"] = request_start

        return Request(url=seeds["url"])

    def parse(self, context):
        """Record parsing metrics"""
        response = context.response
        seeds = context.seeds

        # Measure how long the request took
        request_duration = time.time() - seeds["request_start"]
        self.metrics["request_duration"].append(request_duration)

        # Record the response size
        response_size = len(response.content)
        self.metrics["response_size"].append(response_size)

        return response.json()

    def item_pipeline(self, context):
        """Record processing metrics"""
        items = context.items

        # Record the number of items
        self.metrics["items_count"].append(len(items))

        # Compute the total duration
        if self.start_time:
            total_duration = time.time() - self.start_time
            self.metrics["total_duration"].append(total_duration)

        context.success()

    def get_metrics(self):
        """Return the collected metrics"""
        if not self.metrics:
            return {"message": "No metrics collected yet"}

        summary = {}
        for metric_name, values in self.metrics.items():
            if values:
                summary[metric_name] = {
                    "count": len(values),
                    "avg": sum(values) / len(values),
                    "min": min(values),
                    "max": max(values),
                    "total": sum(values)
                }

        return summary

# Build a monitoring API
class MonitoringApi:
    """Monitoring API wrapper"""

    def __init__(self, spider_class):
        self.spider_instance = None
        self.spider_class = spider_class

    def execute(self, **kwargs):
        """Run the spider and return the result"""
        self.spider_instance = self.spider_class()

        # Run the spider
        context = self.spider_instance.Context(
            target=self.spider_instance,
            seeds={}
        )

        # The flow below is simplified; a real deployment should use the
        # framework's full spider execution mechanism
        seeds = self.spider_instance.make_seeds(context, **kwargs)
        results = []

        for seed in seeds:
            context.seeds = seed
            request = self.spider_instance.make_request(context)
            # Simulate executing the request...

        return {"status": "completed", "results": results}

    def get_metrics(self):
        """Return the collected metrics"""
        if self.spider_instance:
            return self.spider_instance.get_metrics()
        return {"message": "The spider has not run yet"}

# Start the monitoring API
if __name__ == '__main__':
    api = MonitoringApi(MonitoredSpider)
    rpc = Rpc.wrap(api)
    rpc.serve(concurrency=10, mode="http", ident=8080)

2. Load balancing and scaling

# load_balancer.py
import random
from bricks.rpc.http_.service import Client

class SpiderLoadBalancer:
    """Load balancer for spider nodes"""

    def __init__(self, spider_nodes: list):
        self.nodes = spider_nodes
        self.clients = {node: Client(node) for node in spider_nodes}
        self.node_stats = {node: {"requests": 0, "errors": 0} for node in spider_nodes}

    def select_node(self, strategy="round_robin"):
        """Pick a node"""
        if strategy == "random":
            return random.choice(self.nodes)
        elif strategy == "least_requests":
            return min(self.nodes, key=lambda n: self.node_stats[n]["requests"])
        else:  # round_robin
            # Simple round-robin implementation
            if not hasattr(self, "_current_index"):
                self._current_index = 0
            node = self.nodes[self._current_index]
            self._current_index = (self._current_index + 1) % len(self.nodes)
            return node

    def execute_spider(self, config: dict, strategy="round_robin"):
        """Dispatch a spider task"""
        node = self.select_node(strategy)
        client = self.clients[node]

        try:
            self.node_stats[node]["requests"] += 1
            response = client.rpc("execute", config)

            if response.code == 0:
                return {
                    "success": True,
                    "data": response.data,
                    "node": node
                }
            else:
                self.node_stats[node]["errors"] += 1
                return {
                    "success": False,
                    "error": response.message,
                    "node": node
                }

        except Exception as e:
            self.node_stats[node]["errors"] += 1
            return {
                "success": False,
                "error": str(e),
                "node": node
            }

    def get_stats(self):
        """Return the per-node statistics"""
        return self.node_stats

# Using the load balancer
def test_load_balancer():
    """Exercise the load balancer"""
    # Assume three spider nodes are running
    nodes = [
        "localhost:8080",
        "localhost:8081",
        "localhost:8082"
    ]

    balancer = SpiderLoadBalancer(nodes)

    # Dispatch several tasks
    for i in range(10):
        config = {
            "urls": [f"https://httpbin.org/json?id={i}"],
            "timeout": 30
        }

        result = balancer.execute_spider(config, strategy="least_requests")
        print(f"Task {i}: {result}")

    # Show the statistics
    print("Node statistics:", balancer.get_stats())

if __name__ == '__main__':
    test_load_balancer()
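
The hasattr-based round robin above works, but a stateful iterator is a slightly tidier way to express the same policy. A small sketch using the standard library's itertools.cycle, with the same example node list as above:

import itertools

class RoundRobinSelector:
    """Endless round-robin over a fixed node list."""

    def __init__(self, nodes):
        self._cycle = itertools.cycle(nodes)

    def select(self):
        # next() advances the cycle and wraps around automatically
        return next(self._cycle)

selector = RoundRobinSelector(["localhost:8080", "localhost:8081", "localhost:8082"])
print([selector.select() for _ in range(5)])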

Best Practices

1. Configuration management

# spider_config.py
from dataclasses import dataclass, field
from typing import List, Dict

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc

@dataclass
class SpiderApiConfig:
    """Spider API configuration"""
    # Service settings
    host: str = "0.0.0.0"
    port: int = 8080
    mode: str = "http"
    concurrency: int = 10

    # Spider settings
    default_timeout: int = 30
    max_retry: int = 3
    default_headers: Dict[str, str] = field(default_factory=dict)

    # Limits
    max_urls_per_request: int = 100
    max_pages_per_request: int = 50

    # Security settings
    allowed_domains: List[str] = field(default_factory=list)
    blocked_domains: List[str] = field(default_factory=list)

    @classmethod
    def from_env(cls):
        """Build the configuration from environment variables"""
        import os
        return cls(
            host=os.getenv("SPIDER_HOST", "0.0.0.0"),
            port=int(os.getenv("SPIDER_PORT", "8080")),
            mode=os.getenv("SPIDER_MODE", "http"),
            concurrency=int(os.getenv("SPIDER_CONCURRENCY", "10")),
            default_timeout=int(os.getenv("SPIDER_TIMEOUT", "30")),
            max_retry=int(os.getenv("SPIDER_MAX_RETRY", "3"))
        )

class ConfigurableApiSpider(air.Spider):
    """Configurable API spider"""

    def __init__(self, config: SpiderApiConfig = None):
        super().__init__()
        self.config = config or SpiderApiConfig()

    def make_seeds(self, context, **kwargs):
        """Validate the parameters and generate seeds"""
        urls = kwargs.get("urls", [])

        # Enforce the URL count limit
        if len(urls) > self.config.max_urls_per_request:
            raise ValueError(f"Too many URLs: {len(urls)} > {self.config.max_urls_per_request}")

        # Enforce the domain allow list
        if self.config.allowed_domains:
            for url in urls:
                domain = self._extract_domain(url)
                if domain not in self.config.allowed_domains:
                    raise ValueError(f"Domain is not in the allow list: {domain}")

        # Enforce the domain block list
        for url in urls:
            domain = self._extract_domain(url)
            if domain in self.config.blocked_domains:
                raise ValueError(f"Domain is in the block list: {domain}")

        return [{"url": url} for url in urls]

    def _extract_domain(self, url: str) -> str:
        """Extract the domain from a URL"""
        from urllib.parse import urlparse
        return urlparse(url).netloc

    def make_request(self, context):
        """Build the request using the configuration"""
        seeds = context.seeds

        # Merge the default headers with the seed headers
        headers = {**self.config.default_headers}
        headers.update(seeds.get("headers", {}))

        return Request(
            url=seeds["url"],
            headers=headers,
            timeout=seeds.get("timeout", self.config.default_timeout),
            max_retry=self.config.max_retry
        )

# Production startup script
def start_production_api():
    """Start the production API"""
    config = SpiderApiConfig.from_env()
    spider = ConfigurableApiSpider(config)

    rpc = Rpc.wrap(spider)
    rpc.serve(
        concurrency=config.concurrency,
        mode=config.mode,
        ident=config.port
    )

if __name__ == '__main__':
    start_production_api()
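
The domain rules can be sanity-checked without starting the service by driving make_seeds directly. The sketch below assumes the class above can be instantiated the same way the startup script does, and passes context=None since make_seeds does not use the context here:

config = SpiderApiConfig(allowed_domains=["httpbin.org"])
spider = ConfigurableApiSpider(config)

# Allowed domain: seeds are produced as usual
print(spider.make_seeds(None, urls=["https://httpbin.org/json"]))

# Domain outside the allow list: the ValueError surfaces to the caller
try:
    spider.make_seeds(None, urls=["https://example.com/"])
except ValueError as e:
    print(f"Rejected: {e}")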

2. Logging and auditing

# audit_spider.py
from datetime import datetime
from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc

class AuditSpider(air.Spider):
    """Spider with an audit trail"""

    def __init__(self):
        super().__init__()
        self.audit_log = []

    def make_seeds(self, context, **kwargs):
        """Record the incoming request"""
        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "request_received",
            "params": kwargs,
            "client_ip": kwargs.get("client_ip", "unknown")
        }
        self.audit_log.append(audit_entry)

        urls = kwargs.get("urls", [])
        return [{"url": url} for url in urls]

    def make_request(self, context):
        """Record the outgoing request"""
        seeds = context.seeds
        url = seeds["url"]

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "request_sent",
            "url": url
        }
        self.audit_log.append(audit_entry)

        return Request(url=url)

    def parse(self, context):
        """Record the received response"""
        response = context.response

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "response_received",
            "url": response.url,
            "status_code": response.status_code,
            "content_length": len(response.content)
        }
        self.audit_log.append(audit_entry)

        return response.json()

    def item_pipeline(self, context):
        """Record the processed items"""
        items = context.items

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "data_processed",
            "items_count": len(items)
        }
        self.audit_log.append(audit_entry)

        context.success()

    def get_audit_log(self):
        """Return the audit log"""
        return self.audit_log

    def clear_audit_log(self):
        """Clear the audit log"""
        self.audit_log.clear()

# Audit API wrapper
class AuditApiWrapper:
    """Audit API wrapper"""

    def __init__(self):
        self.spider_instance = None

    def execute(self, **kwargs):
        """Run the spider"""
        self.spider_instance = AuditSpider()
        # Run the spider logic...
        return {"status": "completed"}

    def get_audit_log(self):
        """Return the audit log"""
        if self.spider_instance:
            return self.spider_instance.get_audit_log()
        return []

    def clear_audit_log(self):
        """Clear the audit log"""
        if self.spider_instance:
            self.spider_instance.clear_audit_log()
        return {"status": "cleared"}

if __name__ == '__main__':
    wrapper = AuditApiWrapper()
    rpc = Rpc.wrap(wrapper)
    rpc.serve(concurrency=10, mode="http", ident=8080)

Together, these features and best practices make it possible to build a production-grade spider API service with solid error handling, performance monitoring, load balancing, and auditing.