2.5.6 Practical Applications

This section uses concrete application scenarios and complete examples to show how to use Bricks' RPC features in real projects.

Distributed Crawler System

System Architecture

┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Master    │    │   Worker1   │    │   Worker2   │
│  Scheduler  │◄──►│ Worker node │    │ Worker node │
└─────────────┘    └─────────────┘    └─────────────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                    ┌─────────────┐
                    │   Storage   │
                    │    node     │
                    └─────────────┘

Master Node Implementation

# master.py
import time
import uuid
from typing import List, Dict
from bricks.rpc.common import serve
from bricks.rpc.http_.service import Client

class TaskManager:
    """Task manager"""

    def __init__(self):
        self.tasks: Dict[str, dict] = {}
        self.workers: List[str] = []
        self.worker_clients: Dict[str, Client] = {}

    def register_worker(self, worker_url: str):
        """Register a worker node"""
        if worker_url not in self.workers:
            self.workers.append(worker_url)
            self.worker_clients[worker_url] = Client(worker_url)
            print(f"Worker registered: {worker_url}")
        return {"success": True, "worker_count": len(self.workers)}

    def submit_crawl_task(self, urls: List[str], config: dict = None):
        """Submit a crawl task"""
        task_id = str(uuid.uuid4())
        task = {
            "id": task_id,
            "urls": urls,
            "config": config or {},
            "status": "pending",
            "created_at": time.time(),
            "results": []
        }
        self.tasks[task_id] = task

        # Distribute the task to worker nodes
        self._distribute_task(task)

        return {"task_id": task_id, "status": "submitted"}

    def _distribute_task(self, task: dict):
        """Distribute a task to worker nodes"""
        if not self.workers:
            print("No workers available")
            return

        urls = task["urls"]
        urls_per_worker = len(urls) // len(self.workers)

        for i, worker_url in enumerate(self.workers):
            start_idx = i * urls_per_worker
            end_idx = start_idx + urls_per_worker if i < len(self.workers) - 1 else len(urls)
            worker_urls = urls[start_idx:end_idx]

            if worker_urls:
                try:
                    client = self.worker_clients[worker_url]
                    client.rpc("crawl_urls", worker_urls, task["config"])
                    print(f"Task dispatched to {worker_url}: {len(worker_urls)} URLs")
                except Exception as e:
                    print(f"Failed to dispatch task to {worker_url}: {e}")

    def get_task_status(self, task_id: str):
        """Get task status"""
        task = self.tasks.get(task_id)
        if not task:
            return {"error": "task not found"}

        return {
            "task_id": task_id,
            "status": task["status"],
            "url_count": len(task["urls"]),
            "result_count": len(task["results"]),
            "created_at": task["created_at"]
        }

    def collect_result(self, task_id: str, worker_url: str, results: List[dict]):
        """Collect results reported by a worker node"""
        if task_id in self.tasks:
            self.tasks[task_id]["results"].extend(results)
            print(f"Received {len(results)} results from {worker_url}")
            return {"success": True}
        return {"error": "task not found"}

# Start the Master node
if __name__ == "__main__":
    manager = TaskManager()
    serve(manager, mode="http", ident=8080)

Worker Node Implementation

# worker.py
import requests
from typing import List
from bricks import Request
from bricks.spider import air
from bricks.rpc.common import serve
from bricks.rpc.http_.service import Client

class CrawlWorker:
    """Crawl worker node"""

    def __init__(self, master_url: str, worker_port: int):
        self.master_url = master_url
        self.worker_port = worker_port
        self.master_client = Client(master_url)
        self.spider = self._create_spider()

    def _create_spider(self):
        """Create the spider instance"""
        class WorkerSpider(air.Spider):
            def make_request(self, context):
                url = context.seeds["url"]
                return Request(url=url)

            def parse(self, context):
                response = context.response
                return {
                    "url": response.url,
                    "title": response.xpath("//title/text()").get(""),
                    "status_code": response.status_code,
                    "content_length": len(response.text)
                }

        return WorkerSpider()

    def crawl_urls(self, urls: List[str], config: dict = None):
        """Crawl a list of URLs"""
        print(f"Starting to crawl {len(urls)} URLs")

        results = []
        for url in urls:
            try:
                # Crawl a single URL with the spider
                context = self.spider.Context(
                    target=self.spider,
                    seeds={"url": url}
                )

                request = self.spider.make_request(context)
                # Simplified here; a real worker should run the full spider workflow
                response = requests.get(request.url, timeout=10)

                result = {
                    "url": url,
                    "status_code": response.status_code,
                    "content_length": len(response.text),
                    "success": True
                }
                results.append(result)

            except Exception as e:
                results.append({
                    "url": url,
                    "error": str(e),
                    "success": False
                })

        print(f"Crawl finished, succeeded: {sum(1 for r in results if r.get('success'))}")
        return {"results": results, "worker_url": f"localhost:{self.worker_port}"}

    def register_to_master(self):
        """Register with the master node"""
        try:
            response = self.master_client.rpc(
                "register_worker",
                f"localhost:{self.worker_port}"
            )
            print(f"Registered with master: {response.data}")
        except Exception as e:
            print(f"Failed to register with master: {e}")

# Start a Worker node
if __name__ == "__main__":
    import sys

    master_url = "localhost:8080"
    worker_port = int(sys.argv[1]) if len(sys.argv) > 1 else 8081

    worker = CrawlWorker(master_url, worker_port)

    # Register with the master once the server has started
    def on_server_started(port):
        worker.register_to_master()

    serve(worker, mode="http", ident=worker_port, on_server_started=on_server_started)

Client Usage Example

# client.py
from bricks.rpc.http_.service import Client
import time

def main():
    # Connect to the Master node
    master = Client("localhost:8080")

    # Submit a crawl task
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt"
    ]

    response = master.rpc("submit_crawl_task", urls)
    task_id = response.data.get("task_id")
    print(f"Task submitted: {task_id}")

    # Poll the task status
    while True:
        status_response = master.rpc("get_task_status", task_id)
        status = status_response.data
        print(f"Task status: {status}")

        if status.get("result_count", 0) >= len(urls):
            print("Task finished!")
            break

        time.sleep(2)

if __name__ == "__main__":
    main()

Microservice Architecture Example

User Service

# user_service.py
from bricks.rpc.common import serve
from bricks.rpc.grpc_.service import Client

class UserService:
    """User service"""

    def __init__(self):
        self.users = {
            1: {"id": 1, "name": "张三", "email": "zhangsan@example.com"},
            2: {"id": 2, "name": "李四", "email": "lisi@example.com"}
        }
        # Clients for other services
        self.order_service = Client("localhost:50052")

    def get_user(self, user_id: int):
        """Get user info"""
        user = self.users.get(user_id)
        if not user:
            return {"error": "user not found"}
        return user

    def get_user_with_orders(self, user_id: int):
        """Get user info along with the user's orders"""
        user = self.get_user(user_id)
        if "error" in user:
            return user

        # Call the order service
        try:
            orders_response = self.order_service.rpc("get_user_orders", user_id)
            orders = orders_response.data if orders_response.code == 0 else []
            user["orders"] = orders
        except Exception as e:
            user["orders"] = []
            user["order_error"] = str(e)

        return user

    def create_user(self, name: str, email: str):
        """Create a user"""
        user_id = max(self.users.keys()) + 1 if self.users else 1
        user = {"id": user_id, "name": name, "email": email}
        self.users[user_id] = user
        return user

if __name__ == "__main__":
    serve(UserService(), mode="grpc", ident=50051)

Order Service

# order_service.py
from bricks.rpc.common import serve

class OrderService:
    """Order service"""

    def __init__(self):
        self.orders = {
            1: [
                {"id": 101, "amount": 99.99, "status": "completed"},
                {"id": 102, "amount": 149.99, "status": "pending"}
            ],
            2: [
                {"id": 103, "amount": 79.99, "status": "completed"}
            ]
        }

    def get_user_orders(self, user_id: int):
        """Get a user's orders"""
        return self.orders.get(user_id, [])

    def create_order(self, user_id: int, amount: float):
        """Create an order"""
        order_id = 200 + len([o for orders in self.orders.values() for o in orders])
        order = {"id": order_id, "amount": amount, "status": "pending"}

        if user_id not in self.orders:
            self.orders[user_id] = []
        self.orders[user_id].append(order)

        return order

if __name__ == "__main__":
    serve(OrderService(), mode="grpc", ident=50052)

API Gateway

# api_gateway.py
from bricks.rpc.common import serve
from bricks.rpc.grpc_.service import Client

class ApiGateway:
    """API gateway"""

    def __init__(self):
        self.user_service = Client("localhost:50051")
        self.order_service = Client("localhost:50052")

    def get_user_profile(self, user_id: int):
        """Get a user's full profile"""
        try:
            response = self.user_service.rpc("get_user_with_orders", user_id)
            return response.data if response.code == 0 else {"error": response.message}
        except Exception as e:
            return {"error": f"service call failed: {e}"}

    def create_user_order(self, user_id: int, amount: float):
        """Create an order for a user"""
        # Verify the user exists first; note the user service reports a missing
        # user via an "error" field rather than a non-zero response code
        user_response = self.user_service.rpc("get_user", user_id)
        user_data = user_response.data or {}
        if user_response.code != 0 or (isinstance(user_data, dict) and "error" in user_data):
            return {"error": "user not found"}

        # Create the order
        try:
            order_response = self.order_service.rpc("create_order", user_id, amount)
            return order_response.data if order_response.code == 0 else {"error": order_response.message}
        except Exception as e:
            return {"error": f"failed to create order: {e}"}

if __name__ == "__main__":
    serve(ApiGateway(), mode="http", ident=8080)

Real-Time Monitoring System

Monitoring Service

# monitor_service.py
import time
import psutil
import threading
from bricks.rpc.common import serve

class MonitorService:
    """System monitoring service"""

    def __init__(self):
        self.metrics = {}
        self.subscribers = set()
        self.running = True

        # Start the metrics collection thread
        self.monitor_thread = threading.Thread(target=self._collect_metrics, daemon=True)
        self.monitor_thread.start()

    def _collect_metrics(self):
        """Collect system metrics"""
        while self.running:
            self.metrics = {
                "timestamp": time.time(),
                "cpu_percent": psutil.cpu_percent(),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_percent": psutil.disk_usage('/').percent,
                "network_io": psutil.net_io_counters()._asdict()
            }
            time.sleep(5)

    def get_current_metrics(self):
        """Get the latest metrics"""
        return self.metrics

    def get_system_info(self):
        """Get static system info"""
        return {
            "cpu_count": psutil.cpu_count(),
            "memory_total": psutil.virtual_memory().total,
            "disk_total": psutil.disk_usage('/').total,
            "boot_time": psutil.boot_time()
        }

    def subscribe_metrics(self, client_id: str):
        """Subscribe to metrics updates"""
        self.subscribers.add(client_id)
        return {"subscribed": True, "client_id": client_id}

    def unsubscribe_metrics(self, client_id: str):
        """Unsubscribe"""
        self.subscribers.discard(client_id)
        return {"unsubscribed": True, "client_id": client_id}

if __name__ == "__main__":
    serve(MonitorService(), mode="websocket", ident=8080)

Monitoring Client

# monitor_client.py
import json
import time
from bricks.rpc.websocket_.service import Client

class MonitorClient:
    """Monitoring client"""

    def __init__(self, server_url: str):
        self.client = Client(server_url)
        self.client_id = f"client_{int(time.time())}"

    def start_monitoring(self):
        """Start monitoring"""
        try:
            # Subscribe to metrics
            response = self.client.rpc("subscribe_metrics", self.client_id)
            print(f"Subscribed: {response.data}")

            # Fetch static system info
            info_response = self.client.rpc("get_system_info")
            print(f"System info: {json.dumps(info_response.data, indent=2)}")

            # Keep polling for metrics
            while True:
                metrics_response = self.client.rpc("get_current_metrics")
                if metrics_response.code == 0:
                    metrics = metrics_response.data
                    print(f"CPU: {metrics['cpu_percent']}%, "
                          f"memory: {metrics['memory_percent']}%, "
                          f"disk: {metrics['disk_percent']}%")

                time.sleep(5)

        except KeyboardInterrupt:
            print("Stopping monitoring...")
        finally:
            # Unsubscribe
            self.client.rpc("unsubscribe_metrics", self.client_id)
            self.client.close()

if __name__ == "__main__":
    client = MonitorClient("ws://localhost:8080")
    client.start_monitoring()

Deployment and Operations

Docker Deployment

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8080

CMD ["python", "service.py"]

Docker Compose

# docker-compose.yml
version: "3.8"
 
services:
  master:
    build: .
    command: python master.py
    ports:
      - "8080:8080"
    environment:
      - SERVICE_TYPE=master
 
  worker1:
    build: .
    command: python worker.py 8081
    ports:
      - "8081:8081"
    depends_on:
      - master
    environment:
      - SERVICE_TYPE=worker
      - MASTER_URL=master:8080
 
  worker2:
    build: .
    command: python worker.py 8082
    ports:
      - "8082:8082"
    depends_on:
      - master
    environment:
      - SERVICE_TYPE=worker
      - MASTER_URL=master:8080
 
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

Health Check

# health_check.py
from bricks.rpc.http_.service import Client

def health_check(service_url: str):
    """Check whether a service is healthy"""
    try:
        client = Client(service_url)
        response = client.rpc("PING")
        return response.code == 0 and response.data == "PONG"
    except Exception:
        return False

def check_all_services():
    """Check all services"""
    services = {
        "master": "localhost:8080",
        "worker1": "localhost:8081",
        "worker2": "localhost:8082"
    }

    for name, url in services.items():
        status = "healthy" if health_check(url) else "unhealthy"
        print(f"{name}: {status}")

if __name__ == "__main__":
    check_all_services()
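
For cron jobs or container healthchecks it is usually more useful to exit with a non-zero status than to print. A small variant of the check above (a sketch that reuses the health_check function; the service list mirrors the one in check_all_services):

# health_check_exit.py -- sketch: same check, but with an exit code for cron/Docker healthchecks
import sys
from health_check import health_check

SERVICES = {
    "master": "localhost:8080",
    "worker1": "localhost:8081",
    "worker2": "localhost:8082",
}

if __name__ == "__main__":
    failed = [name for name, url in SERVICES.items() if not health_check(url)]
    for name in failed:
        print(f"{name}: unhealthy")
    sys.exit(1 if failed else 0)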

Spider RPC Service

Wrapping a Spider with Rpc.wrap

Bricks provides the Rpc.wrap method, which makes it easy to wrap a spider as an RPC service:

# spider_rpc_server.py
from loguru import logger
from bricks.spider.addon import Rpc
from bricks.spider import air
from bricks import Request

class MySpider(air.Spider):
    """Example spider"""

    def make_seeds(self, context, **kwargs):
        """Generate seed data"""
        return [{"page": i} for i in range(1, 6)]

    def make_request(self, context) -> Request:
        """Build the request"""
        page = context.seeds["page"]
        return Request(
            url=f"https://httpbin.org/json?page={page}",
            headers={
                'User-Agent': 'Bricks Spider 1.0'
            }
        )

    def parse(self, context):
        """Parse the response"""
        response = context.response
        return response.extract(
            engine="json",
            rules={
                "args": {
                    "page": "page"
                },
                "headers": {
                    "user_agent": "User-Agent"
                }
            }
        )

    def item_pipeline(self, context):
        """Handle parsed items"""
        items = context.items
        logger.info(f"Processed {len(items)} items")
        # Confirm the items were handled
        context.success()

def on_finish(ctx: MySpider.Context, err: Exception):
    """Callback fired when the spider finishes"""
    if err:
        logger.error(f"Spider run failed: {err}")
    else:
        logger.info(f"Spider run finished: {ctx.response.url}")

if __name__ == '__main__':
    # Wrap the spider as an RPC service
    rpc = Rpc.wrap(MySpider)

    # Add the finish callback
    rpc.with_callback(on_finish)

    # Start the RPC service
    rpc.serve(
        concurrency=10,
        mode="http",        # options: http, websocket, socket, grpc, redis
        ident=8080
    )

Spider RPC Client Calls

# spider_rpc_client.py
from bricks.rpc.http_.service import Client
import json

def test_spider_rpc():
    """Test a spider RPC call"""
    client = Client("localhost:8080")

    # Invoke the spider's execute method
    response = client.rpc("execute", {"custom_param": "test_value"})

    if response.code == 0:
        result = json.loads(response.data)
        print(f"Spider run succeeded: {result}")
    else:
        print(f"Spider run failed: {response.message}")

def test_spider_with_config():
    """Test a spider call with configuration"""
    client = Client("localhost:8080")

    # Pass configuration parameters
    config = {
        "proxy": "http://127.0.0.1:8080",
        "timeout": 30,
        "max_retry": 3
    }

    response = client.rpc("execute", config)
    print(f"Response: {response.data}")

if __name__ == '__main__':
    test_spider_rpc()
    test_spider_with_config()

Distributed Crawl Task Scheduling

# distributed_crawler.py
from bricks.rpc.http_.service import Client
import threading
import time

class DistributedCrawler:
    """Distributed crawl scheduler"""

    def __init__(self):
        self.workers = []  # worker node list
        self.task_queue = []  # task queue
        self.results = []  # collected results

    def register_worker(self, worker_url: str):
        """Register a worker node"""
        self.workers.append(worker_url)
        print(f"Worker registered: {worker_url}")

    def submit_crawl_task(self, urls: list, spider_config: dict = None):
        """Submit a crawl task"""
        task_id = int(time.time())

        # Split the URLs across the worker nodes
        chunk_size = len(urls) // len(self.workers) if self.workers else len(urls)

        for i, worker_url in enumerate(self.workers):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < len(self.workers) - 1 else len(urls)
            worker_urls = urls[start_idx:end_idx]

            if worker_urls:
                # Dispatch the task asynchronously
                thread = threading.Thread(
                    target=self._dispatch_task,
                    args=(worker_url, worker_urls, spider_config, task_id)
                )
                thread.start()

        return {"task_id": task_id, "worker_count": len(self.workers)}

    def _dispatch_task(self, worker_url: str, urls: list, config: dict, task_id: int):
        """Dispatch a task to a worker node"""
        try:
            client = Client(worker_url)

            # Build the spider config
            spider_config = {
                "urls": urls,
                "task_id": task_id,
                **(config or {})
            }

            response = client.rpc("execute", spider_config)

            if response.code == 0:
                print(f"Task {task_id} succeeded on {worker_url}")
                self.results.append({
                    "task_id": task_id,
                    "worker": worker_url,
                    "result": response.data
                })
            else:
                print(f"Task {task_id} failed on {worker_url}: {response.message}")

        except Exception as e:
            print(f"Failed to dispatch task to {worker_url}: {e}")

# Start the distributed scheduler
if __name__ == '__main__':
    scheduler = DistributedCrawler()

    # Register worker nodes
    scheduler.register_worker("localhost:8081")
    scheduler.register_worker("localhost:8082")

    # Submit a crawl task
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt"
    ]

    result = scheduler.submit_crawl_task(urls, {"timeout": 30})
    print(f"Task submitted: {result}")

Redis Queue Mode Spider

# redis_queue_spider.py
from bricks.spider.addon import Rpc
from bricks.spider import air
from bricks import Request

class QueueSpider(air.Spider):
    """Queue-mode spider"""

    def make_seeds(self, context, **kwargs):
        """Build seeds from the call parameters"""
        urls = kwargs.get("urls", [])
        return [{"url": url} for url in urls]

    def make_request(self, context) -> Request:
        """Build the request"""
        url = context.seeds["url"]
        return Request(url=url)

    def parse(self, context):
        """Parse the response"""
        response = context.response
        return {
            "url": response.url,
            "status_code": response.status_code,
            "title": response.xpath("//title/text()").get(""),
            "content_length": len(response.text)
        }

    def item_pipeline(self, context):
        """Handle results"""
        items = context.items
        print(f"Processed URL: {context.seeds['url']}, items: {items}")
        context.success()

if __name__ == '__main__':
    # Serve in Redis queue mode
    rpc = Rpc.wrap(QueueSpider)
    rpc.serve(
        concurrency=20,
        mode="redis",
        ident="redis://localhost:6379/0",
        server_id="crawler_worker_001"
    )

Redis Queue Client

# redis_queue_client.py
from bricks.rpc.redis_.service import Client
import time

def submit_crawl_jobs():
    """Submit crawl jobs to the Redis queue"""
    client = Client("redis://localhost:6379/0?server_id=crawler_worker_001")

    # Submit several crawl jobs
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml"
    ]

    for url in urls:
        response = client.rpc("execute", {"urls": [url]})
        print(f"Job submitted: {url}, response: {response.data}")
        time.sleep(1)

if __name__ == '__main__':
    submit_crawl_jobs()

Command-Line Tool Integration

Using the RpcProxy Runner

# runner_example.py
from bricks.client.runner import RpcProxy
from bricks.spider import air
from bricks import Request

class CommandSpider(air.Spider):
    """Command-line spider"""

    def make_seeds(self, context, **kwargs):
        # Get URLs from the command-line parameters
        urls = kwargs.get("urls", ["https://httpbin.org/json"])
        return [{"url": url} for url in urls]

    def make_request(self, context) -> Request:
        return Request(url=context.seeds["url"])

    def parse(self, context):
        response = context.response
        return response.json()

    def item_pipeline(self, context):
        print(f"Items: {context.items}")
        context.success()

def main(urls=None, **kwargs):
    """Entry point"""
    urls = urls or ["https://httpbin.org/json"]
    spider = CommandSpider()

    # Create the RPC proxy
    proxy = RpcProxy(lambda: spider.run(urls=urls))

    # Bind the spider object
    proxy.bind(spider)

    # Register custom methods
    proxy.register_adapter("get_status", lambda: "spider running")
    proxy.register_adapter("stop_spider", lambda: proxy.stop(delay=2))

    return proxy

if __name__ == '__main__':
    # Start the RPC service
    proxy = main()
    proxy.start(mode="http", ident=8080)

Command-Line Invocation Example

# Start the spider RPC service
python runner_example.py

# Call it from another terminal
curl -X POST http://localhost:8080/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "method": "get_status",
    "data": "{\"args\": [], \"kwargs\": {}}"
  }'

# Stop the service
curl -X POST http://localhost:8080/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "method": "stop_spider",
    "data": "{\"args\": [], \"kwargs\": {}}"
  }'
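
The same two calls can also be issued from Python with requests, which is handier for scripting; a minimal sketch assuming the /rpc endpoint returns a JSON body:

# call_runner.py -- sketch: the curl calls above, issued from Python
import requests

def call(method: str) -> dict:
    resp = requests.post(
        "http://localhost:8080/rpc",
        json={"method": method, "data": '{"args": [], "kwargs": {}}'},
        timeout=10,
    )
    # Assumes the endpoint responds with JSON, as in the curl examples above
    return resp.json()

if __name__ == "__main__":
    print(call("get_status"))
    print(call("stop_spider"))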

Performance Monitoring and Tuning

RPC Performance Monitoring

# rpc_monitor.py
import time
from collections import defaultdict
from bricks.rpc.common import BaseRpcService, RpcRequest, RpcResponse
from spider_rpc_server import MySpider  # the example spider defined earlier in this section

class MonitoredRpcService(BaseRpcService):
    """RPC service with built-in monitoring"""

    def __init__(self):
        super().__init__()
        self.metrics = defaultdict(list)
        self.request_count = 0
        self.error_count = 0

    async def process_rpc_request(self, request: RpcRequest) -> RpcResponse:
        """Handle a request and record metrics"""
        start_time = time.time()
        self.request_count += 1

        try:
            response = await super().process_rpc_request(request)

            # Record success metrics
            duration = time.time() - start_time
            self.metrics[request.method].append({
                "duration": duration,
                "success": response.code == 0,
                "timestamp": start_time
            })

            return response

        except Exception as e:
            self.error_count += 1
            duration = time.time() - start_time
            self.metrics[request.method].append({
                "duration": duration,
                "success": False,
                "error": str(e),
                "timestamp": start_time
            })
            raise

    def get_metrics(self):
        """Summarize the collected metrics"""
        summary = {}
        for method, records in self.metrics.items():
            if records:
                durations = [r["duration"] for r in records]
                success_count = sum(1 for r in records if r["success"])

                summary[method] = {
                    "total_calls": len(records),
                    "success_calls": success_count,
                    "error_calls": len(records) - success_count,
                    "avg_duration": sum(durations) / len(durations),
                    "max_duration": max(durations),
                    "min_duration": min(durations)
                }

        return {
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "methods": summary
        }

# Use the monitored service
service = MonitoredRpcService()
service.bind_target(MySpider())
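
To actually look at the collected numbers while the service is running, one simple option is a background thread that dumps get_metrics() periodically. A sketch continuing rpc_monitor.py above; the interval and output format are arbitrary choices:

# Periodically log the collected metrics alongside the running service (sketch)
import json
import threading

def report_metrics(svc: MonitoredRpcService, interval: int = 60):
    while True:
        time.sleep(interval)
        print(json.dumps(svc.get_metrics(), indent=2))

threading.Thread(target=report_metrics, args=(service,), daemon=True).start()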

Load Testing

# load_test.py
import asyncio
import aiohttp
import time

async def load_test_async(url: str, concurrent: int, total_requests: int):
    """Async load test"""

    async def single_request(session, request_id):
        try:
            start_time = time.time()
            async with session.post(
                f"{url}/rpc",
                json={
                    "method": "PING",
                    "data": '{"args": [], "kwargs": {}}',
                    "request_id": f"req_{request_id}"
                }
            ) as response:
                duration = time.time() - start_time
                status = response.status
                return {"success": status == 200, "duration": duration}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Create a connection pool
    connector = aiohttp.TCPConnector(limit=concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:

        # Send requests in batches
        results = []
        for batch_start in range(0, total_requests, concurrent):
            batch_size = min(concurrent, total_requests - batch_start)
            tasks = [
                single_request(session, batch_start + i)
                for i in range(batch_size)
            ]

            batch_results = await asyncio.gather(*tasks)
            results.extend(batch_results)

            print(f"Completed {len(results)}/{total_requests} requests")

    # Summarize the results
    success_count = sum(1 for r in results if r["success"])
    durations = [r["duration"] for r in results if "duration" in r]

    print(f"Total requests: {total_requests}")
    print(f"Successful requests: {success_count}")
    print(f"Failed requests: {total_requests - success_count}")
    if durations:
        print(f"Average response time: {sum(durations) / len(durations):.3f}s")
        print(f"Max response time: {max(durations):.3f}s")
        print(f"Min response time: {min(durations):.3f}s")

if __name__ == '__main__':
    asyncio.run(load_test_async("http://localhost:8080", concurrent=50, total_requests=1000))

These examples show what Bricks' RPC features can do when building distributed systems, microservice architectures, real-time monitoring systems, and spider services. Whether you need a simple service call or a complex distributed architecture, the RPC layer makes it straightforward to implement.