Celery tasks

This project uses Celery to implement background jobs.

Running the celery worker

If a Celery worker is not available, Celery tasks can be run synchronously by setting CELERY_TASK_ALWAYS_EAGER, which is set to true by default. Eager mode is easy to use, but performs poorly: all work is done within the WSGI request thread, and there is no concurrency. In eager mode, scheduled jobs are also not triggered, since triggering them requires a Celery Beat process.
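As a sketch of how such a flag is typically wired up from the environment (the settings code below is an assumption for illustration, not taken from this project):

```python
# settings.py (illustrative sketch; this project's actual settings may differ)
import os

# Tasks run synchronously in the request thread unless the flag is
# explicitly switched off, e.g. `export CELERY_TASK_ALWAYS_EAGER=False`.
CELERY_TASK_ALWAYS_EAGER = os.environ.get("CELERY_TASK_ALWAYS_EAGER", "True") == "True"
```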

docker compose

When running the project with the Using docker compose method, the Celery worker and beat processes should already be running by default.

virtualenv

If the project is run locally in a virtualenv, you need to install and run Redis to act as the job queue for the Celery worker process.

Note

Adding a Redis connection to the project enables Redis as a cache backend as well as the Celery queue.

  1. Download and install Redis.

    • Ensure that the Redis server is running

    • You may need to run redis-server in a terminal to start it

  2. Set the following environment variables to point at the redis database:

    Example variables for running celery worker
    export REDIS_HOST=localhost
    export CELERY_TASK_ALWAYS_EAGER=False
    
  3. Activate the Python virtual environment if you haven’t already.

  4. Run Celery worker to handle incoming tasks:

    Execute this command and leave the process running in the background
    PYTHONPATH=${PWD}/meg_forms celery -A meg_forms worker -l DEBUG --concurrency ${CELERY_CONCURRENCY:-4}
    
  5. Run Celery Beat to trigger scheduled tasks:

    Execute this command and leave the process running in the background
    PYTHONPATH=${PWD}/meg_forms celery -A meg_forms beat -l DEBUG
    

Implementing tasks

Tasks should be added to the app defined in celery_app. Use one of the presets in celery_constants to set the default retry policy and priority.

from meg_forms import app
from celery_constants import CELERY_TASK_DEFAULTS

# Import AuditForm from the app that defines it (path may differ per app)
from audits.models import AuditForm

@app.task(**CELERY_TASK_DEFAULTS)
def example_task(form_id: int):
    # Accept the primary key and re-fetch the instance inside the task
    form: AuditForm = AuditForm.objects.get(pk=form_id)

Some important rules when designing celery tasks:

  • Tasks must always be implemented in the relevant app’s tasks.py module.

  • When accepting a model instance, the task should take its id instead of a serialized object instance.

  • A Celery job should not take longer than 10s to complete. Large jobs should be split into smaller ones to take advantage of concurrency.

  • When querying and saving objects, use select_for_update() to ensure that the object is locked, especially in long-running tasks.
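The 10-second rule above usually means fanning a large workload out into many small tasks. A minimal sketch of a batching helper (the helper name, chunk size, and process_batch task are illustrative, not part of the project):

```python
def chunked(ids, size=100):
    """Yield successive batches of at most `size` ids, so each batch can be
    dispatched as its own short-lived Celery task."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]

# Instead of one task processing thousands of forms, dispatch one small
# task per batch (process_batch is a hypothetical Celery task):
#   for batch in chunked(form_ids, size=100):
#       process_batch.delay(batch)
```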

Task priorities

The following task priorities are available. Jobs are executed in order from high to low priority. A task’s priority can be set by adding a priority parameter to its decorator.

High

Important jobs that should skip the queue. Use for short jobs that render data the user is actively waiting for, such as previews. This can be either a Celery job awaited by a view, or one polled by htmx while showing a progress spinner to the user.

Normal

Default priority used for most jobs.

Low

Unimportant celery tasks that can wait until all other jobs have been completed, such as routine maintenance. Use this priority in particular when launching a lot of jobs that would otherwise fill up the celery queue.

Triggering tasks

To trigger a celery job in the background, invoke its delay() method:

example_task.delay(form.pk)

Important

When triggering a Celery job within a transaction, make sure the job does not start until the transaction is committed, so that the Celery worker sees the latest version of the object. For example:

from django.db import transaction

form = AuditForm.objects.create(...)
transaction.on_commit(lambda: example_task.delay(form.pk))

Overriding priority

To override a task’s default priority, call apply_async:

example_task.apply_async(args=[form.pk], priority=CELERY_PRIORITY_HIGH)

Advanced

More advanced options of running, grouping and chaining tasks are explained in celery workflows documentation.

Scheduling recurring jobs

Repeating tasks can be scheduled with Celery Beat, which triggers them according to the schedule configured on the Celery app.
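As a sketch, recurring jobs are typically registered in Celery Beat’s schedule on the app. The entry name and timing below are illustrative; the task path is one of the project’s tasks mentioned under troubleshooting, and this project’s actual schedule configuration may live elsewhere:

```python
# Illustrative beat schedule entry (assumed location and timing)
from celery.schedules import crontab
from meg_forms import app

app.conf.beat_schedule = {
    "refresh-user-form-permission-cache": {
        "task": "accounts.tasks.celery_update_user_form_permission_cache",
        "schedule": crontab(hour=3, minute=0),  # every night at 03:00
    },
}
```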

Monitoring & Troubleshooting

A Celery dashboard is available in the admin. The queue can be viewed on its Celery page, which shows pending and currently running jobs as well as a list of workers online.

The following manage.py commands are available for troubleshooting:

celery_queue

Lists all jobs in the queue

celery_results

Prints out results of all completed jobs

cancel_celery_task [task_path]

Cancels a task by name. For example:

./manage.py cancel_celery_task accounts.tasks.celery_update_user_form_permission_cache