2.5.3 RPC 客户端
Bricks 框架为每种 RPC 协议都提供了对应的客户端实现,支持同步和异步调用,具有统一的接口设计。
基本客户端使用
1. HTTP 客户端
from bricks.rpc.http_.service import Client
# Create an HTTP RPC client
client = Client("localhost:8080")
# Or pass the full URL
client = Client("http://localhost:8080")
# Call a remote method
response = client.rpc("add", 10, 20)
print(f"结果: {response.data}")  # prints: 结果: 30
print(f"状态码: {response.code}")  # prints: 状态码: 0
print(f"消息: {response.message}")  # prints: 消息: (empty message)
2. WebSocket 客户端
from bricks.rpc.websocket_.service import Client
# Create a WebSocket RPC client
client = Client("localhost:8080")
# Or pass the full URL
client = Client("ws://localhost:8080")
try:
    # Call a remote method
    response = client.rpc("multiply", 5, 6)
    print(f"结果: {response.data}")
finally:
    # Close the connection even if the call above raised
    client.close()
3. Socket 客户端
from bricks.rpc.socket_.service import Client
# Create a Socket RPC client
client = Client("localhost:8080")
response = client.rpc("divide", 100, 5)
print(f"结果: {response.data}")
4. gRPC 客户端
from bricks.rpc.grpc_.service import Client
# Create a gRPC client
client = Client("localhost:50051")
response = client.rpc("subtract", 100, 30)
print(f"结果: {response.data}")
5. Redis 客户端
from bricks.rpc.redis_.service import Client
# Create a Redis RPC client
# NOTE(review): the server_id query parameter presumably selects the target RPC server - confirm
client = Client("redis://localhost:6379/0?server_id=my_server")
response = client.rpc("get_user", user_id=123)
print(f"用户信息: {response.data}")
统一的调用接口
所有客户端都提供统一的 rpc 方法:
# Basic syntax
response = client.rpc(method_name, *args, **kwargs)
# Parameters
#   method_name: name of the remote method
#   *args: positional arguments
#   **kwargs: keyword arguments
# Returns: an RpcResponse object
调用示例
# 1. Call with no arguments
response = client.rpc("get_server_status")
# 2. Call with positional arguments
response = client.rpc("add", 10, 20)
# 3. Call with keyword arguments
response = client.rpc("create_user", name="张三", email="zhangsan@example.com")
# 4. Call with mixed positional and keyword arguments
response = client.rpc("update_user", 123, name="李四", email="lisi@example.com")
# 5. Call with a complex (nested) data structure as one positional argument
response = client.rpc("process_data", {
"items": [1, 2, 3, 4, 5],
"config": {"batch_size": 100, "timeout": 30}
})
响应处理
RpcResponse 对象
class RpcResponse:
    """Fields carried by the response object returned from ``client.rpc``."""

    data: str  # response payload (a JSON string)
    message: str  # response message
    code: int  # status code (0 means success)
    request_id: str  # request ID
响应处理示例
import json

response = client.rpc("get_user", user_id=123)
# Check whether the call succeeded
if response.code == 0:
    # Parse the JSON payload
    user_data = json.loads(response.data)
    print(f"用户名: {user_data['name']}")
else:
    print(f"调用失败: {response.message}")
便捷的响应处理
def safe_rpc_call(client, method, *args, **kwargs):
    """Call ``client.rpc`` and return the decoded JSON payload.

    Args:
        client: Any object exposing ``rpc(method, *args, **kwargs)``.
        method: Name of the remote method.
        *args: Positional arguments forwarded to the remote method.
        **kwargs: Keyword arguments forwarded to the remote method.

    Returns:
        The result of ``json.loads(response.data)``.

    Raises:
        RuntimeError: If the call or the JSON decoding raises (the original
            exception is chained as ``__cause__``), or if the response code
            is non-zero.
    """
    import json

    try:
        response = client.rpc(method, *args, **kwargs)
        failed = response.code != 0
        payload = None if failed else json.loads(response.data)
    except Exception as e:
        raise RuntimeError(f"RPC 调用异常: {e}") from e

    # Raised outside the try block so the business failure is reported once,
    # not re-wrapped by the generic handler above.
    if failed:
        raise RuntimeError(f"RPC 调用失败: {response.message}")
    return payload
# Usage example
try:
    user = safe_rpc_call(client, "get_user", user_id=123)
    print(f"用户: {user}")
except RuntimeError as e:
    print(f"错误: {e}")
错误处理
1. 网络错误处理
from bricks.rpc.http_.service import Client
client = Client("localhost:8080")
try:
    response = client.rpc("some_method", param="value")
    # A non-zero code is a business-level failure reported by the server
    if response.code != 0:
        print(f"业务错误: {response.message}")
    else:
        print(f"成功: {response.data}")
# NOTE(review): these are the built-in ConnectionError / TimeoutError. Confirm
# that the client raises them; e.g. the exceptions defined by `requests` do not
# derive from these built-ins and would land in the generic handler below.
except ConnectionError:
    print("连接错误:无法连接到服务器")
except TimeoutError:
    print("超时错误:请求超时")
except Exception as e:
    print(f"其他错误: {e}")
2. 重试机制
import time
from typing import Optional
def rpc_call_with_retry(
    client,
    method: str,
    max_retries: int = 3,
    retry_delay: float = 1.0,
    *args,
    **kwargs
) -> Optional[dict]:
    """Call ``client.rpc`` and retry with exponential backoff on failure.

    Both a non-zero response code and any exception raised by the call (or by
    decoding the payload) count as a failed attempt.

    Args:
        client: Any object exposing ``rpc(method, *args, **kwargs)``.
        method: Name of the remote method.
        max_retries: Number of retries after the first attempt, so up to
            ``max_retries + 1`` calls are made.
        retry_delay: Seconds to wait before the first retry; doubled after
            every wait.
        *args: Positional arguments for the remote method. Because they follow
            ``max_retries`` and ``retry_delay`` in the signature, they can only
            be supplied when those two are also passed positionally.
        **kwargs: Keyword arguments for the remote method.

    Returns:
        The decoded JSON payload of the first successful response, or ``None``
        when every attempt failed.
    """
    import json

    delay = retry_delay
    for attempt in range(max_retries + 1):
        try:
            response = client.rpc(method, *args, **kwargs)
            if response.code == 0:
                return json.loads(response.data)
            print(f"业务错误 (尝试 {attempt + 1}): {response.message}")
        except Exception as e:
            print(f"网络错误 (尝试 {attempt + 1}): {e}")

        # Wait before the next attempt, but not after the last one
        if attempt < max_retries:
            time.sleep(delay)
            delay *= 2  # exponential backoff
    return None
# Usage example: pass max_retries and retry_delay by keyword; the remaining
# keyword arguments are forwarded to the remote method
result = rpc_call_with_retry(
client,
"unstable_method",
max_retries=3,
retry_delay=1.0,
param="value"
)
连接管理
1. 连接池(HTTP 客户端)
import requests
from bricks.rpc.http_.service import Client
class PooledHttpClient(Client):
    """HTTP RPC client that sends every call through one pooled ``requests.Session``.

    The session is stored on ``self.session`` and is not closed by this class;
    call ``client.session.close()`` when the client is no longer needed.
    """

    def __init__(self, endpoint: str, pool_size: int = 10):
        """Create the client and a session whose pool holds ``pool_size`` connections.

        Args:
            endpoint: Server address, forwarded to the base ``Client``.
            pool_size: Used for both ``pool_connections`` and ``pool_maxsize``.
        """
        super().__init__(endpoint)
        self.session = requests.Session()
        # Configure the connection pool for both schemes
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=pool_size,
            pool_maxsize=pool_size,
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def rpc(self, method: str, *args, **kwargs):
        """POST the RPC request through the pooled session.

        Returns:
            An ``RpcResponse`` built from the JSON body plus the request ID.

        Raises:
            RuntimeError: On a non-OK HTTP status or any other failure; the
                original exception is chained as ``__cause__``.
        """
        # Imported here because the snippet's import list does not bring
        # RpcResponse into scope.
        from bricks.rpc.common import RpcResponse

        rpc_request = self._prepare_request(method, *args, **kwargs)
        try:
            response = self.session.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"},
            )
            if not response.ok:
                raise RuntimeError(f"HTTP error {response.status_code}: {response.text}")
            response_data = response.json()
            return RpcResponse.from_dict({**response_data, "request_id": rpc_request.request_id})
        except Exception as e:
            raise RuntimeError(f"HTTP RPC call failed for method '{method}': {e}") from e
# Use the pooled client with a pool of 20 connections
client = PooledHttpClient("localhost:8080", pool_size=20)
2. 长连接管理(WebSocket)
from bricks.rpc.websocket_.service import Client
class ManagedWebSocketClient:
    """Wrapper that opens, reuses and re-opens a WebSocket RPC client on demand.

    Usable as a context manager: the connection is opened on entry and closed
    on exit.
    """

    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self.client = None
        self.connected = False

    def connect(self):
        """Open the connection if it is not already open."""
        if not self.connected:
            self.client = Client(self.endpoint)
            self.connected = True

    def disconnect(self):
        """Close the connection if it is open and forget the closed client."""
        if self.connected and self.client:
            try:
                self.client.close()
            finally:
                # Reset state even if close() raised, so the next call reconnects
                self.client = None
                self.connected = False

    def rpc(self, method: str, *args, **kwargs):
        """Forward the call, reconnecting and retrying once if it raises.

        A failure of the second attempt propagates to the caller.
        NOTE(review): the retry fires on any exception, so a call that failed
        after reaching the server may be executed twice - confirm this is
        acceptable for non-idempotent methods.
        """
        if not self.connected:
            self.connect()
        try:
            return self.client.rpc(method, *args, **kwargs)
        except Exception:
            # Release the stale connection before opening a new one
            self._drop_connection()
            self.connect()
            return self.client.rpc(method, *args, **kwargs)

    def _drop_connection(self):
        """Best-effort close of the current client, then mark as disconnected."""
        stale, self.client, self.connected = self.client, None, False
        if stale is not None:
            try:
                stale.close()
            except Exception:
                # The connection is already broken; nothing more to release
                pass

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
# Use it as a context manager: connects on entry, disconnects on exit
with ManagedWebSocketClient("localhost:8080") as client:
    result1 = client.rpc("method1", param="value1")
    result2 = client.rpc("method2", param="value2")
异步客户端
1. 异步 HTTP 客户端
import asyncio
import aiohttp
from bricks.rpc.common import BaseRpcClient, RpcRequest, RpcResponse
class AsyncHttpClient(BaseRpcClient):
    """Asynchronous HTTP RPC client built on ``aiohttp``."""

    def __init__(self, endpoint: str):
        """Normalize ``endpoint``: default to ``http://`` and drop trailing slashes.

        NOTE(review): the base-class initializer is not called here - confirm
        that ``BaseRpcClient`` does not require it.
        """
        if not endpoint.startswith(("http://", "https://")):
            endpoint = "http://" + endpoint
        self.endpoint = endpoint.rstrip("/")

    async def async_rpc(self, method: str, *args, **kwargs):
        """POST the RPC request and return the parsed ``RpcResponse``.

        A new ``aiohttp.ClientSession`` is opened and closed for every call.

        Raises:
            RuntimeError: If the HTTP response status is not OK.
        """
        rpc_request = self._prepare_request(method, *args, **kwargs)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"},
            ) as response:
                if not response.ok:
                    raise RuntimeError(f"HTTP error {response.status}: {await response.text()}")
                response_data = await response.json()
                return RpcResponse.from_dict({**response_data, "request_id": rpc_request.request_id})
# Use the asynchronous client
async def main():
    """Issue three RPC calls concurrently and print each result."""
    client = AsyncHttpClient("localhost:8080")
    # Build the coroutines first, then run them concurrently
    tasks = [
        client.async_rpc("method1", param1="value1"),
        client.async_rpc("method2", param2="value2"),
        client.async_rpc("method3", param3="value3"),
    ]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(f"结果: {result.data}")


asyncio.run(main())
2. 批量调用
async def batch_rpc_calls(client, calls):
    """Run several RPC calls concurrently and report each outcome.

    Args:
        client: Any object exposing an awaitable
            ``async_rpc(method, *args, **kwargs)``.
        calls: Iterable of ``(method, args, kwargs)`` triples.

    Returns:
        One dict per call, in call order. Successful calls yield
        ``{"success": True, "data": ..., "call_index": i}``; failed calls yield
        ``{"success": False, "error": str(exception), "call_index": i}``.
    """
    tasks = [client.async_rpc(method, *args, **kwargs) for method, args, kwargs in calls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    processed_results = []
    for i, result in enumerate(results):
        # BaseException rather than Exception: a cancelled call is returned as
        # CancelledError, which does not derive from Exception on Python 3.8+.
        if isinstance(result, BaseException):
            processed_results.append({
                "success": False,
                "error": str(result),
                "call_index": i,
            })
        else:
            processed_results.append({
                "success": True,
                "data": result.data,
                "call_index": i,
            })
    return processed_results
# Usage example: each entry is (method, positional args, keyword args)
calls = [
    ("get_user", [123], {}),
    ("get_order", [456], {}),
    ("get_product", [], {"product_id": 789}),
]
# `await` is only valid inside a coroutine. From synchronous code drive the
# batch with asyncio.run(); inside a coroutine use
# `results = await batch_rpc_calls(client, calls)` instead.
results = asyncio.run(batch_rpc_calls(client, calls))
客户端配置和优化
1. 超时配置
import requests
from bricks.rpc.http_.service import Client
class ConfigurableHttpClient(Client):
    """HTTP RPC client with a configurable request timeout."""

    def __init__(self, endpoint: str, timeout: int = 30):
        """Create the client.

        Args:
            endpoint: Server address, forwarded to the base ``Client``.
            timeout: Timeout in seconds passed to every HTTP request.
        """
        super().__init__(endpoint)
        self.timeout = timeout

    def rpc(self, method: str, *args, **kwargs):
        """POST the RPC request, giving up after ``self.timeout`` seconds.

        Returns:
            An ``RpcResponse`` built from the JSON body plus the request ID.

        Raises:
            TimeoutError: If the request times out (the ``requests.Timeout``
                is chained as ``__cause__``).
            RuntimeError: If the HTTP response status is not OK.
        """
        # Imported here because the snippet's import list does not bring
        # RpcResponse into scope.
        from bricks.rpc.common import RpcResponse

        rpc_request = self._prepare_request(method, *args, **kwargs)
        try:
            response = requests.post(
                f"{self.endpoint}/rpc",
                json=rpc_request.to_dict(),
                headers={"Content-Type": "application/json"},
                timeout=self.timeout,  # apply the configured timeout
            )
        except requests.Timeout as exc:
            raise TimeoutError(f"RPC call timeout after {self.timeout} seconds") from exc

        # Handle the response
        if not response.ok:
            raise RuntimeError(f"HTTP error {response.status_code}: {response.text}")
        response_data = response.json()
        return RpcResponse.from_dict({**response_data, "request_id": rpc_request.request_id})
# Use a 60-second timeout instead of the default 30
client = ConfigurableHttpClient("localhost:8080", timeout=60)
2. 客户端中间件
class MiddlewareClient:
    """Client wrapper that runs middleware hooks around every RPC call.

    A middleware is any object with an optional
    ``before_request(method, args, kwargs)`` hook returning the (possibly
    rewritten) triple, and an optional ``after_request(response)`` hook
    returning the (possibly rewritten) response.
    """

    def __init__(self, client):
        self.client = client
        self.middlewares = []

    def add_middleware(self, middleware):
        """Append ``middleware`` to the end of the chain."""
        self.middlewares.append(middleware)

    def rpc(self, method: str, *args, **kwargs):
        """Run the call through the middleware chain.

        ``before_request`` hooks run in registration order and
        ``after_request`` hooks in reverse order.
        """
        # Pre-request hooks: each may rewrite the method name and arguments
        for middleware in self.middlewares:
            if hasattr(middleware, "before_request"):
                method, args, kwargs = middleware.before_request(method, args, kwargs)

        # The actual call on the wrapped client
        response = self.client.rpc(method, *args, **kwargs)

        # Post-request hooks: each may rewrite the response
        for middleware in reversed(self.middlewares):
            if hasattr(middleware, "after_request"):
                response = middleware.after_request(response)
        return response
# Example logging middleware
class LoggingMiddleware:
    """Middleware that prints every request and response without changing them."""

    def before_request(self, method, args, kwargs):
        """Print the outgoing call and return it unchanged."""
        print(f"调用方法: {method}, 参数: {args}, {kwargs}")
        return method, args, kwargs

    def after_request(self, response):
        """Print the response code and payload length, then return the response."""
        # A response without a payload is reported as length 0 instead of
        # raising TypeError from len(None)
        data_length = len(response.data) if response.data is not None else 0
        print(f"响应状态: {response.code}, 数据长度: {data_length}")
        return response
# Use the middleware: wrap a base client and register the logging middleware
base_client = Client("localhost:8080")
client = MiddlewareClient(base_client)
client.add_middleware(LoggingMiddleware())
通过这些客户端功能,可以灵活地与 RPC 服务进行交互,满足各种应用场景的需求。