今天小编给大家分享一下Tortoise orm信号实现及使用场景是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
场景
在使用Tortoise操作数据库的时候发现,通过对操作数据库模型加以装饰器,如@pre_save(Model)
,可以实现对这个模型在savue
时,自动调用被装饰的方法,从而实现对模型的一些操作。
在此先从官方文档入手,看一下官方的对于模型信号的Example
# -*- coding: utf-8 -*-"""This example demonstrates model signals usage"""from typing import List, Optional, Typefrom tortoise import BaseDBAsyncClient, Tortoise, fields, run_asyncfrom tortoise.models import Modelfrom tortoise.signals import post_delete, post_save, pre_delete, pre_saveclass Signal(Model): id = fields.IntField(pk=True) name = fields.TextField() class Meta: table = "signal" def __str__(self): return self.name@pre_save(Signal)async def signal_pre_save( sender: "Type[Signal]", instance: Signal, using_db, update_fields) -> None: print('signal_pre_save', sender, instance, using_db, update_fields)@post_save(Signal)async def signal_post_save( sender: "Type[Signal]", instance: Signal, created: bool, using_db: "Optional[BaseDBAsyncClient]", update_fields: List[str],) -> None: print('post_save', sender, instance, using_db, created, update_fields)@pre_delete(Signal)async def signal_pre_delete( sender: "Type[Signal]", instance: Signal, using_db: "Optional[BaseDBAsyncClient]") -> None: print('pre_delete', sender, instance, using_db)@post_delete(Signal)async def signal_post_delete( sender: "Type[Signal]", instance: Signal, using_db: "Optional[BaseDBAsyncClient]") -> None: print('post_delete', sender, instance, using_db)async def run(): await Tortoise.init(db_url="sqlite://:memory:", modules={"models": ["__main__"]}) await Tortoise.generate_schemas() # pre_save,post_save will be send signal = await Signal.create(name="Signal") signal.name = "Signal_Save" # pre_save,post_save will be send await signal.save(update_fields=["name"]) # pre_delete,post_delete will be send await signal.delete()if __name__ == "__main__": run_async(run())
以上代码可直接复制后运行,运行后的结果:
signal_pre_save <class '__main__.Signal'> Signal <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> None
post_save <class '__main__.Signal'> Signal <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> True None
signal_pre_save <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> ['name']
post_save <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400> False ['name']
pre_delete <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400>
post_delete <class '__main__.Signal'> Signal_Save <tortoise.backends.sqlite.client.SqliteClient object at 0x7f8518319400>
可以发现,对模型进行保存和删除时候,都会调用对应的信号方法。
源码
从导包可以得知,tortoise的所有信号方法都在tortoise.signals
中。
from enum import Enumfrom typing import CallableSignals = Enum("Signals", ["pre_save", "post_save", "pre_delete", "post_delete"])def post_save(*senders) -> Callable: """ Register given models post_save signal. :param senders: Model class """ def decorator(f): for sender in senders: sender.register_listener(Signals.post_save, f) return f return decoratordef pre_save(*senders) -> Callable: ...def pre_delete(*senders) -> Callable: ...def post_delete(*senders) -> Callable: ...
其内部实现的四个信号方法分别是模型的保存后,保存前,删除前,删除后。
其内部装饰器代码也十分简单,就是对装饰器中的参数(也就是模型),注册一个监听者,而这个监听者,其实就是被装饰的方法。
如上面的官方示例中:
# 给模型Signal注册一个监听者,它是方法signal_pre_save@pre_save(Signal)async def signal_pre_save( sender: "Type[Signal]", instance: Signal, using_db, update_fields) -> None: print('signal_pre_save', sender, instance, using_db, update_fields)
而到了Model类中,自然就有一个register_listener方法,定睛一看,上面示例Signal中并没有register_listener方法,所以自然就想到了,这个方法必定在父类Model中。
class Model: ... @classmethod def register_listener(cls, signal: Signals, listener: Callable): ... if not callable(listener): raise ConfigurationError("Signal listener must be callable!") # 检测是否已经注册过 cls_listeners = cls._listeners.get(signal).setdefault(cls, []) # type:ignore if listener not in cls_listeners: # 注册监听者 cls_listeners.append(listener)
接下来注册后,这个listeners就会一直跟着这个Signal类。只需要在需要操作关键代码的地方,进行调用即可。
看看在模型save的时候,都干了什么?
async def save( self, using_db: Optional[BaseDBAsyncClient] = None, update_fields: Optional[Iterable[str]] = None, force_create: bool = False, force_update: bool = False, ) -> None: ... # 执行保存前的信号 await self._pre_save(db, update_fields) if force_create: await executor.execute_insert(self) created = True elif force_update: rows = await executor.execute_update(self, update_fields) if rows == 0: raise IntegrityError(f"Can't update object that doesn't exist. PK: {self.pk}") created = False else: if self._saved_in_db or update_fields: if self.pk is None: await executor.execute_insert(self) created = True else: await executor.execute_update(self, update_fields) created = False else: # TODO: Do a merge/upsert operation here instead. Let the executor determine an optimal strategy for each DB engine. await executor.execute_insert(self) created = True self._saved_in_db = True # 执行保存后的信号 await self._post_save(db, created, update_fields)
抛开其他代码,可以看到,在模型save的时候,其实是先执行保存前的信号,然后执行保存后的信号。
自己实现一个信号
有了以上的经验,可以自己实现一个信号,比如我打算做个数据处理器的类,我想在这个处理器工作中,监听处理前/后的信号。
# -*- coding: utf-8 -*-from enum import Enumfrom typing import Callable, Dict# 声明枚举信号量Signals = Enum("Signals", ["before_process", "after_process"])# 处理前的装饰器def before_process(*senders): def decorator(f): for sender in senders: sender.register_listener(Signals.before_process, f) return f return decorator# 处理后的装饰器def after_process(*senders): def decorator(f): for sender in senders: sender.register_listener(Signals.after_process, f) return f return decoratorclass Model(object): _listeners: Dict = { Signals.before_process: {}, Signals.after_process: {} } @classmethod def register_listener(cls, signal: Signals, listener: Callable): """注册监听者""" # 判断是否已经存在监听者 cls_listeners = cls._listeners.get(signal).setdefault(cls, []) if listener not in cls_listeners: # 如果不存在,则添加监听者 cls_listeners.append(listener) def _before_process(self): # 取出before_process监听者 cls_listeners = self._listeners.get(Signals.before_process, {}).get(self.__class__, []) for listener in cls_listeners: # 调用监听者 listener(self.__class__, self) def _after_process(self): # 取出after_process监听者 cls_listeners = self._listeners.get(Signals.after_process, {}).get(self.__class__, []) for listener in cls_listeners: # 调用监听者 listener(self.__class__, self)class SignalModel(Model): def process(self): """真正的调用端""" self._before_process() print("Processing") self._after_process()# 注册before_process信号@before_process(SignalModel)def before_process_listener(*args, **kwargs): print("before_process_listener1", args, kwargs)# 注册before_process信号@before_process(SignalModel)def before_process_listener(*args, **kwargs): print("before_process_listener2", args, kwargs)# 注册after_process信号@after_process(SignalModel)def before_process_listener(*args, **kwargs): print("after_process_listener", args, kwargs)if __name__ == '__main__': sm = SignalModel() sm.process()
输出结果:
before_process_listener1 (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
before_process_listener2 (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
Processing
after_process_listener (<class '__main__.SignalModel'>, <__main__.SignalModel object at 0x7ff700116e50>) {}
以上就是“Tortoise orm信号实现及使用场景是什么”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网行业资讯频道。