Django中的信号与信号量机制
Django信号系统
Django的信号系统是一种解耦机制,主要功能是在特定框架事件(如模型存储、用户认证)发生时自动执行预设操作,避免在事件源处直接调用逻辑,从而降低代码耦合度。
信号基础原理
信号本质是事件驱动的消息传递机制,包含三个核心组件:
- 信号(Signal):代表特定事件的通知载体,例如模型保存或请求处理完成。
- 发送者(Sender):触发事件的对象,通常是模型类或框架组件。
- 接收器(Receiver):响应信号的函数,在信号触发时自动执行。
信号类型
- 内置信号:Django自动触发的事件,如模型操作和请求生命周期信号。
- 自定义信号:开发者定义的事件,用于特定业务场景。
内置信号应用示例
场景:用户注册后自动初始化关联档案
# app_core/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth.models import User
from app_core.models import UserProfile
@receiver(post_save, sender=User)
def init_user_data(sender, obj, is_new, **kwargs):
if is_new:
UserProfile.objects.create(owner=obj)
在app_core/apps.py中注册接收器:
# app_core/apps.py
from django.apps import AppConfig
class CoreConfig(AppConfig):
name = 'app_core'
def ready(self):
import app_core.signals
自定义信号应用示例
场景:订单支付后触发通知和库存更新
# commerce/signals.py
from django.dispatch import Signal, receiver
from commerce.models import PurchaseOrder
payment_done = Signal(providing_args=["purchase", "total"])
@receiver(payment_done)
def send_confirmation(sender, purchase, total, **kwargs):
print(f"订单{purchase.id}支付成功,金额{total}元")
@receiver(payment_done)
def adjust_inventory(sender, purchase, total, **kwargs):
for item in purchase.items.all():
product = item.product
product.quantity -= item.count
product.save()
在业务逻辑中触发信号:
# commerce/views.py
from commerce.signals import payment_done
def complete_payment(request, order_id):
order = PurchaseOrder.objects.get(id=order_id)
payment_done.send(sender=PurchaseOrder, purchase=order, total=order.amount)
return HttpResponse("操作完成")
信号量机制
信号量是并发编程中的资源控制工具,通过计数器管理共享资源访问:
- P操作(acquire):减少计数器,资源不足时阻塞线程
- V操作(release):增加计数器,唤醒等待线程
Django应用场景:
- 限制异步任务并发数量
- 控制数据库连接池大小
- 管理API调用频率
线程信号量示例
场景:限制数据库并发访问为2个线程
import threading
import time
def execute_query(worker_id, lock):
lock.acquire()
try:
print(f"[{worker_id}] 数据库访问开始 (可用连接: {lock._value})")
time.sleep(1.5)
print(f"[{worker_id}] 操作完成")
finally:
lock.release()
print(f"[{worker_id}] 释放连接 (可用连接: {lock._value})")
conn_lock = threading.Semaphore(2)
workers = []
for i in range(4):
t = threading.Thread(target=execute_query, args=(f"Worker-{i}", conn_lock))
workers.append(t)
t.start()
for t in workers:
t.join()
输出说明:初始两个线程立即执行,其余线程等待资源释放,确保始终≤2个并发连接。
注意事项:
- 使用try/finally保证资源释放,避免死锁
- 区分线程信号量(threading.Semaphore)和进程信号量(multiprocessing.Semaphore)
- 仅在共享资源场景使用,避免不必要的复杂度