=============
Celery tasks
=============
This project uses `Celery `_ to implement background jobs.
.. seealso::
* Celery utilities in :mod:`megforms.celery_utils`
* Existing celery tasks are documented in :ref:`celery tasks`
* `Celery documentation `_
Running the celery worker
==========================
If a Celery worker is not available, Celery tasks can be run synchronously by setting :envvar:`CELERY_TASK_ALWAYS_EAGER`; this is set to true by default.
This is easy to use, but performs poorly because all work is done within the WSGI request thread, and it does not support concurrency.
Scheduled jobs are not triggered in eager mode, because triggering them requires a Celery Beat process.
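
How the flag is interpreted can be sketched as follows. This is only an illustration, assuming the variable is read as a case-insensitive boolean string that defaults to true; the helper ``env_flag`` is hypothetical and is not part of the project.

.. code-block:: python

    import os

    TRUE_VALUES = {"1", "true", "yes", "on"}

    def env_flag(name: str, default: bool = True) -> bool:
        """Read a boolean flag from the environment (hypothetical helper)."""
        raw = os.environ.get(name)
        if raw is None:
            # Variable not set: fall back to the default (eager mode on).
            return default
        return raw.strip().lower() in TRUE_VALUES

    # Assumption: the settings module does something equivalent to this.
    CELERY_TASK_ALWAYS_EAGER = env_flag("CELERY_TASK_ALWAYS_EAGER")

With this reading, exporting ``CELERY_TASK_ALWAYS_EAGER=False`` switches eager mode off, and leaving the variable unset keeps it on.
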
docker compose
---------------
When running the project using the :ref:`docker compose` method, the celery worker and beat should already be running by default.
virtualenv
-------------
If the project is run locally in a :ref:`virtualenv `, you need to install and run Redis, which acts as the job queue for the Celery worker process.

.. note:: Adding a Redis connection to the project enables Redis as a cache backend as well as the Celery queue.
.. seealso:: :ref:`celery troubleshooting` Celery
#. Download and install `Redis `_.
* Ensure that Redis server is running
* You may need to run :command:`redis-server` in a terminal to start it
#. Set the following :ref:`environment variables ` to point at the redis database:
* :envvar:`CELERY_TASK_ALWAYS_EAGER` to "False"
* :envvar:`REDIS_HOST` and :envvar:`REDIS_PORT_NUMBER` as needed
* :envvar:`PYTHONPATH` should be passed to celery and point at the :file:`meg_forms` folder (absolute path), e.g. :code:`${PWD}/meg_forms`.
.. code-block:: bash
:caption: Example variables for running celery worker
export REDIS_HOST=localhost
export CELERY_TASK_ALWAYS_EAGER=False
#. Activate :ref:`python virtual environment ` if you haven't already.
#. Run Celery worker to handle incoming tasks:
.. code-block:: bash
:caption: Execute this command and leave the process running in the background
PYTHONPATH=${PWD}/meg_forms celery -A meg_forms worker -l DEBUG --concurrency ${CELERY_CONCURRENCY:-4}
#. Run Celery Beat to trigger scheduled tasks:
.. code-block:: bash
:caption: Execute this command and leave the process running in the background
PYTHONPATH=${PWD}/meg_forms celery -A meg_forms beat -l DEBUG
Implementing tasks
====================
Tasks should be added to the app defined in :mod:`celery_app`. Use one of the presets in :mod:`celery_constants` to set the default retry policy and priority.
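.. code-block:: python

    from meg_forms import app
    from celery_constants import CELERY_TASK_DEFAULTS

    # AuditForm must also be imported from the app that defines the model.

    @app.task(**CELERY_TASK_DEFAULTS)
    def example_task(form_id: int):
        # Accept the id and load a fresh instance inside the task.
        form: AuditForm = AuditForm.objects.get(pk=form_id)
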
**Some important rules when designing celery tasks:**
* Tasks must always be implemented in the relevant app's :file:`tasks.py` module.
* When a task operates on a model instance, it should accept the instance's id rather than a serialized object.
* A Celery job should not take longer than 10 seconds to complete. Large jobs should be split into smaller ones to take advantage of concurrency.
* When querying and saving objects, use :meth:`select_for_update` to ensure that the object is locked, especially in long-running tasks.
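
The rule about splitting large jobs can be sketched as below. This is a minimal illustration rather than project code: ``chunked`` is a hypothetical helper, and the ``process_forms`` task mentioned in the comment is assumed for the example.

.. code-block:: python

    from itertools import islice
    from typing import Iterable, Iterator, List

    def chunked(ids: Iterable[int], size: int) -> Iterator[List[int]]:
        """Yield successive batches of at most ``size`` ids."""
        if size < 1:
            raise ValueError("size must be at least 1")
        iterator = iter(ids)
        while batch := list(islice(iterator, size)):
            yield batch

    # Each batch would become its own short task, so that workers can
    # process them concurrently, e.g. (process_forms is hypothetical):
    #     for batch in chunked(form_ids, 100):
    #         process_forms.delay(batch)
    batches = list(chunked(range(1, 8), 3))

Passing batches of ids, rather than model instances, also keeps the task arguments small and follows the rule about accepting ids.
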
Task priorities
================
The following task priorities are available. The jobs will be executed in order from high to low priority.
A task's priority can be set by adding the ``priority`` parameter to its decorator.

High
    Important jobs that should skip the queue. Use for jobs that do not take much time and whose result the user is actively waiting for, such as rendering previews.
    This can be either a Celery job awaited by a view, or one polled by htmx while a progress spinner is shown to the user.
Normal
    Default priority used for most jobs.
Low
    Unimportant Celery tasks that can wait until all other jobs have been completed, such as routine maintenance.
    Use this priority in particular when launching a lot of jobs that would otherwise fill up the Celery queue.

Triggering tasks
==================
To trigger a celery job in the background, invoke its :meth:`delay` method:
.. code-block:: python
example_task.delay(form.pk)
.. important:: When triggering a Celery job within a transaction, make sure the job does not start
    until the transaction is committed, so that the Celery worker sees the latest version of the object. For example:
.. code-block:: python
form = AuditForm.objects.create(...)
transaction.on_commit(lambda: example_task.delay(form.pk))
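
When several jobs are queued in a loop, a ``lambda`` looks up the loop variable only when it is called, so every callback would see its final value. The sketch below illustrates how :func:`functools.partial` avoids this; ``on_commit`` and ``delay`` here are plain-Python stand-ins assumed for the example, not the Django and Celery functions.

.. code-block:: python

    from functools import partial

    # Stand-in for transaction.on_commit: collects callbacks and runs them
    # later, the way callbacks run once the transaction has committed.
    pending_callbacks = []

    def on_commit(callback):
        pending_callbacks.append(callback)

    triggered = []

    def delay(form_id):
        # Stand-in for example_task.delay: records which id was queued.
        triggered.append(form_id)

    for form_id in (1, 2, 3):
        # partial binds the current value of form_id immediately; a bare
        # ``lambda: delay(form_id)`` would queue the last id three times.
        on_commit(partial(delay, form_id))

    # Simulate the commit by running the collected callbacks.
    for callback in pending_callbacks:
        callback()
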
Overriding priority
----------------------
To override a task's default priority, call `apply_async `_:
.. code-block:: python
example_task.apply_async(args=[form.pk], priority=CELERY_PRIORITY_HIGH)
Advanced
---------
More advanced options for running, grouping and chaining tasks are explained in the `celery workflows documentation `_.
Scheduling recurring jobs
============================
Repeating tasks can be scheduled by:
* Hard-coding the schedule in ``app.conf.beat_schedule``

  .. code-block:: python

      from celery.schedules import crontab
      from meg_forms import app

      app.conf.beat_schedule |= {
          'job-name': {
              'task': 'megforms.example',
              # Run every day at midnight.
              'schedule': crontab(minute=0, hour=0),
              'kwargs': {},
              'options': {},
          },
      }

* Manually adding a :class:`PeriodicTask` in :term:`django admin`: :url:`/meg-admin/django_celery_beat/periodictask/add/`
* Programmatically scheduling a :class:`PeriodicTask` using one of the utilities:
* Schedule a repeating job using :func:`~megforms.celery_utils.schedule_repeating_task`
* Schedule a once-off job using :func:`~megforms.celery_utils.schedule_task`
.. _celery troubleshooting:
Monitoring & Troubleshooting
===============================
The Celery dashboard is available in the :url:`celery admin dashboard `.
The Celery queue can be viewed on the :url:`Celery page `, which shows pending and currently running jobs as well as a list of workers online.
The following ``manage.py`` commands are available for troubleshooting:
:command:`celery_queue`
Lists all jobs in the queue
:command:`celery_results`
Prints out results of all completed jobs
:command:`cancel_celery_task [task_path]`
Cancels a task by name. For example::
./manage.py cancel_celery_task accounts.tasks.celery_update_user_form_permission_cache