bricks
开发指南
2.1 事件
2.1.3 事件开发

1. 事件回调开发

事件回调函数实际上就是一个普通的函数 / 实例方法 / 类方法 / 静态方法,因此回调函数的开发即开发普通的函数,如以下为内置过滤无效状态码等简单的事件函数源码。

 
from bricks.core import signals
from bricks.core.context import Context
 
def is_success():
    """
    通用的判断响应是否成功
 
    :return:
    """
    context: Context = Context.get_context()
    response = context.obtain("response")
    if not response.ok:
        raise signals.Retry
 
 

从以上源码我们需要了解的内容是:

  1. 我们主要操作的对象就是 contextContext 是爬虫请求过程中的上下文对象,通过该对象,我们可以获取到他的请求/响应等数据
  2. 只有运行到一定的流程,Context 的属性才会被赋值,例如,只有在完成请求后,才会有 response 属性
  3. 针对 Context 对象进行相应操作之后,如果要改变运行流程,则需要 raisesignals 信号;比较常用的信号如下
    1. signals.Retry: 重试信号,会中断当前流程并重试请求
    2. signals.Success: 成功信号,会中断当前流程并删除种子
    3. signals.Failure: 失败信号,会中断当前流程并将种子暂时放入 failure 队列
  4. 大体的时间,返回的信号导致的结果是类似的,但是部分环节的事件可能不会处理某些信号,例如初始化的时间和爬虫的事件其实是有细微差别的

2. 事件回调注册

内置接口注册

事件函数一般在 install 接口进行注册,该接口在实例生成的时候会自动调用一次,我们可以看看内置的 air.Spiderinstall 方法,就会默认注册一些事件。

 
def install(self):
    super().install()
    self.use(
        state.const.BEFORE_REQUEST,
        {"func": on_request.Before.fake_ua},
        {"func": on_request.Before.set_proxy, "index": math.inf},
    )
    self.use(
        state.const.AFTER_REQUEST,
        {"func": on_request.After.show_response},
        {"func": on_request.After.conditional_scripts},
        {"func": on_request.After.is_success}
    )
 
 

注册时间使用的是 use 方法

装饰器注册

除了可以使用内置接口注册,还可以使用装饰器注册,方法如下

from bricks import const
from bricks.core import signals, events
 
 
@events.on(const.BEFORE_PIPELINE)
def test():
    context: Context = Context.get_context()
    print(context)

类下的方法同样是支持的

装饰器参数说明

@events.on 装饰器支持以下参数:

参数名类型描述默认值
formstr事件类型,可以传入 const 属性必传
indexint执行顺序索引,索引越小越先执行None
disposablebool是否为一次性事件(仅运行一次)False
argslist传递给处理函数的额外位置参数None
kwargsdict传递给处理函数的额外关键字参数None
matchCallable/str条件匹配函数或表达式None

自动识别注册方式

新版本的事件系统会根据函数的 __qualname__ 属性自动识别注册方式:

  • 类方法/实例方法:如果函数名包含 .,则注册为实例事件
  • 全局函数:如果函数名中没有 .,则设置 __event__ 属性,等待后续注册
# 全局函数 - 立即注册为全局事件
@events.on(const.BEFORE_REQUEST)
def global_handler(context):
    """全局事件处理器"""
    print("这是全局事件")
 
# 类方法 - 注册为实例事件
class MySpider:
    @events.on(const.BEFORE_REQUEST)
    def instance_handler(self, context):
        """实例事件处理器"""
        print(f"这是 {self.__class__.__name__} 的实例事件")

高级装饰器用法

# 设置执行顺序
@events.on(const.BEFORE_REQUEST, index=1)
def first_handler(context):
    print("第一个执行")
 
@events.on(const.BEFORE_REQUEST, index=2)
def second_handler(context):
    print("第二个执行")
 
# 一次性事件
@events.on(const.AFTER_REQUEST, disposable=True)
def one_time_handler(context):
    print("只执行一次")
 
# 条件匹配
def is_success_response(context):
    return context.response.status_code == 200
 
@events.on(const.AFTER_REQUEST, match=is_success_response)
def success_handler(context):
    print("只在响应成功时执行")
 
# 字符串表达式匹配
@events.on(const.AFTER_REQUEST, match="context.response.status_code >= 400")
def error_handler(context):
    print("只在响应错误时执行")
 
# 传递额外参数
@events.on(
    const.AFTER_REQUEST,
    args=["额外参数"],
    kwargs={"extra_param": "额外关键字参数"}
)
def handler_with_params(context, extra_arg, extra_param=None):
    print(f"额外参数: {extra_arg}, {extra_param}")

3. 事件管理

事件注册对象

每个注册的事件都会返回一个 Register 对象,提供事件管理功能:

from bricks.core.events import EventManager, Task
from bricks.core.context import Context
from bricks import const
 
class MySpider:
    def __init__(self):
        self.event_registers = []
 
    def register_custom_events(self):
        """手动注册事件"""
        # 创建事件任务
        task = Task(
            func=self.my_handler,
            index=1,
            disposable=False
        )
 
        # 注册事件
        registers = EventManager.register(
            Context(form=const.BEFORE_REQUEST, target=self),
            task
        )
 
        self.event_registers.extend(registers)
 
    def my_handler(self, context):
        print("处理事件")

事件取消和管理

class MySpider:
    def __init__(self):
        self.event_registers = []
 
    def unregister_event(self, register_index=0):
        """取消注册指定事件"""
        if self.event_registers:
            register = self.event_registers[register_index]
            register.unregister()
 
    def reorder_event(self, register_index=0, new_index=10):
        """重新排序事件"""
        if self.event_registers:
            register = self.event_registers[register_index]
            register.reindex(new_index)
 
    def move_event_to_top(self, register_index=0):
        """将事件移动到最前面"""
        if self.event_registers:
            register = self.event_registers[register_index]
            register.move2top()
 
    def move_event_to_tail(self, register_index=0):
        """将事件移动到最后面"""
        if self.event_registers:
            register = self.event_registers[register_index]
            register.move2tail()

传统方式取消事件

如果需要取消通过 use 方法注册的事件,可以使用以下方法:

from bricks import const
from bricks.spider import air
 
class MySpider(air.Spider):
    pass
 
if __name__ == '__main__':
    spider = MySpider()
    # 遍历已注册的插件/事件
    for plugin in spider.plugins:
        if plugin.form == const.BEFORE_PIPELINE and plugin.task.func == spider.is_success:
            plugin.unregister()

事件容器管理

新版本的事件系统使用两种容器来管理事件:

from bricks.core.events import REGISTERED_EVENTS
 
# 查看已注册的持久事件
persistent_events = REGISTERED_EVENTS.permanent
 
# 查看已注册的一次性事件
disposable_events = REGISTERED_EVENTS.disposable
 
# 查看所有注册记录
all_registers = REGISTERED_EVENTS.registered
 
# 线程安全的事件操作
with REGISTERED_EVENTS:
    # 在这里进行事件操作
    pass