距离上一篇APScheduler源码分析已经间隔了一段时间,趁现在有点闲暇,赶紧写上一篇。

这篇来分析APScheduler执行器相关的代码。

回顾

先回忆一下APScheduler是怎么运行起来的?回顾一下example的代码。

    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行
    scheduler.start()

简单而言,实例化BackgroundScheduler,然后调用add_job方法添加任务,最后调用start方法启动。

add_job方法通过前面文章的分析已经知道了,就是将方法存到内存dict中,interval指定触发器为间隔触发器,间隔时间为3秒。

现在看一下start方法。

start方法

BackgroundScheduler的start方法调用了BaseScheduler类的start方法,其代码如下。

# apscheduler/schedulers/base.py/BaseScheduler
                             
    def start(self, paused=False):
        if self.state != STATE_STOPPED:
            raise SchedulerAlreadyRunningError
         # 检查:如果我们在uWSGI线程禁用状态下运行时就返回相应的错误警报
        self._check_uwsgi()

        with self._executors_lock:
            # Create a default executor if nothing else is configured
            # 创建默认执行器
            if 'default' not in self._executors:
                self.add_executor(self._create_default_executor(), 'default')

            # Start all the executors
            for alias, executor in self._executors.items():
                executor.start(self, alias)

        with self._jobstores_lock:
            # Create a default job store if nothing else is configured
            # 创建默认的存储器
            if 'default' not in self._jobstores:
                self.add_jobstore(self._create_default_jobstore(), 'default')

            # Start all the job stores
            for alias, store in self._jobstores.items():
                store.start(self, alias)

            # Schedule all pending jobs
            for job, jobstore_alias, replace_existing in self._pending_jobs:
                self._real_add_job(job, jobstore_alias, replace_existing)
            del self._pending_jobs[:]

        self.state = STATE_PAUSED if paused else STATE_RUNNING
        self._logger.info('Scheduler started')
        self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START))

        if not paused:
            self.wakeup()

start方法代码含义直观,就是创建默认执行器以及默认的存储器,此外还调用了想要的start方法,执行器的start方法传入了self(调度器本身)与alias,执行器的start方法做了什么?默认执行器的start方法BaseExecutor类中,其代码如下。

# apscheduler/executors/base.py/BaseExecutor

    def start(self, scheduler, alias):
        self._scheduler = scheduler
        self._lock = scheduler._create_lock()
        self._logger = logging.getLogger('apscheduler.executors.%s' % alias)

可以发现,start方法其实没做什么。

APScheduler默认的执行器就是线程执行器

# apscheduler/schedulers/base.py/BaseScheduler

    def _create_default_executor(self):
        """Creates a default executor store, specific to the particular scheduler type."""
        return ThreadPoolExecutor()

本质就是使用ThreadPoolExecutor,但要注意其继承了BasePoolExecutor,而BasePoolExecutor又继承了BaseExecutor。

# apscheduler/executores/pool.py

class ThreadPoolExecutor(BasePoolExecutor):
    def __init__(self, max_workers=10):
        pool = concurrent.futures.ThreadPoolExecutor(int(max_workers))
        super().__init__(pool)

如何调用执行器?

这就要说回_process_jobs方法了,该方法详细分析在「Python定时任务框架:APScheduler源码剖析(二)」中,这里截取部分相关代码

for job in due_jobs:
     # Look up the job's executor
     # 搜索当前任务对象的执行器
     try:
         executor = self._lookup_executor(job.executor)
     except BaseException:
         #...省略
     # 获得运行时间
     run_times = job._get_run_times(now)
     run_times = run_times[-1:] if run_times and job.coalesce else run_times
     if run_times:
         try:
             # 提交这个任务给执行器
             executor.submit_job(job, run_times)
         except MaxInstancesReachedError:
             #...省略

大致逻辑就是从jobstore获取job任务对象,然后通过submit_job方法将job任务对象提交到执行器中,submit_job方法的具体实现在BaseExecutor类中,其逻辑如下。

# apscheduler/executors/base.py/BaseExecutor

    def submit_job(self, job, run_times):
        # self._lock 为 RLock
        assert self._lock is not None, 'This executor has not been started yet'
        with self._lock:
            if self._instances[job.id] >= job.max_instances:
                raise MaxInstancesReachedError(job)

            self._do_submit_job(job, run_times)
            self._instances[job.id] += 1

submit_job方法先判断可重入锁是否存在,存在则在加锁的情况下使用_do_submit_job方法执行job任务对象。

因为默认使用是线程执行器,其_do_submit_job方法就简单的将job任务对象提交给线程池,对应代码如下

# apscheduler/executors/pool.py/BasePoolExecutor

    def _do_submit_job(self, job, run_times):
        def callback(f):
            exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                       (f.exception(), getattr(f.exception(), '__traceback__', None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

_do_submit_job方法中,一开始定义了回调函数,用于接收线程池执行任务的结果,如果成功了,则调用_run_job_success方法,失败了则调用_run_job_error方法,这两个方法都在BaseExecutor中。

_run_job_success方法代码如下。

# apscheduler/executors/base.py/BaseExecutor

    def _run_job_success(self, job_id, events):
        """
        Called by the executor with the list of generated events when :func:`run_job` has been
        successfully called.

        """
        with self._lock:
            self._instances[job_id] -= 1
            if self._instances[job_id] == 0:
                del self._instances[job_id]

        for event in events:
            self._scheduler._dispatch_event(event)

该方法会调用事件相关的机制,将线程池执行job任务对象的结果通APScheduler事件机制分发出去。

APScheduler的事件机制下次再聊,回过头看f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name),job任务对象作为run_job方法的参数,所以执行job的其实是run_job方法。

run_job方法

run_job方法代码如下。

# apscheduler/executors/base.py

def run_job(job, jobstore_alias, run_times, logger_name):

    events = []
    logger = logging.getLogger(logger_name)
    for run_time in run_times:

        #  misfire_grace_time:在指定运行时间的之后几秒仍运行该作业运行
        if job.misfire_grace_time is not None:
            difference = datetime.now(utc) - run_time
            grace_time = timedelta(seconds=job.misfire_grace_time)
            # 判断是否超时
            if difference > grace_time:
                # 超时,则将 EVENT_JOB_MISSED 事件记录到 events 这个 list 中
                events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias,
                                                run_time))
                logger.warning('Run time of job "%s" was missed by %s', job, difference)
                continue

        logger.info('Running job "%s" (scheduled at %s)', job, run_time)
        try:
            # 执行job任务对象
            retval = job.func(*job.args, **job.kwargs)
        except BaseException:
            exc, tb = sys.exc_info()[1:]
            formatted_tb = ''.join(format_tb(tb))
            # job任务对象执行报错,将 EVENT_JOB_ERROR 添加到
            events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
                                            exception=exc, traceback=formatted_tb))
            logger.exception('Job "%s" raised an exception', job)

            # 为了防止循环引用,导致内存泄漏
            traceback.clear_frames(tb)
            del tb
        else:
            events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
                                            retval=retval))
            logger.info('Job "%s" executed successfully', job)

    return events

run_job方法中,一开始先判断当前job任务对象的运行时间是否超过了misfire_grace_time(在指定运行时间的之后几秒仍运行该作业运行),如果超时,则记录到events这个list中。

然后通过retval = job.func(*job.args, **job.kwargs)真正的执行任务对象,如果执行过程中崩溃了,也会将job任务对象执行报错以事件的形式添加到events中。

这里出现了一个有趣的小技巧。

job任务对象执行崩溃后,通过exc, tb = sys.exc_info()[1:]获取错误,而不是常见的将Exception中的值打印。

sys.exc_info方法会返回三个值:type(异常类别), value(异常说明,可带参数), traceback(traceback 对象,包含更丰富的信息),这里只取了value与traceback信息,然后通过traceback.format_tb方法将其格式化,记录到日志中后,调用traceback.clear_frames(tb)方法回溯清除所有堆栈帧中的局部变量tb,从APScheduler对该方法的注释是「为了防止循环引用,导致内存泄漏」。有点意思。​

结尾

本文主要剖析了APScheduler中线程执行器它的源码,线程执行器代码简单,是APScheduler默认的执行器,APScheduler还有多个不同的执行器,各位有兴趣可以自行探究一下,有雅致可以联系我一同简单的讨论讨论。

APScheduler源码不同执行器、调度器、触发器其设计理念是类似的,这里就不一一去分析的,但还有个东西在前面一直出现却没有分析,那就是APSCheduler的「事件分发」机制,下一篇文章就来看看APScheduler的事件分发/监听等是怎么实现的。

Scroll Up