bricks
开发指南
2.5 RPC篇
2.5.2 RPC 服务端

2.5.2 RPC 服务端

Bricks 框架提供了简单易用的 RPC 服务端实现,支持多种协议和灵活的服务配置。

基本服务端实现

1. 简单服务定义

class CalculatorService:
    """计算器服务示例"""
    
    def add(self, a, b):
        """加法运算"""
        return {"result": a + b, "operation": "add"}
    
    def subtract(self, a, b):
        """减法运算"""
        return {"result": a - b, "operation": "subtract"}
    
    def multiply(self, a, b):
        """乘法运算"""
        return {"result": a * b, "operation": "multiply"}
    
    def divide(self, a, b):
        """除法运算"""
        if b == 0:
            raise ValueError("除数不能为零")
        return {"result": a / b, "operation": "divide"}

2. 启动服务

from bricks.rpc.common import serve
 
# 方式一:使用 serve 函数(同步启动)
serve(
    CalculatorService(),
    mode="http",
    concurrency=10,
    ident=8080
)
 
# 方式二:使用 start_rpc_server(异步启动)
import asyncio
from bricks.rpc.common import start_rpc_server
 
async def main():
    await start_rpc_server(
        CalculatorService(),
        mode="http",
        concurrency=10,
        ident=8080
    )
 
asyncio.run(main())

服务端参数配置

serve 函数参数

参数名参数类型参数描述默认值
*objAny要暴露的服务对象或函数必传
modeMODERPC 协议模式"http"
concurrencyint并发处理数量10
identAny服务标识(端口号或地址)0
on_server_startedCallable服务启动回调函数None
**kwargsdict其他协议特定参数-

示例:完整配置

from bricks.rpc.common import serve
 
def on_server_ready(port):
    print(f"服务已启动,监听端口: {port}")
 
serve(
    CalculatorService(),
    UserService(),              # 可以同时暴露多个服务
    mode="http",
    concurrency=20,             # 20个并发处理线程
    ident=8080,                # 指定端口
    on_server_started=on_server_ready,
    # HTTP 特定参数
    host="0.0.0.0",            # 监听地址
    timeout=30                  # 请求超时时间
)

多服务对象支持

1. 组合多个服务类

class UserService:
    def get_user(self, user_id):
        return {"id": user_id, "name": f"用户{user_id}"}
    
    def create_user(self, name, email):
        return {"id": 123, "name": name, "email": email}
 
class OrderService:
    def get_order(self, order_id):
        return {"id": order_id, "amount": 100.0}
    
    def create_order(self, user_id, amount):
        return {"id": 456, "user_id": user_id, "amount": amount}
 
# 同时暴露多个服务
serve(
    UserService(),
    OrderService(),
    mode="http",
    ident=8080
)

2. 混合函数和类

def get_server_status():
    """获取服务器状态"""
    return {"status": "running", "uptime": "2 hours"}
 
def ping():
    """健康检查"""
    return "pong"
 
class DataService:
    def save_data(self, data):
        return {"saved": True, "id": 789}
 
# 混合暴露函数和类
serve(
    get_server_status,
    ping,
    DataService(),
    mode="http",
    ident=8080
)

异步服务支持

1. 异步方法定义

import asyncio
import aiohttp
 
class AsyncWebService:
    """异步 Web 服务示例"""
    
    async def fetch_url(self, url):
        """异步获取网页内容"""
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                content = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "content_length": len(content)
                }
    
    async def batch_fetch(self, urls):
        """批量异步获取"""
        tasks = [self.fetch_url(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return {"results": results, "count": len(results)}
    
    def sync_method(self, data):
        """同步方法也支持"""
        return {"processed": data}
 
# 启动异步服务
serve(AsyncWebService(), mode="http", ident=8080)

2. 混合同步异步

class MixedService:
    def __init__(self):
        self.cache = {}
    
    def get_cache(self, key):
        """同步获取缓存"""
        return self.cache.get(key, "Not found")
    
    def set_cache(self, key, value):
        """同步设置缓存"""
        self.cache[key] = value
        return {"success": True}
    
    async def async_process(self, data):
        """异步处理数据"""
        await asyncio.sleep(1)  # 模拟异步操作
        processed = data.upper()
        return {"original": data, "processed": processed}
 
serve(MixedService(), mode="http", ident=8080)

错误处理和异常管理

1. 自定义异常处理

class BusinessException(Exception):
    """业务异常"""
    def __init__(self, message, code=400):
        self.message = message
        self.code = code
        super().__init__(message)
 
class ValidationService:
    def validate_email(self, email):
        """邮箱验证"""
        if "@" not in email:
            raise BusinessException("邮箱格式不正确", 400)
        return {"valid": True, "email": email}
    
    def validate_age(self, age):
        """年龄验证"""
        if age < 0 or age > 150:
            raise BusinessException("年龄必须在0-150之间", 400)
        return {"valid": True, "age": age}
 
serve(ValidationService(), mode="http", ident=8080)

2. 全局异常处理

from bricks.rpc.common import BaseRpcService
 
class CustomRpcService(BaseRpcService):
    """自定义 RPC 服务,添加全局异常处理"""
    
    async def process_rpc_request(self, request):
        try:
            return await super().process_rpc_request(request)
        except BusinessException as e:
            return RpcResponse(
                message=e.message,
                code=e.code,
                request_id=request.request_id
            )
        except Exception as e:
            # 记录详细错误日志
            logger.error(f"未处理的异常: {e}", exc_info=True)
            return RpcResponse(
                message="服务器内部错误",
                code=500,
                request_id=request.request_id
            )
 
# 使用自定义服务
service = CustomRpcService()
service.bind_target(ValidationService())
await service.serve(concurrency=10, ident=8080)

服务生命周期管理

1. 服务启动回调

def on_server_started(port):
    """服务启动回调"""
    print(f"🚀 RPC 服务已启动")
    print(f"📡 监听端口: {port}")
    print(f"🔗 访问地址: http://localhost:{port}/rpc")
    
    # 可以在这里执行初始化操作
    # 比如:注册服务到注册中心、启动健康检查等
 
serve(
    CalculatorService(),
    mode="http",
    ident=8080,
    on_server_started=on_server_started
)

2. 优雅关闭

import signal
import sys
 
class GracefulService:
    def __init__(self):
        self.is_shutting_down = False
        # 注册信号处理器
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
    
    def signal_handler(self, signum, frame):
        """信号处理器"""
        print(f"收到信号 {signum},开始优雅关闭...")
        self.is_shutting_down = True
        # 执行清理操作
        self.cleanup()
        sys.exit(0)
    
    def cleanup(self):
        """清理资源"""
        print("正在清理资源...")
        # 关闭数据库连接、保存状态等
    
    def process_data(self, data):
        """处理数据"""
        if self.is_shutting_down:
            return {"error": "服务正在关闭"}
        return {"processed": data}
 
serve(GracefulService(), mode="http", ident=8080)

性能优化

1. 并发配置优化

import os
 
# 根据 CPU 核心数配置并发
cpu_count = os.cpu_count()
optimal_concurrency = cpu_count * 2
 
serve(
    MyService(),
    mode="http",
    concurrency=optimal_concurrency,
    ident=8080
)

2. 内存优化

class OptimizedService:
    def __init__(self):
        # 使用对象池减少内存分配
        self.object_pool = []
    
    def process_large_data(self, data):
        """处理大数据量"""
        # 流式处理,避免一次性加载到内存
        result = []
        for chunk in self.chunk_data(data, 1000):
            processed_chunk = self.process_chunk(chunk)
            result.append(processed_chunk)
        return {"chunks_processed": len(result)}
    
    def chunk_data(self, data, chunk_size):
        """数据分块"""
        for i in range(0, len(data), chunk_size):
            yield data[i:i + chunk_size]
    
    def process_chunk(self, chunk):
        """处理数据块"""
        return len(chunk)
 
serve(OptimizedService(), mode="http", ident=8080)

通过这些配置和优化,可以构建高性能、稳定可靠的 RPC 服务端。