2.5.4 多协议支持
Bricks
框架支持多种 RPC 通信协议,每种协议都有其特定的优势和适用场景。本节详细介绍各种协议的特性、配置和使用方法。
协议对比
协议 | 传输方式 | 性能 | 复杂度 | 跨语言支持 | 适用场景 |
---|---|---|---|---|---|
HTTP | HTTP/1.1 | 中等 | 简单 | 优秀 | Web API、跨语言调用 |
WebSocket | WebSocket | 高 | 中等 | 良好 | 实时通信、长连接 |
Socket | TCP | 很高 | 中等 | 一般 | 高性能、低延迟 |
gRPC | HTTP/2 | 很高 | 复杂 | 优秀 | 微服务、高并发 |
Redis | Redis 协议 | 高 | 简单 | 优秀 | 异步队列、分布式 |
HTTP 协议
特性
- 简单易用:基于标准 HTTP 协议,易于理解和调试
- 跨语言:任何支持 HTTP 的语言都可以调用
- 防火墙友好:使用标准 80/443 端口
- 工具丰富:可以使用 curl、Postman 等工具测试
服务端配置
from bricks.rpc.common import serve
# 基本配置
serve(
MyService(),
mode="http",
ident=8080,
concurrency=10
)
# 高级配置
serve(
MyService(),
mode="http",
ident=8080,
concurrency=20,
# HTTP 特定参数
host="0.0.0.0", # 监听地址
timeout=30, # 请求超时
max_request_size=1024*1024 # 最大请求大小
)
客户端使用
from bricks.rpc.http_.service import Client
client = Client("http://localhost:8080")
response = client.rpc("method_name", param1="value1")
# 支持 HTTPS
https_client = Client("https://api.example.com")
使用场景
- Web API 服务:对外提供 HTTP API
- 跨语言调用:不同语言系统间的通信
- 调试和测试:便于使用 HTTP 工具调试
- 负载均衡:易于配置 HTTP 负载均衡器
WebSocket 协议
特性
- 全双工通信:支持服务端主动推送
- 低延迟:减少 HTTP 握手开销
- 实时性:适合实时数据传输
- 连接保持:长连接减少连接建立成本
服务端配置
from bricks.rpc.common import serve
# WebSocket 服务
serve(
RealtimeService(),
mode="websocket",
ident=8080,
concurrency=50,
# WebSocket 特定参数
max_size=None, # 无消息大小限制
max_queue=None, # 无队列大小限制
ping_interval=20, # 心跳间隔
ping_timeout=10 # 心跳超时
)
客户端使用
from bricks.rpc.websocket_.service import Client
client = Client("ws://localhost:8080")
try:
# 实时调用
response = client.rpc("get_realtime_data")
print(response.data)
finally:
client.close()
实时推送示例
class RealtimeService:
def __init__(self):
self.subscribers = set()
def subscribe(self, client_id):
"""订阅实时数据"""
self.subscribers.add(client_id)
return {"subscribed": True, "client_id": client_id}
def broadcast_data(self, data):
"""广播数据到所有订阅者"""
# 这里需要配合 WebSocket 服务的连接管理
return {"broadcasted": True, "subscriber_count": len(self.subscribers)}
使用场景
- 实时监控:系统状态、性能指标实时推送
- 在线聊天:即时消息系统
- 实时数据:股票价格、传感器数据
- 协作应用:多人在线编辑
Socket 协议
特性
- 高性能:直接 TCP 通信,开销最小
- 低延迟:无 HTTP 协议开销
- 可靠传输:TCP 保证数据可靠性
- 自定义协议:可以定制传输格式
服务端配置
from bricks.rpc.common import serve
# Socket 服务
serve(
HighPerformanceService(),
mode="socket",
ident=9090,
concurrency=100,
# Socket 特定参数
backlog=1024, # 连接队列长度
nodelay=True, # 禁用 Nagle 算法
keepalive=True, # 启用 TCP keepalive
buffer_size=8192 # 缓冲区大小
)
客户端使用
from bricks.rpc.socket_.service import Client
client = Client("localhost:9090")
response = client.rpc("high_performance_method", large_data)
使用场景
- 高频交易:金融系统的低延迟要求
- 游戏服务器:实时游戏数据传输
- IoT 设备:物联网设备通信
- 内部服务:微服务间高性能通信
gRPC 协议
特性
- 高性能:基于 HTTP/2,支持多路复用
- 强类型:Protocol Buffers 序列化
- 流式传输:支持客户端流、服务端流、双向流
- 负载均衡:内置负载均衡支持
服务端配置
from bricks.rpc.common import serve
# gRPC 服务
serve(
MicroService(),
mode="grpc",
ident=50051,
concurrency=200,
# gRPC 特定参数
max_workers=200, # 最大工作线程
max_message_length=4*1024*1024, # 最大消息长度
compression="gzip" # 压缩算法
)
客户端使用
from bricks.rpc.grpc_.service import Client
client = Client("localhost:50051")
response = client.rpc("process_batch", batch_data)
流式处理示例
class StreamingService:
def process_stream(self, data_stream):
"""处理数据流"""
results = []
for data in data_stream:
processed = self.process_single(data)
results.append(processed)
# 可以实现流式返回
yield processed
return results
def process_single(self, data):
"""处理单个数据项"""
return {"processed": data, "timestamp": time.time()}
使用场景
- 微服务架构:服务间高效通信
- 云原生应用:Kubernetes 环境
- 大数据处理:流式数据处理
- 企业级应用:高可靠性要求
Redis 协议
特性
- 异步处理:基于消息队列的异步调用
- 持久化:Redis 提供数据持久化
- 分布式:天然支持分布式部署
- 高可用:Redis 集群支持
服务端配置
from bricks.rpc.common import serve
# Redis 服务
serve(
AsyncService(),
mode="redis",
ident="redis://localhost:6379/0",
concurrency=50,
# Redis 特定参数
server_id="worker_001", # 服务器标识
queue_name="rpc_queue", # 队列名称
timeout=30, # 消息超时
max_retries=3 # 最大重试次数
)
客户端使用
from bricks.rpc.redis_.service import Client
# 指定服务器ID
client = Client("redis://localhost:6379/0?server_id=worker_001")
response = client.rpc("async_process", data)
分布式任务示例
class DistributedTaskService:
def __init__(self):
self.task_queue = []
def submit_task(self, task_data):
"""提交任务"""
task_id = str(uuid.uuid4())
self.task_queue.append({
"id": task_id,
"data": task_data,
"status": "pending",
"created_at": time.time()
})
return {"task_id": task_id, "status": "submitted"}
def get_task_status(self, task_id):
"""获取任务状态"""
for task in self.task_queue:
if task["id"] == task_id:
return task
return {"error": "Task not found"}
使用场景
- 异步任务处理:后台任务队列
- 分布式计算:多节点协同处理
- 消息队列:系统间异步通信
- 缓存服务:分布式缓存
协议选择指南
性能要求
# 高性能场景:Socket > gRPC > WebSocket > HTTP
if performance_critical:
protocol = "socket"
elif high_throughput:
protocol = "grpc"
elif realtime_required:
protocol = "websocket"
else:
protocol = "http"
部署环境
# 根据部署环境选择协议
if cloud_native:
protocol = "grpc" # 云原生环境
elif web_service:
protocol = "http" # Web 服务
elif internal_service:
protocol = "socket" # 内部服务
elif async_processing:
protocol = "redis" # 异步处理
开发复杂度
# 根据开发复杂度选择
if simple_api:
protocol = "http" # 简单 API
elif need_streaming:
protocol = "grpc" # 需要流式处理
elif need_push:
protocol = "websocket" # 需要推送
elif high_performance:
protocol = "socket" # 高性能要求
混合协议部署
多协议同时支持
import asyncio
from bricks.rpc.common import start_rpc_server
async def multi_protocol_server():
"""同时启动多种协议的服务"""
service = MyService()
# 启动多个协议服务
tasks = [
start_rpc_server(service, mode="http", ident=8080),
start_rpc_server(service, mode="websocket", ident=8081),
start_rpc_server(service, mode="grpc", ident=50051),
]
await asyncio.gather(*tasks)
asyncio.run(multi_protocol_server())
协议网关
class ProtocolGateway:
"""协议网关,统一多种协议的访问"""
def __init__(self):
self.services = {
"http": "http://localhost:8080",
"websocket": "ws://localhost:8081",
"grpc": "localhost:50051"
}
def route_request(self, protocol, method, *args, **kwargs):
"""根据协议路由请求"""
if protocol == "http":
from bricks.rpc.http_.service import Client
client = Client(self.services["http"])
elif protocol == "websocket":
from bricks.rpc.websocket_.service import Client
client = Client(self.services["websocket"])
elif protocol == "grpc":
from bricks.rpc.grpc_.service import Client
client = Client(self.services["grpc"])
else:
raise ValueError(f"不支持的协议: {protocol}")
return client.rpc(method, *args, **kwargs)
# 使用网关
gateway = ProtocolGateway()
result = gateway.route_request("http", "get_user", user_id=123)
通过合理选择和配置不同的协议,可以构建适应各种场景需求的高效 RPC 系统。