epilys

Running minor tasks with a simple job system in Django

Suppose you have a Django website that needs to run jobs and when exactly it runs and idempotence are not important, as long as it eventually runs soon enough.

Examples:

You can encode a generic job function as a Django model and store it in the database. The job’s function is a plain text field that must include a valid Python dotted path of a function, that is, on runtime we must be able to import the function by using its path from the given string.

from django.db import models
from django.utils.timezone import make_aware
from django.utils.module_loading import import_string

class JobKind(models.Model):
    id = models.AutoField(primary_key=True)
    dotted_path = models.TextField(null=False, blank=False, unique=True)
    created = models.DateTimeField(auto_now_add=True, null=False, blank=False)
    last_modified = models.DateTimeField(auto_now_add=True, null=False, blank=False)

    def __str__(self):
        return self.dotted_path

    @staticmethod
    def from_func(func):
        if isinstance(func, types.FunctionType):
            dotted_path = f"{func.__module__}.{func.__name__}"
            ret, _ = JobKind.objects.get_or_create(dotted_path=dotted_path)
            return ret
        else:
            raise TypeError

    def run(self, job):
        try:
            func = import_string(self.dotted_path)
            return func(job)
        except ImportError:
            logging.error(f"Could not resolve job dotted_path: {self.dotted_path}")
            raise ImportError

You can implement a Job Django model that can run a JobKind as follows:

class Job(models.Model):
    id = models.AutoField(primary_key=True)
    kind = models.ForeignKey(JobKind, null=True, on_delete=models.SET_NULL)
    created = models.DateTimeField(auto_now_add=True)
    active = models.BooleanField(default=True, null=False, blank=False)
    periodic = models.BooleanField(default=False, null=False, blank=False)
    failed = models.BooleanField(default=False, null=False, blank=False)
    last_run = models.DateTimeField(default=None, null=True, blank=True)
    logs = models.TextField(null=True, blank=True)
    data = models.JSONField(null=True, blank=True)

    def __str__(self):
        return f"{self.kind} {self.data}"

    def run(self):
        if not self.kind_id:
            return
        self.last_run = make_aware(datetime.now())
        try:
            res = self.kind.run(self)
            if res and not self.periodic:
                self.active = False
            if isinstance(res, str):
                if self.logs is None:
                    self.logs = ""
                self.logs += res
            self.failed = False
            self.save(update_fields=["last_run", "failed", "active", "logs"])
        except Exception as exc:
            if self.logs is None:
                self.logs = ""
            self.logs += str(exc)
            self.failed = True
            self.save(update_fields=["last_run", "failed", "logs"])
        return

Now you can run pending jobs with cron by making a Django management command:

# my_project/management/commands/run_jobs.py
from django.core.management.base import BaseCommand
from my_project.jobs import Job

class Command(BaseCommand):
    help = "Run pending jobs"

    def handle(self, *args, **kwargs):
        for job in Job.objects.filter(active=True):
            job.run()

You can also setup a thread that sleeps and periodically wakes up to run any pending tasks by overriding the ready method on your django.apps.AppConfig:

# my_project/apps.py
import threading

    def ready(self):
        import my_project.jobs

        def sched_jobs():
            from my_project.jobs import Job
            import sched
            import time

            def exec_fn():
                for job in Job.objects.filter(active=True, failed=False):
                    job.run()

            s = sched.scheduler(time.time, time.sleep)
            while True:
                s.enter(15 * 60, 1, exec_fn)
                s.run(blocking=True)

        self.scheduling_thread = threading.Thread(target=sched_jobs, daemon=True)
        self.scheduling_thread.name = "scheduling_thread"
        self.scheduling_thread.start()

JobKind django admin panel

Job django admin panel

You can easily inspect jobs from the Django admin panel by registering the models to the admin app:

@admin.action(description="Run jobs")
def run_jobs(modeladmin, request, queryset):
    for job in queryset.all():
        job.run()

class JobAdmin(ModelAdmin):
    def success(self, obj):
        if obj.last_run is None:
            return None
        return not obj.failed

    readonly_fields = (
        "json_pprint",
    )

    @admin.display(description="JSON pretty print")
    def json_pprint(self, instance):
        import json

        return mark_safe(
            f"""<pre>{json.dumps(instance.data, sort_keys=True, indent=4)}</pre>"""
        )

    success.boolean = True
    ordering = ["-created", "-last_run"]
    actions = [run_jobs]
    list_display = ["__str__", "created", "active", "periodic", "success", "last_run"]
    list_filter = [
        "kind",
        "active",
        "failed",
    ]


class JobKindAdmin(ModelAdmin):
    def resolves(self, obj):
        from django.utils.module_loading import import_string

        try:
            _ = import_string(obj.dotted_path)
            return True
        except ImportError:
            return False

    resolves.boolean = True
    ordering = ["-created", "-last_modified"]
    list_display = ["__str__", "created", "last_modified", "resolves"]

Now you can create new jobs from the admin panel and from code elsewhere in your app. Suppose you have an API endpoint to receive Webmentions. You can avoid blocking the HTTP response by scheduling the processing for later in the view:

from my_project.jobs import Job, JobKind

# schedule job
kind = JobKind.from_func(webmention_receive)
_job_obj, _ = Job.objects.get_or_create(
    kind=kind, periodic=False, data={"source": source, "target": target}
)

Improvements

Real life example

This pattern is used in the sic.pm link aggregator community: https://github.com/epilys/sic/blob/158284451097ab94da0efe5cbdfae14b0bb3a1a8/sic/jobs.py