2.5.5 爬虫 API 化
将爬虫转换为 API 服务是 Bricks 框架的一个重要特色功能。通过简单的配置,就可以将任何爬虫转换为可远程调用的 RPC 服务,实现爬虫的服务化部署和调用。
核心概念
什么是爬虫 API 化
爬虫 API 化是指将传统的本地运行爬虫转换为可通过网络调用的 API 服务。这样做的好处包括:
- 服务化部署:爬虫作为独立服务运行,便于管理和扩展
- 远程调用:其他系统可以通过 API 调用爬虫功能
- 资源隔离:爬虫运行在独立环境中,不影响调用方
- 负载均衡:可以部署多个爬虫实例,实现负载分担
- 统一接口:提供标准化的调用接口
Rpc.wrap 方法
Bricks 提供了 Rpc.wrap 方法,可以轻松将任何爬虫类包装成 RPC 服务:
from bricks.spider.addon import Rpc
# 包装爬虫为RPC服务
rpc = Rpc.wrap(MySpider)
# 启动RPC服务
rpc.serve(concurrency=10, mode="http", ident=8080)
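包装完成后,调用方即可通过对应协议的客户端远程触发爬虫。下面是一段最小的 HTTP 客户端调用草图(假设服务已在本机 8080 端口运行,接口约定与返回字段的完整说明见后文“客户端调用”一节):
# 最小客户端调用草图(假设服务已监听 localhost:8080)
import json

from bricks.rpc.http_.service import Client

client = Client("localhost:8080")
# 第二个参数中的字典会作为 kwargs 传入爬虫的 make_seeds
response = client.rpc("execute", {"start_page": 1, "end_page": 2})
if response.code == 0:
    print("调用成功:", json.loads(response.data))
else:
    print("调用失败:", response.message)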
基础爬虫 API 化
1. 定义爬虫类
首先,我们需要一个标准的爬虫类:
# spider_api.py
from loguru import logger
from bricks import Request, const
from bricks.core import events, signals
from bricks.spider import air
from bricks.spider.air import Context
class ApiSpider(air.Spider):
"""API化的爬虫示例"""
def make_seeds(self, context: Context, **kwargs):
"""
生成种子数据
kwargs 参数来自RPC调用时传入的参数
"""
# 从RPC调用参数中获取页面范围
start_page = kwargs.get("start_page", 1)
end_page = kwargs.get("end_page", 5)
# 生成页面种子
seeds = []
for page in range(start_page, end_page + 1):
seeds.append({
"page": page,
"category": kwargs.get("category", "default")
})
logger.info(f"生成了 {len(seeds)} 个种子")
return seeds
def make_request(self, context: Context) -> Request:
"""构造请求"""
seeds = context.seeds
page = seeds["page"]
category = seeds.get("category", "default")
# 根据不同类别构造不同的请求
if category == "music":
return Request(
url="https://fx1.service.kugou.com/mfanxing-home/h5/cdn/room/index/list_v2",
params={"page": page, "cid": 6000},
headers={
'User-Agent': "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
}
)
else:
return Request(
url=f"https://httpbin.org/json?page={page}&category={category}",
headers={
'User-Agent': "Bricks Spider API 1.0"
}
)
def parse(self, context: Context):
"""解析响应"""
response = context.response
seeds = context.seeds
category = seeds.get("category", "default")
if category == "music":
# 解析音乐数据
return response.extract(
engine="json",
rules={
"data.list": {
"userId": "userId",
"roomId": "roomId",
"score": "score",
"startTime": "startTime",
"status": "status",
}
}
)
else:
# 解析通用JSON数据
return response.extract(
engine="json",
rules={
"args": {
"page": "page",
"category": "category"
},
"headers": {
"user_agent": "User-Agent"
}
}
)
def item_pipeline(self, context: Context):
"""处理数据"""
items = context.items
logger.info(f"处理了 {len(items)} 条数据: {items}")
# 标记种子处理完成
context.success()
@staticmethod
@events.on(const.AFTER_REQUEST)
def validate_response(context: Context):
"""验证响应是否成功"""
response = context.response
# 检查HTTP状态码
if response.status_code >= 400:
logger.warning(f"HTTP错误: {response.status_code}")
raise signals.Retry
# 检查响应内容
if context.seeds.get("category") == "music":
try:
data = response.json()
if data.get("code") != 0:
logger.warning(f"业务错误: {data.get('message')}")
raise signals.Retry
except (ValueError, TypeError):  # 只捕获 JSON 解析相关异常,避免吞掉上面抛出的 signals.Retry
logger.warning("响应不是有效的JSON")
raise signals.Retry
2. 启动 API 服务
# api_server.py
from loguru import logger

from bricks.spider.addon import Rpc
from spider_api import ApiSpider
def on_crawl_finish(ctx: ApiSpider.Context, err: Exception):
"""爬虫完成回调"""
if err:
logger.error(f"爬虫执行出错: {err}")
else:
logger.info(f"爬虫执行完成,处理了种子: {ctx.seeds}")
if __name__ == '__main__':
# 包装爬虫为RPC服务
rpc = Rpc.wrap(ApiSpider)
# 添加完成回调
rpc.with_callback(on_crawl_finish)
# 启动HTTP RPC服务
rpc.serve(
concurrency=10, # 并发数
mode="http", # 协议模式
ident=8080 # 端口
)
3. 客户端调用
# api_client.py
from bricks.rpc.http_.service import Client
import json
def test_spider_api():
"""测试爬虫API"""
client = Client("localhost:8080")
# 调用爬虫执行方法
response = client.rpc("execute", {
"start_page": 1,
"end_page": 3,
"category": "music"
})
if response.code == 0:
result = json.loads(response.data)
print(f"爬虫执行成功: {result}")
else:
print(f"爬虫执行失败: {response.message}")
def test_custom_category():
"""测试自定义分类"""
client = Client("localhost:8080")
response = client.rpc("execute", {
"start_page": 1,
"end_page": 2,
"category": "test",
"custom_param": "value"
})
print(f"响应: {response.data}")
if __name__ == '__main__':
test_spider_api()
test_custom_category()
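如果调用方不是 Python 程序,也可以直接按 HTTP 协议访问服务。下面是一个基于 requests 的草图:其中 /rpc 路径以及 method、data、request_id 字段沿用了后文异步客户端所使用的请求格式,属于假设性示例,实际以所用 Bricks 版本的 RPC 协议为准:
# 直接构造 HTTP 请求的草图(端点路径与字段为假设,参照后文异步客户端的格式)
import json

import requests

payload = {
    "method": "execute",
    "data": json.dumps({"args": [], "kwargs": {"start_page": 1, "end_page": 2}}),
    "request_id": "req_demo_001",
}
resp = requests.post("http://localhost:8080/rpc", json=payload, timeout=30)
print(resp.json())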
高级 API 化功能
1. 动态参数配置
from loguru import logger

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc
from bricks.spider.air import Context

class ConfigurableSpider(air.Spider):
"""支持动态配置的爬虫"""
def make_seeds(self, context: Context, **kwargs):
"""根据配置生成种子"""
# 获取URL列表
urls = kwargs.get("urls", [])
if urls:
return [{"url": url} for url in urls]
# 或者根据其他参数生成
base_url = kwargs.get("base_url", "https://httpbin.org")
pages = kwargs.get("pages", [1, 2, 3])
return [{"url": f"{base_url}/json?page={page}"} for page in pages]
def make_request(self, context: Context) -> Request:
"""构造请求"""
seeds = context.seeds
url = seeds["url"]
# 从种子中获取请求配置
headers = seeds.get("headers", {})
params = seeds.get("params", {})
proxy = seeds.get("proxy")
return Request(
url=url,
headers=headers,
params=params,
proxy=proxy,
timeout=seeds.get("timeout", 30)
)
def parse(self, context: Context):
"""解析响应"""
response = context.response
# 根据URL判断解析方式
if "json" in response.url:
return response.json()
elif "html" in response.url:
return {
"title": response.xpath("//title/text()").get(""),
"content_length": len(response.text)
}
else:
return {"content": response.text[:100]}
def item_pipeline(self, context: Context):
"""处理数据"""
items = context.items
logger.info(f"处理数据: {items}")
context.success()
# 启动可配置爬虫API
if __name__ == '__main__':
rpc = Rpc.wrap(ConfigurableSpider)
rpc.serve(concurrency=10, mode="http", ident=8081)
2. 批量 URL 处理
# batch_client.py
import json

from bricks.rpc.http_.service import Client
def batch_crawl_urls():
"""批量爬取URL"""
client = Client("localhost:8081")
# 批量URL配置
config = {
"urls": [
"https://httpbin.org/json",
"https://httpbin.org/html",
"https://httpbin.org/xml"
],
"headers": {
"User-Agent": "Batch Crawler 1.0"
},
"timeout": 30
}
response = client.rpc("execute", config)
if response.code == 0:
print("批量爬取成功")
result = json.loads(response.data)
print(f"结果: {result}")
else:
print(f"批量爬取失败: {response.message}")
if __name__ == '__main__':
batch_crawl_urls()
3. 异步 API 调用
# async_client.py
import asyncio
import aiohttp
import json
class AsyncSpiderClient:
"""异步爬虫API客户端"""
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
async def execute_spider(self, session, config):
"""异步执行爬虫"""
request_data = {
"method": "execute",
"data": json.dumps({"args": [], "kwargs": config}),
"request_id": f"req_{id(config)}"
}
async with session.post(
f"{self.base_url}/rpc",
json=request_data
) as response:
result = await response.json()
return result
async def batch_execute(self, configs):
"""批量异步执行"""
async with aiohttp.ClientSession() as session:
tasks = [
self.execute_spider(session, config)
for config in configs
]
results = await asyncio.gather(*tasks)
return results
async def test_async_batch():
"""测试异步批量调用"""
client = AsyncSpiderClient("http://localhost:8081")
# 多个配置
configs = [
{"urls": ["https://httpbin.org/json"], "category": "json"},
{"urls": ["https://httpbin.org/html"], "category": "html"},
{"urls": ["https://httpbin.org/xml"], "category": "xml"}
]
results = await client.batch_execute(configs)
for i, result in enumerate(results):
print(f"任务 {i+1} 结果: {result}")
if __name__ == '__main__':
asyncio.run(test_async_batch())
多协议 API 服务
1. 同时支持多种协议
# multi_protocol_server.py
import asyncio
from bricks.rpc.common import start_rpc_server
from spider_api import ApiSpider
async def start_multi_protocol():
"""启动多协议API服务"""
spider_class = ApiSpider
# 同时启动多种协议服务
tasks = [
start_rpc_server(spider_class, mode="http", ident=8080),
start_rpc_server(spider_class, mode="websocket", ident=8081),
start_rpc_server(spider_class, mode="grpc", ident=50051),
]
print("启动多协议爬虫API服务...")
print("HTTP: http://localhost:8080")
print("WebSocket: ws://localhost:8081")
print("gRPC: localhost:50051")
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(start_multi_protocol())
2. 协议特定的客户端
# protocol_clients.py
import json
def test_http_api():
"""测试HTTP API"""
from bricks.rpc.http_.service import Client
client = Client("localhost:8080")
response = client.rpc("execute", {"start_page": 1, "end_page": 2})
print(f"HTTP响应: {response.data}")
def test_websocket_api():
"""测试WebSocket API"""
from bricks.rpc.websocket_.service import Client
client = Client("localhost:8081")
try:
response = client.rpc("execute", {"start_page": 1, "end_page": 2})
print(f"WebSocket响应: {response.data}")
finally:
client.close()
def test_grpc_api():
"""测试gRPC API"""
from bricks.rpc.grpc_.service import Client
client = Client("localhost:50051")
response = client.rpc("execute", {"start_page": 1, "end_page": 2})
print(f"gRPC响应: {response.data}")
if __name__ == '__main__':
print("测试不同协议的爬虫API...")
test_http_api()
test_websocket_api()
test_grpc_api()
生产环境部署
1. Docker 化部署
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install -r requirements.txt
# 复制代码
COPY . .
# 暴露端口
EXPOSE 8080
# 启动命令
CMD ["python", "api_server.py"]
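镜像的构建与运行使用常规 Docker 命令即可,例如先用 docker build -t spider-api . 构建镜像,再用 docker run -p 8080:8080 spider-api 启动容器并映射端口(镜像名与端口仅为示例,可按实际环境调整)。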
2. 配置文件管理
# config.py
import os
from dataclasses import dataclass
@dataclass
class SpiderConfig:
"""爬虫配置"""
concurrency: int = int(os.getenv("SPIDER_CONCURRENCY", "10"))
mode: str = os.getenv("SPIDER_MODE", "http")
port: int = int(os.getenv("SPIDER_PORT", "8080"))
proxy: str = os.getenv("SPIDER_PROXY", "")
timeout: int = int(os.getenv("SPIDER_TIMEOUT", "30"))
max_retry: int = int(os.getenv("SPIDER_MAX_RETRY", "3"))
# production_server.py
from bricks.spider.addon import Rpc
from spider_api import ApiSpider
from config import SpiderConfig
def create_production_spider():
"""创建生产环境爬虫"""
config = SpiderConfig()
# 配置爬虫
spider_class = type('ProductionSpider', (ApiSpider,), {
'proxy': config.proxy,
'timeout': config.timeout,
'max_retry': config.max_retry
})
return spider_class
if __name__ == '__main__':
config = SpiderConfig()
spider_class = create_production_spider()
rpc = Rpc.wrap(spider_class)
rpc.serve(
concurrency=config.concurrency,
mode=config.mode,
ident=config.port
)
3. 健康检查和监控
# health_check.py
from bricks.rpc.http_.service import Client
import time
import json
def health_check(api_url: str):
"""健康检查"""
try:
client = Client(api_url)
# 发送PING请求
response = client.rpc("PING")
if response.code == 0 and response.data == "PONG":
return {"status": "healthy", "timestamp": time.time()}
else:
return {"status": "unhealthy", "error": response.message}
except Exception as e:
return {"status": "error", "error": str(e)}
def monitor_spider_api():
"""监控爬虫API"""
api_url = "localhost:8080"
while True:
health = health_check(api_url)
print(f"健康检查: {json.dumps(health, indent=2)}")
if health["status"] != "healthy":
# 发送告警
print("⚠️ 爬虫API异常!")
time.sleep(30) # 30秒检查一次
if __name__ == '__main__':
monitor_spider_api()
API 参数传递机制
1. 参数传递流程
当通过 RPC 调用爬虫时,参数传递流程如下:
RPC调用参数 → make_seeds(**kwargs) → 种子数据 → 爬虫执行流程
2. 参数类型和用法
from bricks import Request
from bricks.spider import air
from bricks.spider.air import Context

class ParameterSpider(air.Spider):
"""演示参数传递的爬虫"""
def make_seeds(self, context: Context, **kwargs):
"""
kwargs 包含所有RPC调用时传入的参数
"""
# 基础参数
urls = kwargs.get("urls", [])
pages = kwargs.get("pages", [1])
# 配置参数
proxy = kwargs.get("proxy")
timeout = kwargs.get("timeout", 30)
headers = kwargs.get("headers", {})
# 业务参数
category = kwargs.get("category", "default")
filters = kwargs.get("filters", {})
# 生成种子时将参数传递下去
seeds = []
for page in pages:
seed = {
"page": page,
"category": category,
"proxy": proxy,
"timeout": timeout,
"headers": headers,
"filters": filters
}
seeds.append(seed)
return seeds
def make_request(self, context: Context) -> Request:
"""使用种子中的参数构造请求"""
seeds = context.seeds
return Request(
url=f"https://api.example.com/data?page={seeds['page']}&category={seeds['category']}",
headers=seeds.get("headers", {}),
proxy=seeds.get("proxy"),
timeout=seeds.get("timeout", 30)
)
# 客户端调用示例
from bricks.rpc.http_.service import Client

client = Client("localhost:8080")
response = client.rpc("execute", {
"pages": [1, 2, 3],
"category": "electronics",
"proxy": "http://proxy.example.com:8080",
"timeout": 60,
"headers": {
"User-Agent": "Custom Spider 1.0",
"Authorization": "Bearer token123"
},
"filters": {
"min_price": 100,
"max_price": 1000
}
})
错误处理和重试机制
1. API 级别的错误处理
import json
import time

from loguru import logger

from bricks import Request, const
from bricks.core import events, signals
from bricks.spider import air
from bricks.spider.addon import Rpc
from bricks.spider.air import Context

class RobustSpider(air.Spider):
"""具有健壮错误处理的爬虫"""
def make_seeds(self, context: Context, **kwargs):
"""验证参数并生成种子"""
# 参数验证
required_params = kwargs.get("required_params", [])
for param in required_params:
if param not in kwargs:
raise ValueError(f"缺少必需参数: {param}")
urls = kwargs.get("urls", [])
if not urls:
raise ValueError("urls参数不能为空")
return [{"url": url, "retry_count": 0} for url in urls]
def make_request(self, context: Context) -> Request:
"""构造请求"""
seeds = context.seeds
return Request(
url=seeds["url"],
retry=seeds.get("retry_count", 0),
max_retry=3
)
def parse(self, context: Context):
"""解析响应"""
response = context.response
# 检查响应状态
if response.status_code >= 400:
raise Exception(f"HTTP错误: {response.status_code}")
try:
data = response.json()
if not data:
raise Exception("响应数据为空")
return data
except json.JSONDecodeError:
raise Exception("响应不是有效的JSON格式")
def item_pipeline(self, context: Context):
"""处理数据"""
items = context.items
# 数据验证
if not items:
logger.warning("没有提取到数据")
logger.info(f"成功处理 {len(items)} 条数据")
context.success()
@staticmethod
@events.on(const.AFTER_REQUEST)
def handle_errors(context: Context):
"""统一错误处理"""
response = context.response
# 根据状态码决定是否重试
if response.status_code == 429: # 限流
logger.warning("遇到限流,等待后重试")
time.sleep(5)
raise signals.Retry
elif response.status_code >= 500: # 服务器错误
logger.warning("服务器错误,重试")
raise signals.Retry
elif response.status_code == 403: # 禁止访问
logger.error("访问被禁止,跳过")
raise signals.Drop
# 带错误处理的API服务
def create_robust_api():
"""创建健壮的API服务"""
def error_callback(ctx: RobustSpider.Context, err: Exception):
"""错误回调"""
if err:
logger.error(f"爬虫执行失败: {err}")
# 可以在这里发送告警、记录日志等
else:
logger.info("爬虫执行成功")
rpc = Rpc.wrap(RobustSpider)
rpc.with_callback(error_callback)
return rpc
if __name__ == '__main__':
api = create_robust_api()
api.serve(concurrency=10, mode="http", ident=8080)
2. 客户端错误处理
# robust_client.py
import json
import time

from loguru import logger

from bricks.rpc.http_.service import Client
class RobustApiClient:
"""健壮的API客户端"""
def __init__(self, api_url: str, max_retries: int = 3):
self.client = Client(api_url)
self.max_retries = max_retries
def execute_with_retry(self, config: dict):
"""带重试的执行"""
last_error = None
for attempt in range(self.max_retries + 1):
try:
response = self.client.rpc("execute", config)
if response.code == 0:
return json.loads(response.data)
else:
last_error = response.message
logger.warning(f"尝试 {attempt + 1} 失败: {last_error}")
except Exception as e:
last_error = str(e)
logger.warning(f"尝试 {attempt + 1} 异常: {last_error}")
# 如果不是最后一次尝试,等待后重试
if attempt < self.max_retries:
wait_time = 2 ** attempt # 指数退避
logger.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
raise Exception(f"执行失败,已重试 {self.max_retries} 次: {last_error}")
def validate_config(self, config: dict):
"""验证配置"""
required_fields = ["urls"]
for field in required_fields:
if field not in config:
raise ValueError(f"配置缺少必需字段: {field}")
if not isinstance(config["urls"], list):
raise ValueError("urls必须是列表类型")
if not config["urls"]:
raise ValueError("urls不能为空")
# 使用示例
def safe_api_call():
"""安全的API调用"""
client = RobustApiClient("localhost:8080", max_retries=3)
config = {
"urls": ["https://httpbin.org/json", "https://httpbin.org/html"],
"timeout": 30,
"headers": {"User-Agent": "Robust Client 1.0"}
}
try:
# 验证配置
client.validate_config(config)
# 执行爬虫
result = client.execute_with_retry(config)
print(f"执行成功: {result}")
except Exception as e:
print(f"执行失败: {e}")
if __name__ == '__main__':
safe_api_call()
性能优化和监控
1. 性能监控
# performance_monitor.py
import time
from collections import defaultdict

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc
class MonitoredSpider(air.Spider):
"""带性能监控的爬虫"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = defaultdict(list)
self.start_time = None
def make_seeds(self, context, **kwargs):
"""记录开始时间"""
self.start_time = time.time()
urls = kwargs.get("urls", ["https://httpbin.org/json"])
return [{"url": url} for url in urls]
def make_request(self, context):
"""记录请求指标"""
seeds = context.seeds
request_start = time.time()
seeds["request_start"] = request_start
return Request(url=seeds["url"])
def parse(self, context):
"""记录解析指标"""
response = context.response
seeds = context.seeds
# 计算请求耗时
request_duration = time.time() - seeds["request_start"]
self.metrics["request_duration"].append(request_duration)
# 记录响应大小
response_size = len(response.content)
self.metrics["response_size"].append(response_size)
return response.json()
def item_pipeline(self, context):
"""记录处理指标"""
items = context.items
# 记录数据量
self.metrics["items_count"].append(len(items))
# 计算总耗时
if self.start_time:
total_duration = time.time() - self.start_time
self.metrics["total_duration"].append(total_duration)
context.success()
def get_metrics(self):
"""获取性能指标"""
if not self.metrics:
return {"message": "暂无指标数据"}
summary = {}
for metric_name, values in self.metrics.items():
if values:
summary[metric_name] = {
"count": len(values),
"avg": sum(values) / len(values),
"min": min(values),
"max": max(values),
"total": sum(values)
}
return summary
# 创建监控API
class MonitoringApi:
"""监控API包装器"""
def __init__(self, spider_class):
self.spider_instance = None
self.spider_class = spider_class
def execute(self, **kwargs):
"""执行爬虫并返回结果"""
self.spider_instance = self.spider_class()
# 执行爬虫
context = self.spider_instance.Context(
target=self.spider_instance,
seeds={}
)
# 这里简化了执行流程,实际应该使用完整的爬虫运行机制
seeds = self.spider_instance.make_seeds(context, **kwargs)
results = []
for seed in seeds:
context.seeds = seed
request = self.spider_instance.make_request(context)
# 模拟请求执行...
return {"status": "completed", "results": results}
def get_metrics(self):
"""获取性能指标"""
if self.spider_instance:
return self.spider_instance.get_metrics()
return {"message": "爬虫尚未执行"}
# 启动监控API
if __name__ == '__main__':
api = MonitoringApi(MonitoredSpider)
rpc = Rpc.wrap(api)
rpc.serve(concurrency=10, mode="http", ident=8080)
2. 负载均衡和扩展
# load_balancer.py
import random
from bricks.rpc.http_.service import Client
class SpiderLoadBalancer:
"""爬虫负载均衡器"""
def __init__(self, spider_nodes: list):
self.nodes = spider_nodes
self.clients = {node: Client(node) for node in spider_nodes}
self.node_stats = {node: {"requests": 0, "errors": 0} for node in spider_nodes}
def select_node(self, strategy="round_robin"):
"""选择节点"""
if strategy == "random":
return random.choice(self.nodes)
elif strategy == "least_requests":
return min(self.nodes, key=lambda n: self.node_stats[n]["requests"])
else: # round_robin
# 简单的轮询实现
if not hasattr(self, "_current_index"):
self._current_index = 0
node = self.nodes[self._current_index]
self._current_index = (self._current_index + 1) % len(self.nodes)
return node
def execute_spider(self, config: dict, strategy="round_robin"):
"""执行爬虫任务"""
node = self.select_node(strategy)
client = self.clients[node]
try:
self.node_stats[node]["requests"] += 1
response = client.rpc("execute", config)
if response.code == 0:
return {
"success": True,
"data": response.data,
"node": node
}
else:
self.node_stats[node]["errors"] += 1
return {
"success": False,
"error": response.message,
"node": node
}
except Exception as e:
self.node_stats[node]["errors"] += 1
return {
"success": False,
"error": str(e),
"node": node
}
def get_stats(self):
"""获取节点统计"""
return self.node_stats
# 使用负载均衡器
def test_load_balancer():
"""测试负载均衡"""
# 假设有3个爬虫节点
nodes = [
"localhost:8080",
"localhost:8081",
"localhost:8082"
]
balancer = SpiderLoadBalancer(nodes)
# 执行多个任务
for i in range(10):
config = {
"urls": [f"https://httpbin.org/json?id={i}"],
"timeout": 30
}
result = balancer.execute_spider(config, strategy="least_requests")
print(f"任务 {i}: {result}")
# 查看统计
print("节点统计:", balancer.get_stats())
if __name__ == '__main__':
test_load_balancer()
最佳实践
1. 配置管理
# spider_config.py
from dataclasses import dataclass, field
from typing import Dict, List

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc
@dataclass
class SpiderApiConfig:
"""爬虫API配置"""
# 服务配置
host: str = "0.0.0.0"
port: int = 8080
mode: str = "http"
concurrency: int = 10
# 爬虫配置
default_timeout: int = 30
max_retry: int = 3
default_headers: Dict[str, str] = field(default_factory=dict)
# 限制配置
max_urls_per_request: int = 100
max_pages_per_request: int = 50
# 安全配置
allowed_domains: List[str] = field(default_factory=list)
blocked_domains: List[str] = field(default_factory=list)
@classmethod
def from_env(cls):
"""从环境变量创建配置"""
import os
return cls(
host=os.getenv("SPIDER_HOST", "0.0.0.0"),
port=int(os.getenv("SPIDER_PORT", "8080")),
mode=os.getenv("SPIDER_MODE", "http"),
concurrency=int(os.getenv("SPIDER_CONCURRENCY", "10")),
default_timeout=int(os.getenv("SPIDER_TIMEOUT", "30")),
max_retry=int(os.getenv("SPIDER_MAX_RETRY", "3"))
)
class ConfigurableApiSpider(air.Spider):
"""可配置的API爬虫"""
def __init__(self, config: SpiderApiConfig = None):
super().__init__()
self.config = config or SpiderApiConfig()
def make_seeds(self, context, **kwargs):
"""验证并生成种子"""
urls = kwargs.get("urls", [])
# 验证URL数量限制
if len(urls) > self.config.max_urls_per_request:
raise ValueError(f"URL数量超过限制: {len(urls)} > {self.config.max_urls_per_request}")
# 验证域名白名单
if self.config.allowed_domains:
for url in urls:
domain = self._extract_domain(url)
if domain not in self.config.allowed_domains:
raise ValueError(f"域名不在允许列表中: {domain}")
# 验证域名黑名单
for url in urls:
domain = self._extract_domain(url)
if domain in self.config.blocked_domains:
raise ValueError(f"域名在禁止列表中: {domain}")
return [{"url": url} for url in urls]
def _extract_domain(self, url: str) -> str:
"""提取域名"""
from urllib.parse import urlparse
return urlparse(url).netloc
def make_request(self, context):
"""使用配置构造请求"""
seeds = context.seeds
# 合并默认headers
headers = {**self.config.default_headers}
headers.update(seeds.get("headers", {}))
return Request(
url=seeds["url"],
headers=headers,
timeout=seeds.get("timeout", self.config.default_timeout),
max_retry=self.config.max_retry
)
# 生产环境启动脚本
def start_production_api():
"""启动生产环境API"""
config = SpiderApiConfig.from_env()
spider = ConfigurableApiSpider(config)
rpc = Rpc.wrap(spider)
rpc.serve(
concurrency=config.concurrency,
mode=config.mode,
ident=config.port
)
if __name__ == '__main__':
start_production_api()
2. 日志和审计
# audit_spider.py
from datetime import datetime

from bricks import Request
from bricks.spider import air
from bricks.spider.addon import Rpc
class AuditSpider(air.Spider):
"""带审计功能的爬虫"""
def __init__(self):
super().__init__()
self.audit_log = []
def make_seeds(self, context, **kwargs):
"""记录请求审计"""
audit_entry = {
"timestamp": datetime.now().isoformat(),
"action": "request_received",
"params": kwargs,
"client_ip": kwargs.get("client_ip", "unknown")
}
self.audit_log.append(audit_entry)
urls = kwargs.get("urls", [])
return [{"url": url} for url in urls]
def make_request(self, context):
"""记录请求审计"""
seeds = context.seeds
url = seeds["url"]
audit_entry = {
"timestamp": datetime.now().isoformat(),
"action": "request_sent",
"url": url
}
self.audit_log.append(audit_entry)
return Request(url=url)
def parse(self, context):
"""记录解析审计"""
response = context.response
audit_entry = {
"timestamp": datetime.now().isoformat(),
"action": "response_received",
"url": response.url,
"status_code": response.status_code,
"content_length": len(response.content)
}
self.audit_log.append(audit_entry)
return response.json()
def item_pipeline(self, context):
"""记录处理审计"""
items = context.items
audit_entry = {
"timestamp": datetime.now().isoformat(),
"action": "data_processed",
"items_count": len(items)
}
self.audit_log.append(audit_entry)
context.success()
def get_audit_log(self):
"""获取审计日志"""
return self.audit_log
def clear_audit_log(self):
"""清空审计日志"""
self.audit_log.clear()
# 审计API包装器
class AuditApiWrapper:
"""审计API包装器"""
def __init__(self):
self.spider_instance = None
def execute(self, **kwargs):
"""执行爬虫"""
self.spider_instance = AuditSpider()
# 执行爬虫逻辑...
return {"status": "completed"}
def get_audit_log(self):
"""获取审计日志"""
if self.spider_instance:
return self.spider_instance.get_audit_log()
return []
def clear_audit_log(self):
"""清空审计日志"""
if self.spider_instance:
self.spider_instance.clear_audit_log()
return {"status": "cleared"}
if __name__ == '__main__':
wrapper = AuditApiWrapper()
rpc = Rpc.wrap(wrapper)
rpc.serve(concurrency=10, mode="http", ident=8080)
通过这些功能和最佳实践,可以构建出生产级别的爬虫 API 服务,具备完善的错误处理、性能监控、负载均衡和审计功能。