1. 事件回调开发
事件回调函数实际上就是一个普通的函数 / 实例方法 / 类方法 / 静态方法,因此回调函数的开发即开发普通的函数,如以下为内置过滤无效状态码等简单的事件函数源码。
from bricks.core import signals
from bricks.core.context import Context
def is_success():
"""
通用的判断响应是否成功
:return:
"""
context: Context = Context.get_context()
response = context.obtain("response")
if not response.ok:
raise signals.Retry
从以上源码我们需要了解的内容是:
- 我们主要操作的对象就是
context
,Context
是爬虫请求过程中的上下文对象,通过该对象,我们可以获取到他的请求/响应等数据 - 只有运行到一定的流程,
Context
的属性才会被赋值,例如,只有在完成请求后,才会有response
属性 - 针对
Context
对象进行相应操作之后,如果要改变运行流程,则需要raise
出signals
信号;比较常用的信号如下signals.Retry
: 重试信号,会中断当前流程并重试请求signals.Success
: 成功信号,会中断当前流程并删除种子signals.Failure
: 失败信号,会中断当前流程并将种子暂时放入failure
队列
- 大体的时间,返回的信号导致的结果是类似的,但是部分环节的事件可能不会处理某些信号,例如初始化的时间和爬虫的事件其实是有细微差别的
2. 事件回调注册
内置接口注册
事件函数一般在 install
接口进行注册,该接口在实例生成的时候会自动调用一次,我们可以看看内置的 air.Spider
的 install
方法,就会默认注册一些事件。
def install(self):
super().install()
self.use(
state.const.BEFORE_REQUEST,
{"func": on_request.Before.fake_ua},
{"func": on_request.Before.set_proxy, "index": math.inf},
)
self.use(
state.const.AFTER_REQUEST,
{"func": on_request.After.show_response},
{"func": on_request.After.conditional_scripts},
{"func": on_request.After.is_success}
)
注册时间使用的是 use
方法
装饰器注册
除了可以使用内置接口注册,还可以使用装饰器注册,方法如下
from bricks import const
from bricks.core import signals, events
@events.on(const.BEFORE_PIPELINE)
def test():
context: Context = Context.get_context()
print(context)
类下的方法同样是支持的
装饰器参数说明
@events.on
装饰器支持以下参数:
参数名 | 类型 | 描述 | 默认值 |
---|---|---|---|
form | str | 事件类型,可以传入 const 属性 | 必传 |
index | int | 执行顺序索引,索引越小越先执行 | None |
disposable | bool | 是否为一次性事件(仅运行一次) | False |
args | list | 传递给处理函数的额外位置参数 | None |
kwargs | dict | 传递给处理函数的额外关键字参数 | None |
match | Callable/str | 条件匹配函数或表达式 | None |
自动识别注册方式
新版本的事件系统会根据函数的 __qualname__
属性自动识别注册方式:
- 类方法/实例方法:如果函数名包含
.
,则注册为实例事件 - 全局函数:如果函数名中没有
.
,则设置__event__
属性,等待后续注册
# 全局函数 - 立即注册为全局事件
@events.on(const.BEFORE_REQUEST)
def global_handler(context):
"""全局事件处理器"""
print("这是全局事件")
# 类方法 - 注册为实例事件
class MySpider:
@events.on(const.BEFORE_REQUEST)
def instance_handler(self, context):
"""实例事件处理器"""
print(f"这是 {self.__class__.__name__} 的实例事件")
高级装饰器用法
# 设置执行顺序
@events.on(const.BEFORE_REQUEST, index=1)
def first_handler(context):
print("第一个执行")
@events.on(const.BEFORE_REQUEST, index=2)
def second_handler(context):
print("第二个执行")
# 一次性事件
@events.on(const.AFTER_REQUEST, disposable=True)
def one_time_handler(context):
print("只执行一次")
# 条件匹配
def is_success_response(context):
return context.response.status_code == 200
@events.on(const.AFTER_REQUEST, match=is_success_response)
def success_handler(context):
print("只在响应成功时执行")
# 字符串表达式匹配
@events.on(const.AFTER_REQUEST, match="context.response.status_code >= 400")
def error_handler(context):
print("只在响应错误时执行")
# 传递额外参数
@events.on(
const.AFTER_REQUEST,
args=["额外参数"],
kwargs={"extra_param": "额外关键字参数"}
)
def handler_with_params(context, extra_arg, extra_param=None):
print(f"额外参数: {extra_arg}, {extra_param}")
3. 事件管理
事件注册对象
每个注册的事件都会返回一个 Register
对象,提供事件管理功能:
from bricks.core.events import EventManager, Task
from bricks.core.context import Context
from bricks import const
class MySpider:
def __init__(self):
self.event_registers = []
def register_custom_events(self):
"""手动注册事件"""
# 创建事件任务
task = Task(
func=self.my_handler,
index=1,
disposable=False
)
# 注册事件
registers = EventManager.register(
Context(form=const.BEFORE_REQUEST, target=self),
task
)
self.event_registers.extend(registers)
def my_handler(self, context):
print("处理事件")
事件取消和管理
class MySpider:
def __init__(self):
self.event_registers = []
def unregister_event(self, register_index=0):
"""取消注册指定事件"""
if self.event_registers:
register = self.event_registers[register_index]
register.unregister()
def reorder_event(self, register_index=0, new_index=10):
"""重新排序事件"""
if self.event_registers:
register = self.event_registers[register_index]
register.reindex(new_index)
def move_event_to_top(self, register_index=0):
"""将事件移动到最前面"""
if self.event_registers:
register = self.event_registers[register_index]
register.move2top()
def move_event_to_tail(self, register_index=0):
"""将事件移动到最后面"""
if self.event_registers:
register = self.event_registers[register_index]
register.move2tail()
传统方式取消事件
如果需要取消通过 use
方法注册的事件,可以使用以下方法:
from bricks import const
from bricks.spider import air
class MySpider(air.Spider):
pass
if __name__ == '__main__':
spider = MySpider()
# 遍历已注册的插件/事件
for plugin in spider.plugins:
if plugin.form == const.BEFORE_PIPELINE and plugin.task.func == spider.is_success:
plugin.unregister()
事件容器管理
新版本的事件系统使用两种容器来管理事件:
from bricks.core.events import REGISTERED_EVENTS
# 查看已注册的持久事件
persistent_events = REGISTERED_EVENTS.permanent
# 查看已注册的一次性事件
disposable_events = REGISTERED_EVENTS.disposable
# 查看所有注册记录
all_registers = REGISTERED_EVENTS.registered
# 线程安全的事件操作
with REGISTERED_EVENTS:
# 在这里进行事件操作
pass