bricks
开发指南
2.5 RPC篇
2.5.3 RPC 客户端

2.5.3 RPC 客户端

Bricks 框架为每种 RPC 协议都提供了对应的客户端实现,支持同步和异步调用,具有统一的接口设计。

基本客户端使用

1. HTTP 客户端

from bricks.rpc.http_.service import Client
 
# 创建 HTTP RPC 客户端
client = Client("localhost:8080")
# 或者指定完整 URL
client = Client("http://localhost:8080")
 
# 调用远程方法
response = client.rpc("add", 10, 20)
print(f"结果: {response.data}")  # 结果: 30
print(f"状态码: {response.code}")  # 状态码: 0
print(f"消息: {response.message}")  # 消息: 

2. WebSocket 客户端

from bricks.rpc.websocket_.service import Client
 
# 创建 WebSocket RPC 客户端
client = Client("localhost:8080")
# 或者指定完整 URL
client = Client("ws://localhost:8080")
 
try:
    # 调用远程方法
    response = client.rpc("multiply", 5, 6)
    print(f"结果: {response.data}")
finally:
    # 关闭连接
    client.close()

3. Socket 客户端

from bricks.rpc.socket_.service import Client
 
# 创建 Socket RPC 客户端
client = Client("localhost:8080")
 
response = client.rpc("divide", 100, 5)
print(f"结果: {response.data}")

4. gRPC 客户端

from bricks.rpc.grpc_.service import Client
 
# 创建 gRPC 客户端
client = Client("localhost:50051")
 
response = client.rpc("subtract", 100, 30)
print(f"结果: {response.data}")

5. Redis 客户端

from bricks.rpc.redis_.service import Client
 
# 创建 Redis RPC 客户端
client = Client("redis://localhost:6379/0?server_id=my_server")
 
response = client.rpc("get_user", user_id=123)
print(f"用户信息: {response.data}")

统一的调用接口

所有客户端都提供统一的 rpc 方法:

# 基本语法
response = client.rpc(method_name, *args, **kwargs)
 
# 参数说明
# method_name: 远程方法名
# *args: 位置参数
# **kwargs: 关键字参数
# 返回: RpcResponse 对象

调用示例

# 1. 无参数调用
response = client.rpc("get_server_status")
 
# 2. 位置参数调用
response = client.rpc("add", 10, 20)
 
# 3. 关键字参数调用
response = client.rpc("create_user", name="张三", email="zhangsan@example.com")
 
# 4. 混合参数调用
response = client.rpc("update_user", 123, name="李四", email="lisi@example.com")
 
# 5. 复杂数据类型
response = client.rpc("process_data", {
    "items": [1, 2, 3, 4, 5],
    "config": {"batch_size": 100, "timeout": 30}
})

响应处理

RpcResponse 对象

class RpcResponse:
    data: str      # 响应数据(JSON 字符串)
    message: str   # 响应消息
    code: int      # 状态码(0表示成功)
    request_id: str # 请求ID

响应处理示例

response = client.rpc("get_user", user_id=123)
 
# 检查调用是否成功
if response.code == 0:
    # 解析响应数据
    import json
    user_data = json.loads(response.data)
    print(f"用户名: {user_data['name']}")
else:
    print(f"调用失败: {response.message}")

便捷的响应处理

def safe_rpc_call(client, method, *args, **kwargs):
    """安全的 RPC 调用封装"""
    try:
        response = client.rpc(method, *args, **kwargs)
        if response.code == 0:
            import json
            return json.loads(response.data)
        else:
            raise RuntimeError(f"RPC 调用失败: {response.message}")
    except Exception as e:
        raise RuntimeError(f"RPC 调用异常: {e}")
 
# 使用示例
try:
    user = safe_rpc_call(client, "get_user", user_id=123)
    print(f"用户: {user}")
except RuntimeError as e:
    print(f"错误: {e}")

错误处理

1. 网络错误处理

from bricks.rpc.http_.service import Client
 
client = Client("localhost:8080")
 
try:
    response = client.rpc("some_method", param="value")
    if response.code != 0:
        print(f"业务错误: {response.message}")
    else:
        print(f"成功: {response.data}")
except ConnectionError:
    print("连接错误:无法连接到服务器")
except TimeoutError:
    print("超时错误:请求超时")
except Exception as e:
    print(f"其他错误: {e}")

2. 重试机制

import time
from typing import Optional
 
def rpc_call_with_retry(
    client, 
    method: str, 
    max_retries: int = 3, 
    retry_delay: float = 1.0,
    *args, 
    **kwargs
) -> Optional[dict]:
    """带重试的 RPC 调用"""
    
    for attempt in range(max_retries + 1):
        try:
            response = client.rpc(method, *args, **kwargs)
            if response.code == 0:
                import json
                return json.loads(response.data)
            else:
                print(f"业务错误 (尝试 {attempt + 1}): {response.message}")
                
        except Exception as e:
            print(f"网络错误 (尝试 {attempt + 1}): {e}")
            
        # 如果不是最后一次尝试,等待后重试
        if attempt < max_retries:
            time.sleep(retry_delay)
            retry_delay *= 2  # 指数退避
    
    return None
 
# 使用示例
result = rpc_call_with_retry(
    client, 
    "unstable_method", 
    max_retries=3,
    retry_delay=1.0,
    param="value"
)

连接管理

1. 连接池(HTTP 客户端)

import requests
from bricks.rpc.http_.service import Client
 
class PooledHttpClient(Client):
    """使用连接池的 HTTP 客户端"""
    
    def __init__(self, endpoint: str, pool_size: int = 10):
        super().__init__(endpoint)
        self.session = requests.Session()
        # 配置连接池
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=pool_size,
            pool_maxsize=pool_size
        )
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
    
    def rpc(self, method: str, *args, **kwargs):
        """使用连接池的 RPC 调用"""
        rpc_request = self._prepare_request(method, *args, **kwargs)
        
        try:
            response = self.session.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"}
            )
            
            if not response.ok:
                raise RuntimeError(f"HTTP error {response.status_code}: {response.text}")
            
            response_data = response.json()
            return RpcResponse.from_dict({**response_data, "request_id": rpc_request.request_id})
            
        except Exception as e:
            raise RuntimeError(f"HTTP RPC call failed for method '{method}': {e}")
 
# 使用连接池客户端
client = PooledHttpClient("localhost:8080", pool_size=20)

2. 长连接管理(WebSocket)

from bricks.rpc.websocket_.service import Client
 
class ManagedWebSocketClient:
    """管理 WebSocket 连接的客户端"""
    
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.client = None
        self.connected = False
    
    def connect(self):
        """建立连接"""
        if not self.connected:
            self.client = Client(self.endpoint)
            self.connected = True
    
    def disconnect(self):
        """断开连接"""
        if self.connected and self.client:
            self.client.close()
            self.connected = False
    
    def rpc(self, method: str, *args, **kwargs):
        """自动管理连接的 RPC 调用"""
        if not self.connected:
            self.connect()
        
        try:
            return self.client.rpc(method, *args, **kwargs)
        except Exception as e:
            # 连接异常时重新连接
            self.connected = False
            self.connect()
            return self.client.rpc(method, *args, **kwargs)
    
    def __enter__(self):
        self.connect()
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
 
# 使用上下文管理器
with ManagedWebSocketClient("localhost:8080") as client:
    result1 = client.rpc("method1", param="value1")
    result2 = client.rpc("method2", param="value2")

异步客户端

1. 异步 HTTP 客户端

import asyncio
import aiohttp
from bricks.rpc.common import BaseRpcClient, RpcRequest, RpcResponse
 
class AsyncHttpClient(BaseRpcClient):
    """异步 HTTP RPC 客户端"""
    
    def __init__(self, endpoint: str):
        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
            endpoint = "http://" + endpoint
        self.endpoint = endpoint.rstrip('/')
    
    async def async_rpc(self, method: str, *args, **kwargs):
        """异步 RPC 调用"""
        rpc_request = self._prepare_request(method, *args, **kwargs)
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"}
            ) as response:
                if not response.ok:
                    raise RuntimeError(f"HTTP error {response.status}: {await response.text()}")
                
                response_data = await response.json()
                return RpcResponse.from_dict({**response_data, "request_id": rpc_request.request_id})
 
# 使用异步客户端
async def main():
    client = AsyncHttpClient("localhost:8080")
    
    # 并发调用多个方法
    tasks = [
        client.async_rpc("method1", param1="value1"),
        client.async_rpc("method2", param2="value2"),
        client.async_rpc("method3", param3="value3")
    ]
    
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"结果: {result.data}")
 
asyncio.run(main())

2. 批量调用

async def batch_rpc_calls(client, calls):
    """批量 RPC 调用"""
    tasks = []
    for method, args, kwargs in calls:
        task = client.async_rpc(method, *args, **kwargs)
        tasks.append(task)
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # 处理结果和异常
    processed_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            processed_results.append({
                "success": False,
                "error": str(result),
                "call_index": i
            })
        else:
            processed_results.append({
                "success": True,
                "data": result.data,
                "call_index": i
            })
    
    return processed_results
 
# 使用示例
calls = [
    ("get_user", [123], {}),
    ("get_order", [456], {}),
    ("get_product", [], {"product_id": 789})
]
 
results = await batch_rpc_calls(client, calls)

客户端配置和优化

1. 超时配置

import requests
from bricks.rpc.http_.service import Client
 
class ConfigurableHttpClient(Client):
    def __init__(self, endpoint: str, timeout: int = 30):
        super().__init__(endpoint)
        self.timeout = timeout
    
    def rpc(self, method: str, *args, **kwargs):
        rpc_request = self._prepare_request(method, *args, **kwargs)
        
        try:
            response = requests.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"},
                timeout=self.timeout  # 设置超时
            )
            # ... 处理响应
        except requests.Timeout:
            raise TimeoutError(f"RPC call timeout after {self.timeout} seconds")
 
client = ConfigurableHttpClient("localhost:8080", timeout=60)

2. 客户端中间件

class MiddlewareClient:
    """支持中间件的客户端"""
    
    def __init__(self, client):
        self.client = client
        self.middlewares = []
    
    def add_middleware(self, middleware):
        """添加中间件"""
        self.middlewares.append(middleware)
    
    def rpc(self, method: str, *args, **kwargs):
        """执行带中间件的 RPC 调用"""
        # 前置中间件
        for middleware in self.middlewares:
            if hasattr(middleware, 'before_request'):
                method, args, kwargs = middleware.before_request(method, args, kwargs)
        
        # 执行实际调用
        response = self.client.rpc(method, *args, **kwargs)
        
        # 后置中间件
        for middleware in reversed(self.middlewares):
            if hasattr(middleware, 'after_request'):
                response = middleware.after_request(response)
        
        return response
 
# 日志中间件示例
class LoggingMiddleware:
    def before_request(self, method, args, kwargs):
        print(f"调用方法: {method}, 参数: {args}, {kwargs}")
        return method, args, kwargs
    
    def after_request(self, response):
        print(f"响应状态: {response.code}, 数据长度: {len(response.data)}")
        return response
 
# 使用中间件
base_client = Client("localhost:8080")
client = MiddlewareClient(base_client)
client.add_middleware(LoggingMiddleware())

通过这些客户端功能,可以灵活地与 RPC 服务进行交互,满足各种应用场景的需求。