利用Signal信号控制AsyncIOScheduler定时关闭
在使用 aiohttp 与 APScheduler 的 AsyncIOScheduler 实现异步定时任务时,常会遇到程序提前退出或无法精确控制调度器生命周期的问题。尤其是在主函数中调用 scheduler.start() 后,事件循环未持续运行,导致添加的任务未能执行即退出。
典型的场景是:向调度器注册多个延迟执行的异步任务后,期望在所有任务执行完毕或经过指定时间后安全关闭调度器。但由于缺乏持续的事件循环驱动,程序立即终止,任务无法完成。
问题分析
AsyncIOScheduler 基于 asyncio 事件循环工作,调用 start() 并不会阻塞主线程。若不手动维持事件循环运行,Python 主程序将直接结束。常见的解决方式是调用:
asyncio.get_event_loop().run_forever()
但这会使程序无限挂起,除非外部强制中断(如 Ctrl+C)。为了实现"运行一段时间后自动退出",需要一种机制在特定时间触发退出动作。
使用 Signal 实现定时退出
在 Unix-like 系统中,signal 模块可用于注册信号处理器。通过 signal.alarm() 可设置一个定时器,在指定秒数后发送 SIGALRM 信号,从而触发预定义的清理逻辑。
结合此机制,可在调度器启动后设定一个超时,到达时间后由信号处理函数主动停止事件循环和调度器。
改进后的实现
import asyncio
import signal
from datetime import datetime, timedelta
import aiohttp
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
async def fetch_get(session):
url = 'https://httpbin.org/get?a=1'
async with session.get(url) as response:
print(f'GET 请求状态: {response.status}')
return await response.text()
async def fetch_post(session):
url = 'https://httpbin.org/post?b=2'
async with session.post(url) as response:
print(f'POST 请求状态: {response.status}')
return await response.text()
async def task_runner():
async with aiohttp.ClientSession() as client:
await fetch_get(client)
await fetch_post(client)
def schedule_shutdown(signum, frame):
"""信号处理函数:接收到 SIGALRM 时停止调度器并关闭事件循环"""
print("收到终止信号,正在关闭调度器...")
scheduler = AsyncIOScheduler.get_instance()
if scheduler.running:
scheduler.shutdown(wait=True)
loop = asyncio.get_event_loop()
loop.stop()
if __name__ == '__main__':
# 设置信号处理器
signal.signal(signal.SIGALRM, schedule_shutdown)
# 配置持久化任务存储
stores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
scheduler = AsyncIOScheduler(jobstores=stores)
# 添加10个延时10秒执行的任务
for _ in range(10):
scheduler.add_job(task_runner, 'date', run_date=datetime.now() + timedelta(seconds=10))
scheduler.start()
# 设定20秒后发送 SIGALRM 信号
signal.alarm(20)
# 启动事件循环等待任务执行及信号触发
print("调度器已启动,将在20秒后自动关闭...")
asyncio.get_event_loop().run_forever()
关键点说明
- 信号绑定:通过
signal.signal(SIGALRM, handler)注册处理函数,确保定时器触发时能执行清理逻辑。 - 安全关闭:
scheduler.shutdown(wait=True)确保正在运行的任务有机会完成。 - 跨平台限制:
signal.alarm()仅适用于 Unix 系统(Linux/macOS),Windows 下不可用。生产环境需考虑替代方案如异步定时检查或使用asyncio.sleep()控制生命周期。
替代思路(纯异步方式)
为避免依赖系统信号,也可采用纯协程方式实现延时关闭:
async def auto_shutdown(delay):
await asyncio.sleep(delay)
print("自动关闭调度器...")
scheduler.shutdown(wait=True)
loop = asyncio.get_event_loop()
loop.stop()
# 在 start 后调度该协程
loop = asyncio.get_event_loop()
loop.create_task(auto_shutdown(20))
loop.run_forever()
这种方式更具可移植性,推荐用于跨平台项目。