2.5.5 Practical Applications
This section walks through concrete application scenarios and complete examples to show how to use
Bricks' RPC features in real projects.
Distributed Crawler System
System Architecture
┌──────────────┐      ┌──────────────┐      ┌──────────────┐
│    Master    │      │   Worker1    │      │   Worker2    │
│  scheduler   │◄────►│ worker node  │      │ worker node  │
└──────────────┘      └──────────────┘      └──────────────┘
        │                     │                     │
        └─────────────────────┼─────────────────────┘
                              │
                      ┌──────────────┐
                      │   Storage    │
                      │ storage node │
                      └──────────────┘
Master Node Implementation
# master.py
import time
import uuid
from typing import List, Dict

from bricks.rpc.common import serve
from bricks.rpc.http_.service import Client


class TaskManager:
    """Task manager"""

    def __init__(self):
        self.tasks: Dict[str, dict] = {}
        self.workers: List[str] = []
        self.worker_clients: Dict[str, Client] = {}

    def register_worker(self, worker_url: str):
        """Register a worker node"""
        if worker_url not in self.workers:
            self.workers.append(worker_url)
            self.worker_clients[worker_url] = Client(worker_url)
            print(f"Worker registered: {worker_url}")
        return {"success": True, "worker_count": len(self.workers)}

    def submit_crawl_task(self, urls: List[str], config: dict = None):
        """Submit a crawl task"""
        task_id = str(uuid.uuid4())
        task = {
            "id": task_id,
            "urls": urls,
            "config": config or {},
            "status": "pending",
            "created_at": time.time(),
            "results": []
        }
        self.tasks[task_id] = task
        # Distribute the task to the worker nodes
        self._distribute_task(task)
        return {"task_id": task_id, "status": "submitted"}

    def _distribute_task(self, task: dict):
        """Distribute a task across the worker nodes"""
        if not self.workers:
            print("No workers available")
            return
        urls = task["urls"]
        urls_per_worker = len(urls) // len(self.workers)
        for i, worker_url in enumerate(self.workers):
            start_idx = i * urls_per_worker
            end_idx = start_idx + urls_per_worker if i < len(self.workers) - 1 else len(urls)
            worker_urls = urls[start_idx:end_idx]
            if worker_urls:
                try:
                    client = self.worker_clients[worker_url]
                    response = client.rpc("crawl_urls", worker_urls, task["config"])
                    print(f"Task dispatched to {worker_url}: {len(worker_urls)} URLs")
                except Exception as e:
                    print(f"Failed to dispatch task to {worker_url}: {e}")

    def get_task_status(self, task_id: str):
        """Get the status of a task"""
        task = self.tasks.get(task_id)
        if not task:
            return {"error": "task not found"}
        return {
            "task_id": task_id,
            "status": task["status"],
            "url_count": len(task["urls"]),
            "result_count": len(task["results"]),
            "created_at": task["created_at"]
        }

    def collect_result(self, task_id: str, worker_url: str, results: List[dict]):
        """Collect results reported by a worker node"""
        if task_id in self.tasks:
            self.tasks[task_id]["results"].extend(results)
            print(f"Received {len(results)} results from {worker_url}")
            return {"success": True}
        return {"error": "task not found"}


# Start the Master node
if __name__ == "__main__":
    manager = TaskManager()
    serve(manager, mode="http", ident=8080)
Worker Node Implementation
# worker.py
import requests
from typing import List

from bricks import Request
from bricks.spider import air
from bricks.rpc.common import serve
from bricks.rpc.http_.service import Client


class CrawlWorker:
    """Crawl worker node"""

    def __init__(self, master_url: str, worker_port: int):
        self.master_url = master_url
        self.worker_port = worker_port
        self.master_client = Client(master_url)
        self.spider = self._create_spider()

    def _create_spider(self):
        """Create the spider instance"""

        class WorkerSpider(air.Spider):
            def make_request(self, context):
                url = context.seeds["url"]
                return Request(url=url)

            def parse(self, context):
                response = context.response
                return {
                    "url": response.url,
                    "title": response.xpath("//title/text()").get(""),
                    "status_code": response.status_code,
                    "content_length": len(response.text)
                }

        return WorkerSpider()

    def crawl_urls(self, urls: List[str], config: dict = None):
        """Crawl a list of URLs"""
        print(f"Starting to crawl {len(urls)} URLs")
        results = []
        for url in urls:
            try:
                # Crawl a single URL with the spider
                context = self.spider.Context(
                    target=self.spider,
                    seeds={"url": url}
                )
                request = self.spider.make_request(context)
                # Simplified here; a real worker would run the full spider pipeline
                response = requests.get(request.url, timeout=10)
                result = {
                    "url": url,
                    "status_code": response.status_code,
                    "content_length": len(response.text),
                    "success": True
                }
                results.append(result)
            except Exception as e:
                results.append({
                    "url": url,
                    "error": str(e),
                    "success": False
                })
        print(f"Crawl finished, successes: {sum(1 for r in results if r.get('success'))}")
        return {"results": results, "worker_url": f"localhost:{self.worker_port}"}

    def register_to_master(self):
        """Register with the master node"""
        try:
            response = self.master_client.rpc(
                "register_worker",
                f"localhost:{self.worker_port}"
            )
            print(f"Registered with master: {response.data}")
        except Exception as e:
            print(f"Failed to register with master: {e}")


# Start the Worker node
if __name__ == "__main__":
    import sys

    master_url = "localhost:8080"
    worker_port = int(sys.argv[1]) if len(sys.argv) > 1 else 8081
    worker = CrawlWorker(master_url, worker_port)

    # Register with the master once the server has started
    def on_server_started(port):
        worker.register_to_master()

    serve(worker, mode="http", ident=worker_port, on_server_started=on_server_started)
Client Usage Example
# client.py
import time

from bricks.rpc.http_.service import Client


def main():
    # Connect to the Master node
    master = Client("localhost:8080")

    # Submit a crawl task
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt"
    ]
    response = master.rpc("submit_crawl_task", urls)
    task_id = response.data.get("task_id")
    print(f"Task submitted: {task_id}")

    # Poll the task status
    while True:
        status_response = master.rpc("get_task_status", task_id)
        status = status_response.data
        print(f"Task status: {status}")
        if status.get("result_count", 0) >= len(urls):
            print("Task finished!")
            break
        time.sleep(2)


if __name__ == "__main__":
    main()
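To try the three pieces together on one machine, start each process in its own terminal; the ports below match the defaults used in the code above:
# Terminal 1: start the master
python master.py

# Terminals 2 and 3: start two workers on different ports
python worker.py 8081
python worker.py 8082

# Terminal 4: submit a task and poll its status
python client.py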
Microservice Architecture Example
User Service
# user_service.py
from bricks.rpc.common import serve
from bricks.rpc.grpc_.service import Client


class UserService:
    """User service"""

    def __init__(self):
        self.users = {
            1: {"id": 1, "name": "Zhang San", "email": "zhangsan@example.com"},
            2: {"id": 2, "name": "Li Si", "email": "lisi@example.com"}
        }
        # Client for the other service
        self.order_service = Client("localhost:50052")

    def get_user(self, user_id: int):
        """Get user info"""
        user = self.users.get(user_id)
        if not user:
            return {"error": "user not found"}
        return user

    def get_user_with_orders(self, user_id: int):
        """Get user info together with the user's orders"""
        user = self.get_user(user_id)
        if "error" in user:
            return user
        # Call the order service
        try:
            orders_response = self.order_service.rpc("get_user_orders", user_id)
            orders = orders_response.data if orders_response.code == 0 else []
            user["orders"] = orders
        except Exception as e:
            user["orders"] = []
            user["order_error"] = str(e)
        return user

    def create_user(self, name: str, email: str):
        """Create a user"""
        user_id = max(self.users.keys()) + 1 if self.users else 1
        user = {"id": user_id, "name": name, "email": email}
        self.users[user_id] = user
        return user


if __name__ == "__main__":
    serve(UserService(), mode="grpc", ident=50051)
Order Service
# order_service.py
from bricks.rpc.common import serve


class OrderService:
    """Order service"""

    def __init__(self):
        self.orders = {
            1: [
                {"id": 101, "amount": 99.99, "status": "completed"},
                {"id": 102, "amount": 149.99, "status": "pending"}
            ],
            2: [
                {"id": 103, "amount": 79.99, "status": "completed"}
            ]
        }

    def get_user_orders(self, user_id: int):
        """Get a user's orders"""
        return self.orders.get(user_id, [])

    def create_order(self, user_id: int, amount: float):
        """Create an order"""
        order_id = 200 + len([o for orders in self.orders.values() for o in orders])
        order = {"id": order_id, "amount": amount, "status": "pending"}
        if user_id not in self.orders:
            self.orders[user_id] = []
        self.orders[user_id].append(order)
        return order


if __name__ == "__main__":
    serve(OrderService(), mode="grpc", ident=50052)
API Gateway
# api_gateway.py
from bricks.rpc.common import serve
from bricks.rpc.grpc_.service import Client


class ApiGateway:
    """API gateway"""

    def __init__(self):
        self.user_service = Client("localhost:50051")
        self.order_service = Client("localhost:50052")

    def get_user_profile(self, user_id: int):
        """Get a user's full profile"""
        try:
            response = self.user_service.rpc("get_user_with_orders", user_id)
            return response.data if response.code == 0 else {"error": response.message}
        except Exception as e:
            return {"error": f"service call failed: {e}"}

    def create_user_order(self, user_id: int, amount: float):
        """Create an order for a user"""
        # Verify the user exists first; note that get_user reports a missing user
        # in the payload rather than via the status code
        user_response = self.user_service.rpc("get_user", user_id)
        if user_response.code != 0 or "error" in (user_response.data or {}):
            return {"error": "user not found"}
        # Create the order
        try:
            order_response = self.order_service.rpc("create_order", user_id, amount)
            return order_response.data if order_response.code == 0 else {"error": order_response.message}
        except Exception as e:
            return {"error": f"failed to create order: {e}"}


if __name__ == "__main__":
    serve(ApiGateway(), mode="http", ident=8080)
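Because the gateway itself is served over HTTP, it can be exercised with the same HTTP Client used elsewhere in this section. A minimal sketch, assuming the user service, order service, and gateway above are all running locally:
# gateway_client.py (sketch; assumes the three services above are running)
from bricks.rpc.http_.service import Client

gateway = Client("localhost:8080")

# Aggregated profile: user info plus the orders fetched from the order service
profile = gateway.rpc("get_user_profile", 1)
print(profile.data)

# Create an order for the same user through the gateway
order = gateway.rpc("create_user_order", 1, 59.99)
print(order.data)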
Real-Time Monitoring System
Monitoring Service
# monitor_service.py
import time
import threading

import psutil

from bricks.rpc.common import serve


class MonitorService:
    """System monitoring service"""

    def __init__(self):
        self.metrics = {}
        self.subscribers = set()
        self.running = True
        # Start the collection thread
        self.monitor_thread = threading.Thread(target=self._collect_metrics, daemon=True)
        self.monitor_thread.start()

    def _collect_metrics(self):
        """Collect system metrics"""
        while self.running:
            self.metrics = {
                "timestamp": time.time(),
                "cpu_percent": psutil.cpu_percent(),
                "memory_percent": psutil.virtual_memory().percent,
                "disk_percent": psutil.disk_usage('/').percent,
                "network_io": psutil.net_io_counters()._asdict()
            }
            time.sleep(5)

    def get_current_metrics(self):
        """Get the latest metrics"""
        return self.metrics

    def get_system_info(self):
        """Get static system information"""
        return {
            "cpu_count": psutil.cpu_count(),
            "memory_total": psutil.virtual_memory().total,
            "disk_total": psutil.disk_usage('/').total,
            "boot_time": psutil.boot_time()
        }

    def subscribe_metrics(self, client_id: str):
        """Subscribe to metric updates"""
        self.subscribers.add(client_id)
        return {"subscribed": True, "client_id": client_id}

    def unsubscribe_metrics(self, client_id: str):
        """Unsubscribe"""
        self.subscribers.discard(client_id)
        return {"unsubscribed": True, "client_id": client_id}


if __name__ == "__main__":
    serve(MonitorService(), mode="websocket", ident=8080)
Monitoring Client
# monitor_client.py
import json
import time

from bricks.rpc.websocket_.service import Client


class MonitorClient:
    """Monitoring client"""

    def __init__(self, server_url: str):
        self.client = Client(server_url)
        self.client_id = f"client_{int(time.time())}"

    def start_monitoring(self):
        """Start monitoring"""
        try:
            # Subscribe to metric updates
            response = self.client.rpc("subscribe_metrics", self.client_id)
            print(f"Subscribed: {response.data}")

            # Fetch static system info
            info_response = self.client.rpc("get_system_info")
            print(f"System info: {json.dumps(info_response.data, indent=2)}")

            # Keep polling for metrics
            while True:
                metrics_response = self.client.rpc("get_current_metrics")
                if metrics_response.code == 0:
                    metrics = metrics_response.data
                    print(f"CPU: {metrics['cpu_percent']}%, "
                          f"memory: {metrics['memory_percent']}%, "
                          f"disk: {metrics['disk_percent']}%")
                time.sleep(5)
        except KeyboardInterrupt:
            print("Stopping monitoring...")
        finally:
            # Unsubscribe and close the connection
            self.client.rpc("unsubscribe_metrics", self.client_id)
            self.client.close()


if __name__ == "__main__":
    client = MonitorClient("ws://localhost:8080")
    client.start_monitoring()
Deployment and Operations
Docker Deployment
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["python", "service.py"]
Docker Compose
# docker-compose.yml
version: "3.8"

services:
  master:
    build: .
    command: python master.py
    ports:
      - "8080:8080"
    environment:
      - SERVICE_TYPE=master

  worker1:
    build: .
    command: python worker.py 8081
    ports:
      - "8081:8081"
    depends_on:
      - master
    environment:
      - SERVICE_TYPE=worker
      - MASTER_URL=master:8080

  worker2:
    build: .
    command: python worker.py 8082
    ports:
      - "8082:8082"
    depends_on:
      - master
    environment:
      - SERVICE_TYPE=worker
      - MASTER_URL=master:8080

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
Health Checks
# health_check.py
from bricks.rpc.http_.service import Client


def health_check(service_url: str):
    """Check a single service"""
    try:
        client = Client(service_url)
        response = client.rpc("PING")
        return response.code == 0 and response.data == "PONG"
    except Exception:
        return False


def check_all_services():
    """Check every service"""
    services = {
        "master": "localhost:8080",
        "worker1": "localhost:8081",
        "worker2": "localhost:8082"
    }
    for name, url in services.items():
        status = "healthy" if health_check(url) else "unhealthy"
        print(f"{name}: {status}")


if __name__ == "__main__":
    check_all_services()
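In practice the check usually runs on a schedule rather than once. A minimal sketch that polls in a loop (the 30-second interval is an arbitrary choice; a cron entry or a Compose healthcheck would serve the same purpose):
# periodic_health_check.py (sketch; reuses check_all_services from health_check.py above)
import time

from health_check import check_all_services

if __name__ == "__main__":
    while True:
        check_all_services()
        time.sleep(30)  # arbitrary polling interval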
Spider RPC Service
Wrapping a Spider with Rpc.wrap
Bricks provides the Rpc.wrap method, which makes it easy to wrap a spider into an RPC service:
# spider_rpc_server.py
from loguru import logger

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc


class MySpider(air.Spider):
    """Example spider"""

    def make_seeds(self, context, **kwargs):
        """Generate seed data"""
        return [{"page": i} for i in range(1, 6)]

    def make_request(self, context) -> Request:
        """Build the request"""
        page = context.seeds["page"]
        return Request(
            url=f"https://httpbin.org/json?page={page}",
            headers={
                'User-Agent': 'Bricks Spider 1.0'
            }
        )

    def parse(self, context):
        """Parse the response"""
        response = context.response
        return response.extract(
            engine="json",
            rules={
                "args": {
                    "page": "page"
                },
                "headers": {
                    "user_agent": "User-Agent"
                }
            }
        )

    def item_pipeline(self, context):
        """Handle the items"""
        items = context.items
        logger.info(f"Processed {len(items)} items")
        # Acknowledge that processing succeeded
        context.success()


def on_finish(ctx: MySpider.Context, err: Exception):
    """Callback fired when the spider finishes"""
    if err:
        logger.error(f"Spider run failed: {err}")
    else:
        logger.info(f"Spider run finished: {ctx.response.url}")


if __name__ == '__main__':
    # Wrap the spider as an RPC service
    rpc = Rpc.wrap(MySpider)
    # Attach the completion callback
    rpc.with_callback(on_finish)
    # Start the RPC service
    rpc.serve(
        concurrency=10,
        mode="http",  # options: http, websocket, socket, grpc, redis
        ident=8080
    )
Calling the Spider RPC Service from a Client
# spider_rpc_client.py
import json

from bricks.rpc.http_.service import Client


def test_spider_rpc():
    """Test a basic spider RPC call"""
    client = Client("localhost:8080")
    # Invoke the spider's execute method
    response = client.rpc("execute", {"custom_param": "test_value"})
    if response.code == 0:
        result = json.loads(response.data)
        print(f"Spider run succeeded: {result}")
    else:
        print(f"Spider run failed: {response.message}")


def test_spider_with_config():
    """Test a spider call that passes configuration"""
    client = Client("localhost:8080")
    # Pass configuration parameters
    config = {
        "proxy": "http://127.0.0.1:8080",
        "timeout": 30,
        "max_retry": 3
    }
    response = client.rpc("execute", config)
    print(f"Response: {response.data}")


if __name__ == '__main__':
    test_spider_rpc()
    test_spider_with_config()
Distributed Crawl Task Scheduling
# distributed_crawler.py
import threading
import time

from bricks.rpc.http_.service import Client


class DistributedCrawler:
    """Distributed crawl scheduler"""

    def __init__(self):
        self.workers = []      # worker node list
        self.task_queue = []   # task queue
        self.results = []      # collected results

    def register_worker(self, worker_url: str):
        """Register a worker node"""
        self.workers.append(worker_url)
        print(f"Worker registered: {worker_url}")

    def submit_crawl_task(self, urls: list, spider_config: dict = None):
        """Submit a crawl task"""
        task_id = int(time.time())
        # Split the URLs across the worker nodes
        chunk_size = len(urls) // len(self.workers) if self.workers else len(urls)
        for i, worker_url in enumerate(self.workers):
            start_idx = i * chunk_size
            end_idx = start_idx + chunk_size if i < len(self.workers) - 1 else len(urls)
            worker_urls = urls[start_idx:end_idx]
            if worker_urls:
                # Dispatch the task asynchronously
                thread = threading.Thread(
                    target=self._dispatch_task,
                    args=(worker_url, worker_urls, spider_config, task_id)
                )
                thread.start()
        return {"task_id": task_id, "worker_count": len(self.workers)}

    def _dispatch_task(self, worker_url: str, urls: list, config: dict, task_id: int):
        """Dispatch a task to one worker node"""
        try:
            client = Client(worker_url)
            # Build the spider configuration
            spider_config = {
                "urls": urls,
                "task_id": task_id,
                **(config or {})
            }
            response = client.rpc("execute", spider_config)
            if response.code == 0:
                print(f"Task {task_id} succeeded on {worker_url}")
                self.results.append({
                    "task_id": task_id,
                    "worker": worker_url,
                    "result": response.data
                })
            else:
                print(f"Task {task_id} failed on {worker_url}: {response.message}")
        except Exception as e:
            print(f"Failed to dispatch task to {worker_url}: {e}")


# Start the distributed scheduler
if __name__ == '__main__':
    scheduler = DistributedCrawler()

    # Register worker nodes
    scheduler.register_worker("localhost:8081")
    scheduler.register_worker("localhost:8082")

    # Submit a crawl task
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml",
        "https://httpbin.org/robots.txt"
    ]
    result = scheduler.submit_crawl_task(urls, {"timeout": 30})
    print(f"Task submitted: {result}")
Redis Queue Mode Spider
# redis_queue_spider.py
from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc


class QueueSpider(air.Spider):
    """Queue-mode spider"""

    def make_seeds(self, context, **kwargs):
        """Take seeds from the call parameters"""
        urls = kwargs.get("urls", [])
        return [{"url": url} for url in urls]

    def make_request(self, context) -> Request:
        """Build the request"""
        url = context.seeds["url"]
        return Request(url=url)

    def parse(self, context):
        """Parse the response"""
        response = context.response
        return {
            "url": response.url,
            "status_code": response.status_code,
            "title": response.xpath("//title/text()").get(""),
            "content_length": len(response.text)
        }

    def item_pipeline(self, context):
        """Handle the results"""
        items = context.items
        print(f"Processed URL: {context.seeds['url']}, result: {items}")
        context.success()


if __name__ == '__main__':
    # Run in Redis queue mode
    rpc = Rpc.wrap(QueueSpider)
    rpc.serve(
        concurrency=20,
        mode="redis",
        ident="redis://localhost:6379/0",
        server_id="crawler_worker_001"
    )
Redis Queue Client
# redis_queue_client.py
import time

from bricks.rpc.redis_.service import Client


def submit_crawl_jobs():
    """Submit crawl jobs to the Redis queue"""
    client = Client("redis://localhost:6379/0?server_id=crawler_worker_001")

    # Submit several crawl jobs
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml"
    ]
    for url in urls:
        response = client.rpc("execute", {"urls": [url]})
        print(f"Job submitted: {url}, response: {response.data}")
        time.sleep(1)


if __name__ == '__main__':
    submit_crawl_jobs()
Command-Line Tool Integration
Using the RpcProxy Runner
# runner_example.py
from bricks import Request
from bricks.client.runner import RpcProxy
from bricks.spider import air


class CommandSpider(air.Spider):
    """Command-line spider"""

    def make_seeds(self, context, **kwargs):
        # Take the URLs from the command-line arguments
        urls = kwargs.get("urls", ["https://httpbin.org/json"])
        return [{"url": url} for url in urls]

    def make_request(self, context) -> Request:
        return Request(url=context.seeds["url"])

    def parse(self, context):
        response = context.response
        return response.json()

    def item_pipeline(self, context):
        print(f"Result: {context.items}")
        context.success()


def main(urls=None, **kwargs):
    """Entry point"""
    urls = urls or ["https://httpbin.org/json"]
    spider = CommandSpider()

    # Create the RPC proxy
    proxy = RpcProxy(lambda: spider.run(urls=urls))
    # Bind the spider object
    proxy.bind(spider)
    # Register custom methods
    proxy.register_adapter("get_status", lambda: "spider running")
    proxy.register_adapter("stop_spider", lambda: proxy.stop(delay=2))
    return proxy


if __name__ == '__main__':
    # Start the RPC service
    proxy = main()
    proxy.start(mode="http", ident=8080)
Command-Line Invocation Examples
# Start the spider RPC service
python runner_example.py

# Call it from another terminal
curl -X POST http://localhost:8080/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "method": "get_status",
    "data": "{\"args\": [], \"kwargs\": {}}"
  }'

# Stop the service
curl -X POST http://localhost:8080/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "method": "stop_spider",
    "data": "{\"args\": [], \"kwargs\": {}}"
  }'
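The same two calls can also be made with the Bricks HTTP client used throughout this section, which avoids hand-writing the JSON envelope. A minimal sketch, assuming the client speaks the same POST /rpc envelope as the curl examples above:
# runner_client.py (sketch; assumes the HTTP Client matches the /rpc envelope shown above)
from bricks.rpc.http_.service import Client

client = Client("localhost:8080")

# Equivalent of the first curl call
status = client.rpc("get_status")
print(status.data)

# Equivalent of the second curl call: ask the proxy to stop
client.rpc("stop_spider")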
Performance Monitoring and Tuning
RPC Performance Monitoring
# rpc_monitor.py
import time
from collections import defaultdict

from bricks.rpc.common import BaseRpcService, RpcRequest, RpcResponse
from spider_rpc_server import MySpider  # the example spider defined earlier in this section


class MonitoredRpcService(BaseRpcService):
    """RPC service with built-in metrics"""

    def __init__(self):
        super().__init__()
        self.metrics = defaultdict(list)
        self.request_count = 0
        self.error_count = 0

    async def process_rpc_request(self, request: RpcRequest) -> RpcResponse:
        """Process a request and record metrics"""
        start_time = time.time()
        self.request_count += 1
        try:
            response = await super().process_rpc_request(request)
            # Record a successful call
            duration = time.time() - start_time
            self.metrics[request.method].append({
                "duration": duration,
                "success": response.code == 0,
                "timestamp": start_time
            })
            return response
        except Exception as e:
            self.error_count += 1
            duration = time.time() - start_time
            self.metrics[request.method].append({
                "duration": duration,
                "success": False,
                "error": str(e),
                "timestamp": start_time
            })
            raise

    def get_metrics(self):
        """Summarize the recorded metrics"""
        summary = {}
        for method, records in self.metrics.items():
            if records:
                durations = [r["duration"] for r in records]
                success_count = sum(1 for r in records if r["success"])
                summary[method] = {
                    "total_calls": len(records),
                    "success_calls": success_count,
                    "error_calls": len(records) - success_count,
                    "avg_duration": sum(durations) / len(durations),
                    "max_duration": max(durations),
                    "min_duration": min(durations)
                }
        return {
            "total_requests": self.request_count,
            "total_errors": self.error_count,
            "methods": summary
        }


# Use the monitored service
service = MonitoredRpcService()
service.bind_target(MySpider())
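Once the service has handled some traffic, the summary can be printed locally (or exposed as one more RPC method); a quick sketch:
# Sketch: dump the collected metrics after some requests have gone through
import json

print(json.dumps(service.get_metrics(), indent=2))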
Load Testing
# load_test.py
import asyncio
import time

import aiohttp


async def load_test_async(url: str, concurrent: int, total_requests: int):
    """Asynchronous load test"""

    async def single_request(session, request_id):
        try:
            start_time = time.time()
            async with session.post(
                f"{url}/rpc",
                json={
                    "method": "PING",
                    "data": '{"args": [], "kwargs": {}}',
                    "request_id": f"req_{request_id}"
                }
            ) as response:
                duration = time.time() - start_time
                status = response.status
                return {"success": status == 200, "duration": duration}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Create a connection pool
    connector = aiohttp.TCPConnector(limit=concurrent)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Send the requests in batches
        results = []
        for batch_start in range(0, total_requests, concurrent):
            batch_size = min(concurrent, total_requests - batch_start)
            tasks = [
                single_request(session, batch_start + i)
                for i in range(batch_size)
            ]
            batch_results = await asyncio.gather(*tasks)
            results.extend(batch_results)
            print(f"Completed {len(results)}/{total_requests} requests")

    # Summarize the results
    success_count = sum(1 for r in results if r["success"])
    durations = [r["duration"] for r in results if "duration" in r]
    print(f"Total requests: {total_requests}")
    print(f"Successful requests: {success_count}")
    print(f"Failed requests: {total_requests - success_count}")
    if durations:
        print(f"Average response time: {sum(durations) / len(durations):.3f}s")
        print(f"Max response time: {max(durations):.3f}s")
        print(f"Min response time: {min(durations):.3f}s")


if __name__ == '__main__':
    asyncio.run(load_test_async("http://localhost:8080", concurrent=50, total_requests=1000))
These practical examples show what Bricks' RPC features can do when building distributed systems, microservice architectures, real-time monitoring systems, and crawler services. From simple service calls to complex distributed architectures, all of it can be implemented on top of the RPC layer with little effort.