Enable status tracking for business critical application using sqlalchemy_mate#

In this example, we introduce a pattern for tracking the status of business critical jobs using Relational database as the backend. This feature gives you the ability to track status of each job, and error-handling, retry, concurrency control out-of-the-box.

NOTE: this solution is based on sqlalchemy_mate Python library.

When managing a large number of business-critical jobs, it is crucial to monitor and identify which jobs have been successful, which have failed, and which are still in progress. If the business logic is a pipeline consisting of a sequence of jobs, it is important to keep track of its current status and have the ability to recover from any failed job. We also have seen some advanced requirements like:

  • Each job should be consumed once and exactly once.

  • Each job should be handled by only one worker, you want a concurrency lock mechanism to avoid double consumption.

  • For those succeeded jobs, store additional information such as the output, statistics, metadata of the job and log the success time.

  • For those failed jobs, you want to log the detailed error message for debugging.

  • You want to get all of the failed jobs by one simple query and rerun with the updated business logic.

  • job might be impossible to complete. To avoid falling into an endless retry loop, you want to ignore the jobs if they fail too many times.

  • Run custom query based on job status for analytics purpose.

With sqlalchemy_mate Python library, you can enable this advanced feature without refactoring your existing application code, and you can use the “elegant” context manager to wrap around your business logic code and enjoy all the features above.

Declare Your Status Tracking Table#

  1. First, We define some status code using the enum Python standard library. It improves the code readability and avoids hard coding meaningless integers everywhere in the code base.

  2. We declare a sqlalchemy ORM data model.

[16]:
import enum

import sqlalchemy as sa
import sqlalchemy.orm as orm
import sqlalchemy_mate.api as sm
from sqlalchemy_mate.tests.api import engine_psql as engine

from rich import print as rprint
[17]:
class StatusEnum(int, enum.Enum):
    pending = 10
    in_progress = 20
    failed = 30
    succeeded = 40
    ignored = 50
[23]:
Base = orm.declarative_base()

class Job(Base, sm.ExtendedBase, sm.patterns.status_tracker.JobMixin):
    __tablename__ = "sqlalchemy_mate_status_tracker_job"

    @classmethod
    def start_job(
        cls,
        id: str,
        skip_error: bool = False,
        debug: bool = False,
    ):
        return cls.start(
            engine=engine,
            id=id,
            pending_status=StatusEnum.pending.value,
            in_progress_status=StatusEnum.in_progress.value,
            failed_status=StatusEnum.failed.value,
            succeeded_status=StatusEnum.succeeded.value,
            ignored_status=StatusEnum.ignored.value,
            expire=15,
            max_retry=3,
            skip_error=skip_error,
            debug=debug,
        )


Base.metadata.create_all(engine)

Initialize Some Jobs#

First, let’s initialize some jobs. At begin, all the job are in pending status.

[24]:
with engine.connect() as conn:
    conn.execute(Job.__table__.delete())
    conn.commit()
[25]:
with orm.Session(engine) as ses:
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-1",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-2",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    job = Job.create_and_save(
        engine_or_session=ses,
        id="job-3",
        status=StatusEnum.pending.value,
        data={"version": 0},
    )
    rprint(job)
Job(id='job-3', status=10, create_at=datetime.datetime(2024, 6, 6, 17, 24, 7, 801533),
update_at=datetime.datetime(2024, 6, 6, 17, 24, 7, 801533), lock=None, lock_at=datetime.datetime(1970, 1, 1, 0, 0),
retry=0, data={'version': 0}, errors={})
[26]:
# You can also do this to initialize many jobs in batch
# with orm.Session(engine) as ses:
#     for job_id in ["job-1", "job-2", "job-3"]:
#         job = Job.create(
#             id=job_id,
#             status=StatusEnum.pending.value,
#             data={"version": 0},
#         )
#         ses.add(job)
#     ses.commit()
#     rprint(job)

Job Succeeded#

The Job.start() class method is a magic context manager that does a lot of things.

  1. Try to obtain lock before the job begin. Once we have obtained the lock, other work won’t be able to update this row (they will see that it is locked).

  2. Any raised exception will be captured by the context manager, and it will set the status as failed, add retry count, log the error (and save the error information to DB), and release the lock.

  3. If the job has been failed too many times, it will set the status as ignored.

  4. If everything goes well, it will set status as succeeded and apply updates.

[27]:
with Job.start_job(id="job-1", debug=True) as (
    job,
    updates,
):
    # run your job logic here ...
    updates.set(key="data", value={"version": job.data["version"] + 1})
----------------------------- ▶️ start Job 'job-1'------------------------------
🔓Try to set status = 20 and lock the job 'job-1' ...
  Successfully lock the job!
✅ 🔐 job succeeded, set status = 40 and unlock the job.
------------------------ ⏹️ end Job 'job-1' status = 40)------------------------
[28]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-1")
    rprint(job)
Job(id='job-1', status=40, create_at=datetime.datetime(2024, 6, 6, 17, 24, 7, 786145),
update_at=datetime.datetime(2024, 6, 6, 17, 24, 9, 528467), lock=None, lock_at=datetime.datetime(2024, 6, 6, 17,
24, 9, 506479), retry=0, data={'version': 1}, errors={})

Job Failed#

[29]:
class CustomError(Exception):
    pass


with Job.start_job(id="job-2", debug=True) as (
    job,
    updates,
):
    updates.set(key="data", value={"version": job.data["version"] + 1})
    # intentionally raise an error to simulate a failed job
    raise CustomError("something wrong in job-2")
----------------------------- ▶️ start Job 'job-2'------------------------------
🔓Try to set status = 20 and lock the job 'job-2' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-2' status = 20)------------------------
---------------------------------------------------------------------------
CustomError                               Traceback (most recent call last)
Cell In[29], line 11
      9 updates.set(key="data", value={"version": job.data["version"] + 1})
     10 # intentionally raise an error to simulate a failed job
---> 11 raise CustomError("something wrong in job-2")

CustomError: something wrong in job-2
[30]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-2")
    rprint(job)
Job(id='job-2', status=30, create_at=datetime.datetime(2024, 6, 6, 17, 24, 7, 796382),
update_at=datetime.datetime(2024, 6, 6, 17, 24, 10, 477069), lock=None, lock_at=datetime.datetime(2024, 6, 6, 17,
24, 10, 457440), retry=1, data={'version': 0}, errors={'error': "CustomError('something wrong in job-2')",
'traceback': 'Traceback (most recent call last):\n  File 
"/Users/sanhehu/Documents/GitHub/sqlalchemy_mate-project/sqlalchemy_mate/patterns/status_tracker/impl.py", line 
411, in start\n    yield job, updates\n  File 
"/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_86613/3159702281.py", line 11, in <module>\n    raise 
CustomError("something wrong in job-2")\nCustomError: something wrong in job-2\n'})

Ignore If Job Fail Too Many Times#

You don’t want a job that logically can never succeed to fail into a endless loop. In this example, we defined the max retry times is 3 (See ORM data model). If it failed 3 times in a row, it will be ignored. And if you want to start a job k that is ignored, you will see an JobIgnoredError

[31]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 first attempts")
----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------
---------------------------------------------------------------------------
CustomError                               Traceback (most recent call last)
Cell In[31], line 2
      1 with Job.start_job(id="job-3", debug=True) as (job, updates):
----> 2     raise CustomError("something wrong in job-3 first attempts")

CustomError: something wrong in job-3 first attempts
[32]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 second attempts")
----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed, set status = 30 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------
---------------------------------------------------------------------------
CustomError                               Traceback (most recent call last)
Cell In[32], line 2
      1 with Job.start_job(id="job-3", debug=True) as (job, updates):
----> 2     raise CustomError("something wrong in job-3 second attempts")

CustomError: something wrong in job-3 second attempts
[33]:
with Job.start_job(id="job-3", debug=True) as (job, updates):
    raise CustomError("something wrong in job-3 third attempts")
----------------------------- ▶️ start Job 'job-3'------------------------------
🔓Try to set status = 20 and lock the job 'job-3' ...
  Successfully lock the job!
❌ 🔐 job failed 3 times already, set status = 50 and unlock the job.
------------------------ ⏹️ end Job 'job-3' status = 20)------------------------
---------------------------------------------------------------------------
CustomError                               Traceback (most recent call last)
Cell In[33], line 2
      1 with Job.start_job(id="job-3", debug=True) as (job, updates):
----> 2     raise CustomError("something wrong in job-3 third attempts")

CustomError: something wrong in job-3 third attempts
[34]:
# The 4th attempts will raise ``JobIgnoredError``
with Job.start_job(id="job-3", debug=True) as (job, updates):
    updates.set(key="data", value={"version": job.data["version"] + 1})
----------------------------- ▶️ start Job 'job-3'------------------------------
❌ Job 'job-3' is ignored, do nothing.
---------------------------------------------------------------------------
JobIgnoredError                           Traceback (most recent call last)
Cell In[34], line 2
      1 # The 4th attempts will raise ``JobIgnoredError``
----> 2 with Job.start_job(id="job-3", debug=True) as (job, updates):
      3     updates.set(key="data", value={"version": job.data["version"] + 1})

File ~/.pyenv/versions/3.10.10/lib/python3.10/contextlib.py:135, in _GeneratorContextManager.__enter__(self)
    133 del self.args, self.kwds, self.func
    134 try:
--> 135     return next(self.gen)
    136 except StopIteration:
    137     raise RuntimeError("generator didn't yield") from None

File ~/Documents/GitHub/sqlalchemy_mate-project/sqlalchemy_mate/patterns/status_tracker/impl.py:382, in JobMixin.start(cls, engine, id, pending_status, in_progress_status, failed_status, succeeded_status, ignored_status, expire, max_retry, more_pending_status, skip_error, debug)
    380     if debug:  # pragma: no cover
    381         print(f"❌ Job {id!r} is ignored, do nothing.")
--> 382     raise JobIgnoredError(
    383         f"Job {id!r} retry count already exceeded {max_retry}, "
    384         f"ignore it."
    385     )
    386 elif job.status not in ready_to_start_status:
    387     if debug:  # pragma: no cover

JobIgnoredError: Job 'job-3' retry count already exceeded 3, ignore it.
[35]:
with orm.Session(engine) as ses:
    job = ses.get(Job, "job-3")
    rprint(job)
Job(id='job-3', status=50, create_at=datetime.datetime(2024, 6, 6, 17, 24, 7, 801533),
update_at=datetime.datetime(2024, 6, 6, 17, 24, 11, 760582), lock=None, lock_at=datetime.datetime(2024, 6, 6, 17,
24, 11, 746486), retry=3, data={'version': 0}, errors={'error': "CustomError('something wrong in job-3 third 
attempts')", 'traceback': 'Traceback (most recent call last):\n  File 
"/Users/sanhehu/Documents/GitHub/sqlalchemy_mate-project/sqlalchemy_mate/patterns/status_tracker/impl.py", line 
411, in start\n    yield job, updates\n  File 
"/var/folders/bl/vkmgjdsx5115w2xcnp67_8y40000gn/T/ipykernel_86613/1125219603.py", line 2, in <module>\n    raise 
CustomError("something wrong in job-3 third attempts")\nCustomError: something wrong in job-3 third attempts\n'})

Recap#

A relationship database is perfect for this status tracking use case. However, if you don’t want to manage the database cluster or you are experiencing super high volume of conccurent jobs, you could consider using Amazon DynamoDB. Amazon DynamoDB is a serverless, infinitely scalable, key value store that is perfect for this use case. pynamodb_mate Python library has built-in support for this status tracking pattern. You can see example at Enable status tracking for business critical application using Amazon DynamoDB

[ ]:

[ ]:

[ ]:

[ ]: