impl#

exception sqlalchemy_mate.patterns.status_tracker.impl.JobExecutionError[source]#
exception sqlalchemy_mate.patterns.status_tracker.impl.JobLockedError[source]#

Raised when try to start a locked job.

exception sqlalchemy_mate.patterns.status_tracker.impl.JobIsNotReadyToStartError[source]#

Raised when try to start job that the current status shows that it is not ready to start.

exception sqlalchemy_mate.patterns.status_tracker.impl.JobAlreadySucceededError[source]#

Raised when try to start a succeeded (failed too many times) job.

exception sqlalchemy_mate.patterns.status_tracker.impl.JobIgnoredError[source]#

Raised when try to start an ignored (failed too many times) job.

class sqlalchemy_mate.patterns.status_tracker.impl.JobMixin[source]#

The sqlalchemy ORM data model mixin class that brings in status tracking related features. Core API includes:

See: https://docs.sqlalchemy.org/en/20/orm/declarative_mixins.html

锁机制

为了在高并发的环境中防止多个程序执行同一个 Job, 我们需要对 Job 进行加锁.

每当我们准备执行一个 Job 时, 我们需要根据 job id 去获取一把锁 (也就是将 job 上锁). 然后执行 Job 的业务逻辑. 在 Job 正在执行的过程中, 其他程序是不允许彭这个 Job 的. 整个流程的时间顺序如下:

  1. SELECT ... WHERE id = 'job_id' 获取这个 Job.

  2. 看看这个 Job 是否上锁, 如果已上锁则直接退出.

  3. UPDATE ... SET lock = ... WHERE id = 'job_id' AND lock IS NULL

    进行加锁操作. 如果在 #1 之后, #3 之前有人把这个 Job 锁上了, 这个 SQL 就不会执行成功, 我们也就视为获取锁失败.

注 1, 这里我们故意没有用 SELECT ... WHERE ... FOR UPDATE 的行锁语法, 因为我们 需要显式的维护这个锁的开关和生命周期.

注 2, 我们是先获得这个 job, 检查是否上锁, 然后再 update 上锁. 你可能会担心在检查成功后 到 update 上锁期间如果有其他人把这个锁锁上了怎么办? 这个问题是不存在的, 因为 update 里的 where 会保证如果尝试上锁的时候已经被上锁了, 这个 update 会失败. 再一个你可能会问为什么不 先 update 上锁, 再获取 job. 因为我们希望当这个 job 已经被上锁时, 其他的并发 worker 能够 用最小的代价了解到这个 job 已经被上锁了. 而明显 get 的代价比 update 要小得多, 所以 优先用 get 来获得 job 检查锁的状态.

classmethod create(id: str, status: int, data: Optional[dict] = None, **kwargs)[source]#

Create an in-memory instance of the job object. This method won’t write the job to database. This is useful for initializing many new jobs in batch.

Usage example:

with orm.Session(engine) as ses:
    for job_id in job_id_list:
        job = Job.create(id=job_id, status=10)
        ses.add(job)
    ses.commit()
classmethod create_and_save(engine_or_session: Union[Engine, Session], id: str, status: int, data: Optional[dict] = None, **kwargs)[source]#

Create an instance of the job object and write the job to database.

Usage example:

with orm.Session(engine) as ses:
    Job.create_and_save(ses, id="job-1", status=10)
is_locked(expire: int) bool[source]#

Check if the job is locked.

lock_it(engine_or_session: Union[Engine, Session], in_progress_status: int, debug: bool = False) Tuple[str, datetime][source]#
Returns:

a tuple of (lock, lock_at).

classmethod start(engine: Engine, id: str, pending_status: int, in_progress_status: int, failed_status: int, succeeded_status: int, ignored_status: int, expire: int, max_retry: int, more_pending_status: Optional[Union[int, List[int]]] = None, traceback_stack_limit: int = 10, skip_error: bool = False, debug: bool = False) ContextManager[Tuple[T_JOB, Updates]][source]#

This is the most important API. A context manager that does a lot of things:

  1. Try to obtain lock before the job begin. Once we have obtained the lock,

    other work won’t be able to update this row (they will see that it is locked).

  2. Any raised exception will be captured by the context manager, and it will

    set the status as failed, add retry count, log the error (and save the error information to DB), and release the lock.

  3. If the job has been failed too many times, it will set the status as ignored.

  4. If everything goes well, it will set status as succeeded and apply updates.

Usage example:

with Job.start(
    engine=engine,
    id="job-1",
    pending_status=10,
    in_process_status=20,
    failed_status=30,
    success_status=40,
    ignore_status=50,
    expire=900, # concurrency lock will expire in 15 minutes,
    max_retry=3,
    debug=True,
) as (job, updates):
    # do your job logic here
    ...
    # you can use ``updates.set(...)`` method to specify
    # what you would like to update at the end of the job
    # if the job succeeded.
    updates.set(key="data", value={"version": 1})
Parameters:
  • engine – SQLAlchemy engine. A life-cycle of a job has to be done in a new session.

  • id – unique job id, usually the primary key of the job table. todo, add support to allow compound primary key.

  • pending_status – pending status code in integer.

  • in_progress_status – in_progress status code in integer.

  • failed_status – failed status code in integer.

  • succeeded_status – succeeded status code in integer.

  • ignored_status – ignored status code in integer.

  • more_pending_status – additional pending status code that logically equal to “pending” status.

  • max_retry – how many retry is allowed before we ignore it

  • expire – how long the lock will expire

  • skip_error – if True, ignore the error during the job execution logics. note that this flag won’t ignore the error during the context manager start up and clean up. For example, it won’t ignore the JobLockedError.

  • debug – if True, print debug message.

注: 这里的设计跟 pynamodb_mate 中的 status tracker 模块不同. 这里没有 detailed_error 这个参数. 这是因为在 sql 中我们会先 get job 再 update 获取锁, 所以 在获取锁失败时我们无需再次查询数据库来了解错误原因. 而 dynamodb 是先 update 获取锁, 出错后如需了解详细的错误原因需要一次额外的 get 操作.

classmethod query_by_status(engine_or_session: Union[Engine, Session], status: int, limit: int = 10, older_task_first: bool = True) List[T_JOB][source]#

Query job by status.

Parameters:
  • engine_or_session

  • status – desired status code

  • limit – number of jobs to return

  • older_task_first – if True, then return older task

(older update_at time) first.

class sqlalchemy_mate.patterns.status_tracker.impl.Updates(values: dict = <factory>)[source]#

A helper class that hold the key value you want to update at the end of the job if the job succeeded.

set(key: str, value: Any)[source]#

Use this method to set “to-update” data. Note that you should not update some columns like “id”, “status”, “update_at” yourself, it will be updated by the JobMixin.start() context manager.