bricks
开发指南
2.5 RPC篇
2.5.4 多协议支持

2.5.4 多协议支持

Bricks 框架支持多种 RPC 通信协议,每种协议都有其特定的优势和适用场景。本节详细介绍各种协议的特性、配置和使用方法。

协议对比

协议传输方式性能复杂度跨语言支持适用场景
HTTPHTTP/1.1中等简单优秀Web API、跨语言调用
WebSocketWebSocket中等良好实时通信、长连接
SocketTCP很高中等一般高性能、低延迟
gRPCHTTP/2很高复杂优秀微服务、高并发
RedisRedis 协议简单优秀异步队列、分布式

HTTP 协议

特性

  • 简单易用:基于标准 HTTP 协议,易于理解和调试
  • 跨语言:任何支持 HTTP 的语言都可以调用
  • 防火墙友好:使用标准 80/443 端口
  • 工具丰富:可以使用 curl、Postman 等工具测试

服务端配置

from bricks.rpc.common import serve
 
# 基本配置
serve(
    MyService(),
    mode="http",
    ident=8080,
    concurrency=10
)
 
# 高级配置
serve(
    MyService(),
    mode="http",
    ident=8080,
    concurrency=20,
    # HTTP 特定参数
    host="0.0.0.0",        # 监听地址
    timeout=30,            # 请求超时
    max_request_size=1024*1024  # 最大请求大小
)

客户端使用

from bricks.rpc.http_.service import Client
 
client = Client("http://localhost:8080")
response = client.rpc("method_name", param1="value1")
 
# 支持 HTTPS
https_client = Client("https://api.example.com")

使用场景

  • Web API 服务:对外提供 HTTP API
  • 跨语言调用:不同语言系统间的通信
  • 调试和测试:便于使用 HTTP 工具调试
  • 负载均衡:易于配置 HTTP 负载均衡器

WebSocket 协议

特性

  • 全双工通信:支持服务端主动推送
  • 低延迟:减少 HTTP 握手开销
  • 实时性:适合实时数据传输
  • 连接保持:长连接减少连接建立成本

服务端配置

from bricks.rpc.common import serve
 
# WebSocket 服务
serve(
    RealtimeService(),
    mode="websocket",
    ident=8080,
    concurrency=50,
    # WebSocket 特定参数
    max_size=None,         # 无消息大小限制
    max_queue=None,        # 无队列大小限制
    ping_interval=20,      # 心跳间隔
    ping_timeout=10        # 心跳超时
)

客户端使用

from bricks.rpc.websocket_.service import Client
 
client = Client("ws://localhost:8080")
try:
    # 实时调用
    response = client.rpc("get_realtime_data")
    print(response.data)
finally:
    client.close()

实时推送示例

class RealtimeService:
    def __init__(self):
        self.subscribers = set()
    
    def subscribe(self, client_id):
        """订阅实时数据"""
        self.subscribers.add(client_id)
        return {"subscribed": True, "client_id": client_id}
    
    def broadcast_data(self, data):
        """广播数据到所有订阅者"""
        # 这里需要配合 WebSocket 服务的连接管理
        return {"broadcasted": True, "subscriber_count": len(self.subscribers)}

使用场景

  • 实时监控:系统状态、性能指标实时推送
  • 在线聊天:即时消息系统
  • 实时数据:股票价格、传感器数据
  • 协作应用:多人在线编辑

Socket 协议

特性

  • 高性能:直接 TCP 通信,开销最小
  • 低延迟:无 HTTP 协议开销
  • 可靠传输:TCP 保证数据可靠性
  • 自定义协议:可以定制传输格式

服务端配置

from bricks.rpc.common import serve
 
# Socket 服务
serve(
    HighPerformanceService(),
    mode="socket",
    ident=9090,
    concurrency=100,
    # Socket 特定参数
    backlog=1024,          # 连接队列长度
    nodelay=True,          # 禁用 Nagle 算法
    keepalive=True,        # 启用 TCP keepalive
    buffer_size=8192       # 缓冲区大小
)

客户端使用

from bricks.rpc.socket_.service import Client
 
client = Client("localhost:9090")
response = client.rpc("high_performance_method", large_data)

使用场景

  • 高频交易:金融系统的低延迟要求
  • 游戏服务器:实时游戏数据传输
  • IoT 设备:物联网设备通信
  • 内部服务:微服务间高性能通信

gRPC 协议

特性

  • 高性能:基于 HTTP/2,支持多路复用
  • 强类型:Protocol Buffers 序列化
  • 流式传输:支持客户端流、服务端流、双向流
  • 负载均衡:内置负载均衡支持

服务端配置

from bricks.rpc.common import serve
 
# gRPC 服务
serve(
    MicroService(),
    mode="grpc",
    ident=50051,
    concurrency=200,
    # gRPC 特定参数
    max_workers=200,       # 最大工作线程
    max_message_length=4*1024*1024,  # 最大消息长度
    compression="gzip"     # 压缩算法
)

客户端使用

from bricks.rpc.grpc_.service import Client
 
client = Client("localhost:50051")
response = client.rpc("process_batch", batch_data)

流式处理示例

class StreamingService:
    def process_stream(self, data_stream):
        """处理数据流"""
        results = []
        for data in data_stream:
            processed = self.process_single(data)
            results.append(processed)
            # 可以实现流式返回
            yield processed
        return results
    
    def process_single(self, data):
        """处理单个数据项"""
        return {"processed": data, "timestamp": time.time()}

使用场景

  • 微服务架构:服务间高效通信
  • 云原生应用:Kubernetes 环境
  • 大数据处理:流式数据处理
  • 企业级应用:高可靠性要求

Redis 协议

特性

  • 异步处理:基于消息队列的异步调用
  • 持久化:Redis 提供数据持久化
  • 分布式:天然支持分布式部署
  • 高可用:Redis 集群支持

服务端配置

from bricks.rpc.common import serve
 
# Redis 服务
serve(
    AsyncService(),
    mode="redis",
    ident="redis://localhost:6379/0",
    concurrency=50,
    # Redis 特定参数
    server_id="worker_001",    # 服务器标识
    queue_name="rpc_queue",    # 队列名称
    timeout=30,                # 消息超时
    max_retries=3              # 最大重试次数
)

客户端使用

from bricks.rpc.redis_.service import Client
 
# 指定服务器ID
client = Client("redis://localhost:6379/0?server_id=worker_001")
response = client.rpc("async_process", data)

分布式任务示例

class DistributedTaskService:
    def __init__(self):
        self.task_queue = []
    
    def submit_task(self, task_data):
        """提交任务"""
        task_id = str(uuid.uuid4())
        self.task_queue.append({
            "id": task_id,
            "data": task_data,
            "status": "pending",
            "created_at": time.time()
        })
        return {"task_id": task_id, "status": "submitted"}
    
    def get_task_status(self, task_id):
        """获取任务状态"""
        for task in self.task_queue:
            if task["id"] == task_id:
                return task
        return {"error": "Task not found"}

使用场景

  • 异步任务处理:后台任务队列
  • 分布式计算:多节点协同处理
  • 消息队列:系统间异步通信
  • 缓存服务:分布式缓存

协议选择指南

性能要求

# 高性能场景:Socket > gRPC > WebSocket > HTTP
if performance_critical:
    protocol = "socket"
elif high_throughput:
    protocol = "grpc"
elif realtime_required:
    protocol = "websocket"
else:
    protocol = "http"

部署环境

# 根据部署环境选择协议
if cloud_native:
    protocol = "grpc"  # 云原生环境
elif web_service:
    protocol = "http"  # Web 服务
elif internal_service:
    protocol = "socket"  # 内部服务
elif async_processing:
    protocol = "redis"  # 异步处理

开发复杂度

# 根据开发复杂度选择
if simple_api:
    protocol = "http"  # 简单 API
elif need_streaming:
    protocol = "grpc"  # 需要流式处理
elif need_push:
    protocol = "websocket"  # 需要推送
elif high_performance:
    protocol = "socket"  # 高性能要求

混合协议部署

多协议同时支持

import asyncio
from bricks.rpc.common import start_rpc_server
 
async def multi_protocol_server():
    """同时启动多种协议的服务"""
    service = MyService()
    
    # 启动多个协议服务
    tasks = [
        start_rpc_server(service, mode="http", ident=8080),
        start_rpc_server(service, mode="websocket", ident=8081),
        start_rpc_server(service, mode="grpc", ident=50051),
    ]
    
    await asyncio.gather(*tasks)
 
asyncio.run(multi_protocol_server())

协议网关

class ProtocolGateway:
    """协议网关,统一多种协议的访问"""
    
    def __init__(self):
        self.services = {
            "http": "http://localhost:8080",
            "websocket": "ws://localhost:8081",
            "grpc": "localhost:50051"
        }
    
    def route_request(self, protocol, method, *args, **kwargs):
        """根据协议路由请求"""
        if protocol == "http":
            from bricks.rpc.http_.service import Client
            client = Client(self.services["http"])
        elif protocol == "websocket":
            from bricks.rpc.websocket_.service import Client
            client = Client(self.services["websocket"])
        elif protocol == "grpc":
            from bricks.rpc.grpc_.service import Client
            client = Client(self.services["grpc"])
        else:
            raise ValueError(f"不支持的协议: {protocol}")
        
        return client.rpc(method, *args, **kwargs)
 
# 使用网关
gateway = ProtocolGateway()
result = gateway.route_request("http", "get_user", user_id=123)

通过合理选择和配置不同的协议,可以构建适应各种场景需求的高效 RPC 系统。