Using Celery in Python with tasks defined in different modules

Setup

Requirements

To get started, you will need Python installed on your system. Additionally, you will need RabbitMQ and Redis. You can install RabbitMQ and Redis on your local machine or use Docker containers.

Python Dependencies

Install Celery using pip:

pip install celery

Project Structure

Here’s a simple project structure to organize your Celery tasks:

celery_project/
│
├── celery_app.py    # Celery configuration and instance
├── task1.py         # Module for 'add' task
├── task2.py         # Module for 'multiply' task
└── main.py          # Main script to execute tasks

Celery Configuration

In celery_app.py, we configure our Celery application:

from celery import Celery

app = Celery("tasks", broker='amqp://username:password@localhost',
             backend='redis://localhost:6379/0',
             include=['task1', 'task2'])

if __name__ == '__main__':
    app.start()
  • broker: The URL of the RabbitMQ server.
  • backend: The URL of the Redis server used to store task results.
  • include: List of modules to include so Celery knows where to find the defined tasks.

Defining Tasks

Tasks are defined in task1.py and task2.py:

task1.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info(f'Starting to add {x} + {y}')
    result = x + y
    logger.info(f'Task completed with result {result}')
    return result

task2.py:

from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def multiply(x, y):
    logger.info(f'Starting to multiply {x} * {y}')
    result = x * y
    logger.info(f'Task completed with result {result}')
    return result

Running Tasks

In main.py, we initiate and execute tasks asynchronously:

from task1 import add
from task2 import multiply

result1 = add.delay(1, 2)
result2 = multiply.delay(2, 3)

print("add: " + str(result1.get(timeout=10)))
print("multiply: " + str(result2.get(timeout=10)))

Running Celery Worker

To run the Celery worker, use the following command:

.venv\Scripts\python.exe -m celery -A celery_app worker --loglevel=info -E --pool=solo