Celery tasks
This project uses Celery to implement background jobs.
See also
Celery utilities are in megforms.celery_utils. Existing Celery tasks are documented in Celery tasks.
Running the celery worker
If a Celery worker is not available, Celery tasks can be run synchronously by setting CELERY_TASK_ALWAYS_EAGER, which is set to True by default.
This has the benefit of being easy to use, but results in poor performance: all work is done within the WSGI request thread, and there is no concurrency.
In eager mode, scheduled jobs are not triggered, since they require a running Celery Beat process.
docker compose
When running the project using docker compose, the Celery worker and beat processes should already be running by default.
virtualenv
If the project is run locally in a virtualenv, you need to install and run Redis to act as the job queue for the Celery worker process.
Note
Adding a Redis connection to the project enables Redis as a cache backend as well as the Celery queue.
See also
Monitoring & Troubleshooting Celery
Download and install Redis.
Ensure that the Redis server is running.
You may need to run redis-server in a terminal to start it.
Set the following environment variables to point at the redis database:
- CELERY_TASK_ALWAYS_EAGER to "False"
- REDIS_HOST and REDIS_PORT_NUMBER as needed
- PYTHONPATH should be passed to celery and point at the meg_forms folder (absolute path), e.g. ${PWD}/meg_forms

Example variables for running the Celery worker:

    export REDIS_HOST=localhost
    export CELERY_TASK_ALWAYS_EAGER=False
Activate the Python virtual environment if you haven't already.
Run Celery worker to handle incoming tasks:
Execute this command and leave the process running in the background:

    PYTHONPATH=${PWD}/meg_forms celery -A meg_forms worker -l DEBUG --concurrency ${CELERY_CONCURRENCY:-4}
Run Celery Beat to trigger scheduled tasks:
Execute this command and leave the process running in the background:

    PYTHONPATH=${PWD}/meg_forms celery -A meg_forms beat -l DEBUG
Implementing tasks
Tasks should be added to the app defined in celery_app. Use one of the presets in celery_constants to set the default retry policy and priority.
    from meg_forms import app
    from celery_constants import CELERY_TASK_DEFAULTS

    @app.task(**CELERY_TASK_DEFAULTS)
    def example_task(form_id: int):
        form: AuditForm = AuditForm.objects.get(pk=form_id)
Some important rules when designing celery tasks:
- Tasks must always be implemented in the relevant app's tasks.py module.
- When accepting a model instance, the task should take its id instead of a serialized object instance.
- A Celery job should not take longer than 10s to complete. Large jobs should be split into smaller ones to take advantage of concurrency.
- When querying and saving objects, use select_for_update() to ensure that the object is locked, especially in long-running tasks.
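The 10-second rule usually means fanning a big job out into many small ones. A plain-Python sketch of the splitting step (the process_forms_batch task named in the comment is hypothetical; in practice each chunk would be dispatched with .delay() or a Celery group):

```python
def chunked(ids, size):
    """Split a list of primary keys into fixed-size chunks."""
    for i in range(0, len(ids), size):
        yield ids[i:i + size]

# Instead of one task processing 10,000 forms, dispatch 100 tasks of 100 each.
form_ids = list(range(10_000))
batches = list(chunked(form_ids, 100))
# for batch in batches:
#     process_forms_batch.delay(batch)  # hypothetical task name
```

Passing lists of ids (rather than serialized objects) also keeps each message small, in line with the rules above.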
Task priorities
The following task priorities are available; jobs are executed in order from high to low priority.
A task's priority can be set by adding a priority parameter to its decorator.
- High
Important jobs that should skip the queue. Use for rendering data that the user is actively waiting for and that doesn't take much time, for example rendering previews. This can be either a Celery job awaited by a view, or polled by htmx while showing a progress spinner to the user.
- Normal
Default priority used for most jobs.
- Low
Unimportant celery tasks that can wait until all other jobs have been completed, such as routine maintenance. Use this priority in particular when launching a lot of jobs that would otherwise fill up the celery queue.
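The presets in celery_constants might look like the sketch below. The constant names mirror those used elsewhere in this page, but the numeric values and the retry options are assumptions (check the actual module); autoretry_for, retry_backoff and max_retries are standard Celery task options:

```python
# Assumed numeric values; Celery priorities are small integers.
CELERY_PRIORITY_HIGH = 9
CELERY_PRIORITY_NORMAL = 5
CELERY_PRIORITY_LOW = 1

# Sketch of a defaults preset: normal priority plus a simple retry policy.
CELERY_TASK_DEFAULTS = {
    "priority": CELERY_PRIORITY_NORMAL,
    "autoretry_for": (Exception,),
    "retry_backoff": True,
    "max_retries": 3,
}
```

A task would then opt in via @app.task(**CELERY_TASK_DEFAULTS), as shown in the example task above.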
Triggering tasks
To trigger a celery job in the background, invoke its delay() method:
    example_task.delay(form.pk)
Important
When triggering a Celery job within a transaction, ensure that the job does not start until the transaction is committed, so that the Celery worker sees the latest version of the object. For example:
    form = AuditForm.objects.create(...)
    transaction.on_commit(lambda: example_task.delay(form.pk))
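To see why on_commit matters, here is a toy model of the pattern in plain Python (not Django's implementation): callbacks registered during a transaction are deferred until commit, so the worker is never handed an id for a row that doesn't exist yet.

```python
class SketchTransaction:
    """Toy model of transaction.on_commit: defer callbacks until commit."""

    def __init__(self):
        self._callbacks = []
        self.committed = False

    def on_commit(self, func):
        # Registering does NOT run the callback immediately.
        self._callbacks.append(func)

    def commit(self):
        self.committed = True
        for func in self._callbacks:
            func()

fired = []
txn = SketchTransaction()
txn.on_commit(lambda: fired.append("example_task.delay(form.pk)"))
assert fired == []  # nothing has run yet: the row is not committed
txn.commit()        # only now does the task get queued
```

Calling example_task.delay(form.pk) directly inside the transaction would be the racy equivalent of running the callback before commit().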
Overriding priority
To override a task's default priority, call apply_async():
    example_task.apply_async(args=[form.pk], priority=CELERY_PRIORITY_HIGH)
Advanced
More advanced options for running, grouping and chaining tasks are explained in the Celery workflows documentation.
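As a rough mental model of those workflow primitives (a plain-Python analogy, not Celery's API): chain pipes each task's result into the next, while group runs many tasks in parallel and collects the results.

```python
from functools import reduce

def chain_like(*funcs):
    """Plain-Python analogy of celery.chain: feed each result into the next callable."""
    def run(value):
        return reduce(lambda acc, f: f(acc), funcs, value)
    return run

def group_like(func, args_list):
    """Plain-Python analogy of celery.group: apply many calls, collect results."""
    return [func(arg) for arg in args_list]

double = lambda x: x * 2
inc = lambda x: x + 1
pipeline = chain_like(double, inc)  # pipeline(3) -> inc(double(3))
```

In real Celery these are built from task signatures and run asynchronously on workers; see the workflows documentation for the actual API.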
Scheduling recurring jobs
Repeating tasks can be scheduled by:

- Hard-coding the schedule in app.conf.beat_schedule:

      from celery.schedules import crontab
      from meg_forms import app

      app.conf.beat_schedule |= {
          'job-name': {
              'task': 'megforms.example',
              'schedule': crontab(minute=0, hour=0),
              'kwargs': {},
              'options': {},
          },
      }

- Manually adding a PeriodicTask in django admin: /meg-admin/django_celery_beat/periodictask/add/
- Programmatically scheduling a PeriodicTask using one of the utilities:
  - Schedule a repeating job using schedule_repeating_task()
  - Schedule a once-off job using schedule_task()
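As an illustration of what crontab(minute=0, hour=0) means, the sketch below computes the next daily-midnight trigger time in plain Python (Celery Beat's scheduler performs this kind of calculation internally; this helper is just illustrative):

```python
from datetime import datetime, timedelta

def next_midnight(now: datetime) -> datetime:
    """Next run time matching crontab(minute=0, hour=0): the upcoming midnight."""
    candidate = now.replace(hour=0, minute=0, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate
```

So a job scheduled with crontab(minute=0, hour=0) fires once per day, at 00:00.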
Monitoring & Troubleshooting
The Celery dashboard is available in the admin. The Celery queue can be viewed on the Celery page, which shows pending and currently running jobs as well as a list of workers online.
The following manage.py commands are available for troubleshooting:
- celery_queue
Lists all jobs in the queue
- celery_results
Prints out results of all completed jobs
- cancel_celery_task [task_path]
Cancels a task by name. For example:
    ./manage.py cancel_celery_task accounts.tasks.celery_update_user_form_permission_cache