impl#
- exception sqlalchemy_mate.patterns.status_tracker.impl.JobLockedError[source]#
Raised when try to start a locked job.
- exception sqlalchemy_mate.patterns.status_tracker.impl.JobIgnoredError[source]#
Raised when try to start a ignored (failed too many times) job.
- class sqlalchemy_mate.patterns.status_tracker.impl.JobMixin[source]#
The sqlalchemy ORM data model mixin class that brings in status tracking related features. Core API includes:
See: https://docs.sqlalchemy.org/en/20/orm/declarative_mixins.html
锁机制
为了在高并发的环境中防止多个程序执行同一个 Job, 我们需要对 Job 进行加锁.
每当我们准备执行一个 Job 时, 我们需要根据 job id 去获取一把锁 (也就是将 job 上锁). 然后执行 Job 的业务逻辑. 在 Job 正在执行的过程中, 其他程序是不允许彭这个 Job 的. 整个流程的时间顺序如下:
用
SELECT ... WHERE id = 'job_id'获取这个 Job.看看这个 Job 是否上锁, 如果已上锁则直接退出.
- 用
UPDATE ... SET lock = ... WHERE id = 'job_id' AND lock IS NULL来 进行加锁操作. 如果在 #1 之后, #3 之前有人把这个 Job 锁上了, 这个 SQL 就不会执行成功, 我们也就视为获取锁失败.
- 用
注, 这里我们故意没有用
SELECT ... WHERE ... FOR UPDATE的行锁语法, 因为我们 需要显式的维护这个锁的开关和生命周期.- classmethod create(id: str, status: int, data: Optional[dict] = None, **kwargs)[source]#
Create an in-memory instance of the job object. This method won’t write the job to database. This is useful for initializing many new jobs in batch.
Usage example:
with orm.Session(engine) as ses: for job_id in job_id_list: job = Job.create(id=job_id, status=10) ses.add(job) ses.commit()
- classmethod create_and_save(engine_or_session: Union[Engine, Session], id: str, status: int, data: Optional[dict] = None, **kwargs)[source]#
Create an instance of the job object and write the job to database.
Usage example:
with orm.Session(engine) as ses: Job.create_and_save(ses, id="job-1", status=10)
- lock_it(engine_or_session: Union[Engine, Session], in_progress_status: int, debug: bool = False) Tuple[str, datetime][source]#
- Returns:
a tuple of
(lock, lock_at).
- classmethod start(engine: Engine, id: str, in_process_status: int, failed_status: int, success_status: int, ignore_status: int, expire: int, max_retry: int, skip_error: bool = False, debug: bool = False) ContextManager[Tuple[T_JOB, Updates]][source]#
This is the most important API. A context manager that does a lot of things:
- Try to obtain lock before the job begin. Once we have obtained the lock,
other work won’t be able to update this row (they will see that it is locked).
- Any raised exception will be captured by the context manager, and it will
set the status as failed, add retry count, log the error (and save the error information to DB), and release the lock.
If the job has been failed too many times, it will set the status as
ignored.If everything goes well, it will set status as
succeededand apply updates.
Usage example:
with Job.start( engine=engine, id="job-1", in_process_status=20, failed_status=30, success_status=40, ignore_status=50, expire=900, # concurrency lock will expire in 15 minutes, max_retry=3, debug=True, ) as (job, updates): # do your job logic here ... # you can use ``updates.set(...)`` method to specify # what you would like to update at the end of the job # if the job succeeded. updates.set(key="data", value={"version": 1})
- Parameters:
engine – SQLAlchemy engine. A life-cycle of a job has to be done in a new session.
- classmethod query_by_status(engine_or_session: Union[Engine, Session], status: int, limit: int = 10, older_task_first: bool = True) List[T_JOB][source]#
Query job by status.
- Parameters:
engine_or_session –
status – desired status code
limit – number of jobs to return
older_task_first – if True, then return older task
(older update_at time) first.
- class sqlalchemy_mate.patterns.status_tracker.impl.Updates(values: dict = <factory>)[source]#
A helper class that hold the key value you want to update at the end of the job if the job succeeded.
- set(key: str, value: Any)[source]#
Use this method to set “to-update” data. Note that you should not update some columns like “id”, “status”, “update_at” yourself, it will be updated by the
JobMixin.start()context manager.