2.5.2 RPC 服务端
Bricks
框架提供了简单易用的 RPC 服务端实现,支持多种协议和灵活的服务配置。
基本服务端实现
1. 简单服务定义
class CalculatorService:
"""计算器服务示例"""
def add(self, a, b):
"""加法运算"""
return {"result": a + b, "operation": "add"}
def subtract(self, a, b):
"""减法运算"""
return {"result": a - b, "operation": "subtract"}
def multiply(self, a, b):
"""乘法运算"""
return {"result": a * b, "operation": "multiply"}
def divide(self, a, b):
"""除法运算"""
if b == 0:
raise ValueError("除数不能为零")
return {"result": a / b, "operation": "divide"}
2. 启动服务
from bricks.rpc.common import serve
# 方式一:使用 serve 函数(同步启动)
serve(
CalculatorService(),
mode="http",
concurrency=10,
ident=8080
)
# 方式二:使用 start_rpc_server(异步启动)
import asyncio
from bricks.rpc.common import start_rpc_server
async def main():
await start_rpc_server(
CalculatorService(),
mode="http",
concurrency=10,
ident=8080
)
asyncio.run(main())
服务端参数配置
serve 函数参数
参数名 | 参数类型 | 参数描述 | 默认值 |
---|---|---|---|
*obj | Any | 要暴露的服务对象或函数 | 必传 |
mode | MODE | RPC 协议模式 | "http" |
concurrency | int | 并发处理数量 | 10 |
ident | Any | 服务标识(端口号或地址) | 0 |
on_server_started | Callable | 服务启动回调函数 | None |
**kwargs | dict | 其他协议特定参数 | - |
示例:完整配置
from bricks.rpc.common import serve
def on_server_ready(port):
print(f"服务已启动,监听端口: {port}")
serve(
CalculatorService(),
UserService(), # 可以同时暴露多个服务
mode="http",
concurrency=20, # 20个并发处理线程
ident=8080, # 指定端口
on_server_started=on_server_ready,
# HTTP 特定参数
host="0.0.0.0", # 监听地址
timeout=30 # 请求超时时间
)
多服务对象支持
1. 组合多个服务类
class UserService:
def get_user(self, user_id):
return {"id": user_id, "name": f"用户{user_id}"}
def create_user(self, name, email):
return {"id": 123, "name": name, "email": email}
class OrderService:
def get_order(self, order_id):
return {"id": order_id, "amount": 100.0}
def create_order(self, user_id, amount):
return {"id": 456, "user_id": user_id, "amount": amount}
# 同时暴露多个服务
serve(
UserService(),
OrderService(),
mode="http",
ident=8080
)
2. 混合函数和类
def get_server_status():
"""获取服务器状态"""
return {"status": "running", "uptime": "2 hours"}
def ping():
"""健康检查"""
return "pong"
class DataService:
def save_data(self, data):
return {"saved": True, "id": 789}
# 混合暴露函数和类
serve(
get_server_status,
ping,
DataService(),
mode="http",
ident=8080
)
异步服务支持
1. 异步方法定义
import asyncio
import aiohttp
class AsyncWebService:
"""异步 Web 服务示例"""
async def fetch_url(self, url):
"""异步获取网页内容"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
content = await response.text()
return {
"url": url,
"status": response.status,
"content_length": len(content)
}
async def batch_fetch(self, urls):
"""批量异步获取"""
tasks = [self.fetch_url(url) for url in urls]
results = await asyncio.gather(*tasks)
return {"results": results, "count": len(results)}
def sync_method(self, data):
"""同步方法也支持"""
return {"processed": data}
# 启动异步服务
serve(AsyncWebService(), mode="http", ident=8080)
2. 混合同步异步
class MixedService:
def __init__(self):
self.cache = {}
def get_cache(self, key):
"""同步获取缓存"""
return self.cache.get(key, "Not found")
def set_cache(self, key, value):
"""同步设置缓存"""
self.cache[key] = value
return {"success": True}
async def async_process(self, data):
"""异步处理数据"""
await asyncio.sleep(1) # 模拟异步操作
processed = data.upper()
return {"original": data, "processed": processed}
serve(MixedService(), mode="http", ident=8080)
错误处理和异常管理
1. 自定义异常处理
class BusinessException(Exception):
"""业务异常"""
def __init__(self, message, code=400):
self.message = message
self.code = code
super().__init__(message)
class ValidationService:
def validate_email(self, email):
"""邮箱验证"""
if "@" not in email:
raise BusinessException("邮箱格式不正确", 400)
return {"valid": True, "email": email}
def validate_age(self, age):
"""年龄验证"""
if age < 0 or age > 150:
raise BusinessException("年龄必须在0-150之间", 400)
return {"valid": True, "age": age}
serve(ValidationService(), mode="http", ident=8080)
2. 全局异常处理
from bricks.rpc.common import BaseRpcService
class CustomRpcService(BaseRpcService):
"""自定义 RPC 服务,添加全局异常处理"""
async def process_rpc_request(self, request):
try:
return await super().process_rpc_request(request)
except BusinessException as e:
return RpcResponse(
message=e.message,
code=e.code,
request_id=request.request_id
)
except Exception as e:
# 记录详细错误日志
logger.error(f"未处理的异常: {e}", exc_info=True)
return RpcResponse(
message="服务器内部错误",
code=500,
request_id=request.request_id
)
# 使用自定义服务
service = CustomRpcService()
service.bind_target(ValidationService())
await service.serve(concurrency=10, ident=8080)
服务生命周期管理
1. 服务启动回调
def on_server_started(port):
"""服务启动回调"""
print(f"🚀 RPC 服务已启动")
print(f"📡 监听端口: {port}")
print(f"🔗 访问地址: http://localhost:{port}/rpc")
# 可以在这里执行初始化操作
# 比如:注册服务到注册中心、启动健康检查等
serve(
CalculatorService(),
mode="http",
ident=8080,
on_server_started=on_server_started
)
2. 优雅关闭
import signal
import sys
class GracefulService:
def __init__(self):
self.is_shutting_down = False
# 注册信号处理器
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
"""信号处理器"""
print(f"收到信号 {signum},开始优雅关闭...")
self.is_shutting_down = True
# 执行清理操作
self.cleanup()
sys.exit(0)
def cleanup(self):
"""清理资源"""
print("正在清理资源...")
# 关闭数据库连接、保存状态等
def process_data(self, data):
"""处理数据"""
if self.is_shutting_down:
return {"error": "服务正在关闭"}
return {"processed": data}
serve(GracefulService(), mode="http", ident=8080)
性能优化
1. 并发配置优化
import os
# 根据 CPU 核心数配置并发
cpu_count = os.cpu_count()
optimal_concurrency = cpu_count * 2
serve(
MyService(),
mode="http",
concurrency=optimal_concurrency,
ident=8080
)
2. 内存优化
class OptimizedService:
def __init__(self):
# 使用对象池减少内存分配
self.object_pool = []
def process_large_data(self, data):
"""处理大数据量"""
# 流式处理,避免一次性加载到内存
result = []
for chunk in self.chunk_data(data, 1000):
processed_chunk = self.process_chunk(chunk)
result.append(processed_chunk)
return {"chunks_processed": len(result)}
def chunk_data(self, data, chunk_size):
"""数据分块"""
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
def process_chunk(self, chunk):
"""处理数据块"""
return len(chunk)
serve(OptimizedService(), mode="http", ident=8080)
通过这些配置和优化,可以构建高性能、稳定可靠的 RPC 服务端。