bricks
开发指南
2.5 RPC篇
2.5.1 RPC 概述

2.5.1 RPC 概述

Bricks 框架提供了强大的 RPC(Remote Procedure Call,远程过程调用)功能,支持多种通信协议,使得爬虫服务可以轻松地进行分布式部署和远程调用。

RPC 功能特性

多协议支持

Bricks 内置支持多种 RPC 通信协议:

协议描述适用场景
HTTP基于 HTTP 协议的 RPCWeb 服务、API 调用、跨语言调用
WebSocket基于 WebSocket 的实时通信实时数据推送、长连接场景
Socket基于 TCP Socket 的通信高性能、低延迟场景
gRPC基于 HTTP/2 的高性能 RPC微服务架构、高并发场景
Redis基于 Redis 的消息队列异步处理、任务队列

统一的编程接口

无论使用哪种协议,Bricks 都提供了统一的编程接口:

from bricks.rpc.common import serve
 
# 服务端 - 统一的启动方式
serve(
    my_service,
    mode="http",        # 选择协议
    concurrency=10,     # 并发数
    ident=8080         # 端口或标识
)
 
# 客户端 - 统一的调用方式
client.rpc("method_name", arg1, arg2, kwarg1="value")

自动序列化

Bricks 提供了智能的序列化机制:

  • 自动 JSON 序列化:支持基本数据类型的自动序列化
  • 自定义序列化:对象可实现 on_json_dump() 方法自定义序列化
  • 容错处理:无法序列化的对象自动降级为字符串

核心组件

1. 统一的数据结构

RpcRequest - 请求对象

class RpcRequest:
    def __init__(self, method: str, data: str, request_id: str = None):
        self.method = method        # 方法名
        self.data = data           # 请求数据(JSON字符串)
        self.request_id = request_id  # 请求ID

RpcResponse - 响应对象

class RpcResponse:
    def __init__(self, data: str = "", message: str = "", code: int = 0, request_id: str = ""):
        self.data = data           # 响应数据
        self.message = message     # 响应消息
        self.code = code          # 状态码(0表示成功)
        self.request_id = request_id  # 请求ID

2. 多对象代理

MultiObjectProxy 支持将多个对象或函数组合成一个 RPC 服务:

from bricks.rpc.common import MultiObjectProxy
 
# 组合多个服务对象
class UserService:
    def get_user(self, user_id):
        return {"id": user_id, "name": "张三"}
 
class OrderService:
    def get_order(self, order_id):
        return {"id": order_id, "amount": 100}
 
def get_status():
    return "服务正常"
 
# 创建多对象代理
proxy = MultiObjectProxy([UserService, OrderService, get_status])
 
# 支持多种调用方式
proxy.get_user(123)                    # 按顺序查找方法
proxy.UserService.get_user(123)       # 类名.方法名调用
proxy.get_status()                     # 函数调用

3. 异步处理支持

Bricks RPC 框架完全支持异步处理:

import asyncio
 
class AsyncService:
    async def async_method(self, data):
        await asyncio.sleep(1)  # 模拟异步操作
        return f"处理完成: {data}"
    
    def sync_method(self, data):
        return f"同步处理: {data}"
 
# 框架自动识别并正确处理异步和同步方法

基本使用流程

1. 定义服务

class CalculatorService:
    def add(self, a, b):
        """加法运算"""
        return a + b
    
    def multiply(self, a, b):
        """乘法运算"""
        return a * b
    
    def divide(self, a, b):
        """除法运算"""
        if b == 0:
            raise ValueError("除数不能为零")
        return a / b

2. 启动服务端

from bricks.rpc.common import serve
 
# 启动 HTTP RPC 服务
serve(
    CalculatorService(),
    mode="http",
    concurrency=10,
    ident=8080
)

3. 客户端调用

from bricks.rpc.http_.service import Client
 
# 创建客户端
client = Client("localhost:8080")
 
# 调用远程方法
result = client.rpc("add", 10, 20)
print(result.data)  # 输出: 30
 
result = client.rpc("multiply", 5, 6)
print(result.data)  # 输出: 30

错误处理

Bricks RPC 提供了完善的错误处理机制:

状态码说明

状态码含义描述
0成功请求处理成功
400客户端错误请求参数错误、JSON 格式错误等
404方法不存在调用的方法未找到
500服务器错误服务器内部错误
501未实现方法未实现

异常处理示例

from bricks.rpc.http_.service import Client
 
client = Client("localhost:8080")
 
try:
    result = client.rpc("divide", 10, 0)
    if result.code != 0:
        print(f"调用失败: {result.message}")
    else:
        print(f"结果: {result.data}")
except Exception as e:
    print(f"网络错误: {e}")

性能特性

1. 并发处理

  • 线程池支持:自动管理线程池处理同步方法
  • 异步支持:原生支持异步方法,提高并发性能
  • 可配置并发数:根据需求调整并发处理能力

2. 连接管理

  • 连接复用:支持长连接和连接池
  • 自动重连:网络异常时自动重连
  • 超时控制:可配置请求超时时间

3. 内存优化

  • 流式处理:大数据量时支持流式传输
  • 智能缓存:自动缓存序列化结果
  • 资源回收:自动管理连接和内存资源

应用场景

1. 分布式爬虫

# 主控节点
class MasterService:
    def distribute_task(self, urls):
        # 分发爬取任务到工作节点
        pass
 
# 工作节点
class WorkerService:
    def crawl_url(self, url):
        # 执行具体的爬取任务
        pass

2. 微服务架构

# 用户服务
class UserService:
    def authenticate(self, token):
        pass
 
# 数据服务
class DataService:
    def save_data(self, data):
        pass
 
# 通知服务
class NotificationService:
    def send_notification(self, message):
        pass

3. 实时数据处理

# 使用 WebSocket 进行实时数据推送
class RealtimeService:
    def subscribe_data(self, topic):
        # 订阅实时数据
        pass
    
    def push_data(self, data):
        # 推送数据到客户端
        pass

Bricks 的 RPC 功能为构建分布式、高性能的爬虫系统提供了强大的基础设施支持。