2.5.1 RPC 概述
Bricks
框架提供了强大的 RPC(Remote Procedure Call,远程过程调用)功能,支持多种通信协议,使得爬虫服务可以轻松地进行分布式部署和远程调用。
RPC 功能特性
多协议支持
Bricks
内置支持多种 RPC 通信协议:
协议 | 描述 | 适用场景 |
---|---|---|
HTTP | 基于 HTTP 协议的 RPC | Web 服务、API 调用、跨语言调用 |
WebSocket | 基于 WebSocket 的实时通信 | 实时数据推送、长连接场景 |
Socket | 基于 TCP Socket 的通信 | 高性能、低延迟场景 |
gRPC | 基于 HTTP/2 的高性能 RPC | 微服务架构、高并发场景 |
Redis | 基于 Redis 的消息队列 | 异步处理、任务队列 |
统一的编程接口
无论使用哪种协议,Bricks
都提供了统一的编程接口:
from bricks.rpc.common import serve
# 服务端 - 统一的启动方式
serve(
my_service,
mode="http", # 选择协议
concurrency=10, # 并发数
ident=8080 # 端口或标识
)
# 客户端 - 统一的调用方式
client.rpc("method_name", arg1, arg2, kwarg1="value")
自动序列化
Bricks
提供了智能的序列化机制:
- 自动 JSON 序列化:支持基本数据类型的自动序列化
- 自定义序列化:对象可实现
on_json_dump()
方法自定义序列化 - 容错处理:无法序列化的对象自动降级为字符串
核心组件
1. 统一的数据结构
RpcRequest - 请求对象
class RpcRequest:
def __init__(self, method: str, data: str, request_id: str = None):
self.method = method # 方法名
self.data = data # 请求数据(JSON字符串)
self.request_id = request_id # 请求ID
RpcResponse - 响应对象
class RpcResponse:
def __init__(self, data: str = "", message: str = "", code: int = 0, request_id: str = ""):
self.data = data # 响应数据
self.message = message # 响应消息
self.code = code # 状态码(0表示成功)
self.request_id = request_id # 请求ID
2. 多对象代理
MultiObjectProxy
支持将多个对象或函数组合成一个 RPC 服务:
from bricks.rpc.common import MultiObjectProxy
# 组合多个服务对象
class UserService:
def get_user(self, user_id):
return {"id": user_id, "name": "张三"}
class OrderService:
def get_order(self, order_id):
return {"id": order_id, "amount": 100}
def get_status():
return "服务正常"
# 创建多对象代理
proxy = MultiObjectProxy([UserService, OrderService, get_status])
# 支持多种调用方式
proxy.get_user(123) # 按顺序查找方法
proxy.UserService.get_user(123) # 类名.方法名调用
proxy.get_status() # 函数调用
3. 异步处理支持
Bricks
RPC 框架完全支持异步处理:
import asyncio
class AsyncService:
async def async_method(self, data):
await asyncio.sleep(1) # 模拟异步操作
return f"处理完成: {data}"
def sync_method(self, data):
return f"同步处理: {data}"
# 框架自动识别并正确处理异步和同步方法
基本使用流程
1. 定义服务
class CalculatorService:
def add(self, a, b):
"""加法运算"""
return a + b
def multiply(self, a, b):
"""乘法运算"""
return a * b
def divide(self, a, b):
"""除法运算"""
if b == 0:
raise ValueError("除数不能为零")
return a / b
2. 启动服务端
from bricks.rpc.common import serve
# 启动 HTTP RPC 服务
serve(
CalculatorService(),
mode="http",
concurrency=10,
ident=8080
)
3. 客户端调用
from bricks.rpc.http_.service import Client
# 创建客户端
client = Client("localhost:8080")
# 调用远程方法
result = client.rpc("add", 10, 20)
print(result.data) # 输出: 30
result = client.rpc("multiply", 5, 6)
print(result.data) # 输出: 30
错误处理
Bricks
RPC 提供了完善的错误处理机制:
状态码说明
状态码 | 含义 | 描述 |
---|---|---|
0 | 成功 | 请求处理成功 |
400 | 客户端错误 | 请求参数错误、JSON 格式错误等 |
404 | 方法不存在 | 调用的方法未找到 |
500 | 服务器错误 | 服务器内部错误 |
501 | 未实现 | 方法未实现 |
异常处理示例
from bricks.rpc.http_.service import Client
client = Client("localhost:8080")
try:
result = client.rpc("divide", 10, 0)
if result.code != 0:
print(f"调用失败: {result.message}")
else:
print(f"结果: {result.data}")
except Exception as e:
print(f"网络错误: {e}")
性能特性
1. 并发处理
- 线程池支持:自动管理线程池处理同步方法
- 异步支持:原生支持异步方法,提高并发性能
- 可配置并发数:根据需求调整并发处理能力
2. 连接管理
- 连接复用:支持长连接和连接池
- 自动重连:网络异常时自动重连
- 超时控制:可配置请求超时时间
3. 内存优化
- 流式处理:大数据量时支持流式传输
- 智能缓存:自动缓存序列化结果
- 资源回收:自动管理连接和内存资源
应用场景
1. 分布式爬虫
# 主控节点
class MasterService:
def distribute_task(self, urls):
# 分发爬取任务到工作节点
pass
# 工作节点
class WorkerService:
def crawl_url(self, url):
# 执行具体的爬取任务
pass
2. 微服务架构
# 用户服务
class UserService:
def authenticate(self, token):
pass
# 数据服务
class DataService:
def save_data(self, data):
pass
# 通知服务
class NotificationService:
def send_notification(self, message):
pass
3. 实时数据处理
# 使用 WebSocket 进行实时数据推送
class RealtimeService:
def subscribe_data(self, topic):
# 订阅实时数据
pass
def push_data(self, data):
# 推送数据到客户端
pass
Bricks
的 RPC 功能为构建分布式、高性能的爬虫系统提供了强大的基础设施支持。