Distributed Task Queue
Also known as a job queue: asynchronous/background task processing.
Workers (the processes that execute the tasks) can run on different machines.
A central component, the message broker (e.g. RabbitMQ), holds the queue of tasks waiting to be processed.
# install Celery (the broker itself, e.g. RabbitMQ, is installed and run separately)
pip install celery
# celeryconfig.py
import os
from urllib.parse import quote

# Broker connection settings. Environment variables override the development
# defaults so real credentials do not have to be committed to source control.
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'localhost')
RABBITMQ_PORT = os.environ.get('RABBITMQ_PORT', '5672')
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'user')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS', 'password')

# User and password are percent-encoded so characters such as '@', ':' or '/'
# in a password cannot corrupt the URL. The trailing '//' selects the default
# virtual host '/'.
BROKER_URL = 'amqp://{0}:{1}@{2}:{3}//'.format(
    quote(RABBITMQ_USER, safe=''),
    quote(RABBITMQ_PASS, safe=''),
    RABBITMQ_HOST,
    RABBITMQ_PORT,
)

# Modules the worker imports at startup so their tasks get registered.
CELERY_IMPORTS = ('mymodule.tasks',)
# and run it (start a worker process)
#   -E          send task events (used by monitoring tools such as Flower)
#   -l INFO     log level
#   -n NAME     this worker's node name
# NOTE(review): newer Celery versions expect the app to be named with -A,
# e.g. `celery -A tasks worker ...` — confirm for the version in use.
celery worker -E -l INFO -n myWorker_1
# in tasks.py
from celery import Celery

app = Celery('myapp')
# Load settings from the celeryconfig module (celeryconfig.py).
app.config_from_object('celeryconfig')
## For a Django project, load settings from Django and discover tasks in
## every installed app instead:
## app.config_from_object('django.conf:settings')
## app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


# bind=True passes the task instance as ``self`` (needed for self.retry etc.).
@app.task(bind=True)
def regular_py_function(self, x, y):
    """Example task: a plain function that a worker runs when a message arrives."""
    print("Doing stuff...")
# calling it
from tasks import app, regular_py_function

# Shortcut: arguments are passed straight through to the task.
regular_py_function.delay(x=1, y=2)
# or apply_async: positional args as a tuple, plus execution options.
# retry=True retries *publishing* the message if the broker connection fails;
# it does not retry the task itself.
regular_py_function.apply_async((1, 2), retry=True)
# or, by registered task name, without importing the task's code.
# app.send_task replaces the old celery.execute.send_task, which is not
# available in newer Celery releases.
app.send_task("tasks.regular_py_function", args=[1, 2], kwargs={})
Don't use a database as a broker.
It's a bad idea, even in development.
Use multiple queues (i.e. not just the default one)
from kombu import Exchange, Queue

# One queue per task family, each bound to its own exchange by routing key.
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
    Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),
)
# Each route's routing_key must match the binding of the queue it targets;
# otherwise the message is published with a key no queue is bound to.
# NOTE(review): keys must be the tasks' registered names (usually
# 'module.function', e.g. 'tasks.my_taskA') — confirm.
CELERY_ROUTES = {
    'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_A'},
    'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},
}
Use dedicated (priority) workers
Some tasks are more important than others, so give their queues workers of their own.
# -Q restricts each worker to the listed queue(s), so tasks A and B get dedicated workers.
# NOTE(review): neither worker consumes the 'default' queue declared in CELERY_QUEUES;
# start another worker for it if any tasks are routed there.
celery worker -E -l INFO -n workerA -Q for_task_A
celery worker -E -l INFO -n workerB -Q for_task_B
Use error handling
@app.task(bind=True, default_retry_delay=10, max_retries=5)
def awesome_task(self, x):
    """Fetch a URL and parse the response, retrying on transient failures.

    Retries at most 5 times, waiting default_retry_delay (10 seconds)
    between attempts. Once retries are exhausted, ``self.retry(exc=...)``
    re-raises the given exception and the task fails.
    """
    try:
        print("Doing some network IO")
        # Without a timeout, requests can block the worker indefinitely.
        r = requests.get('https://httpbin.org/get', timeout=10)
        parse_request(r)
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
        # requests raises its own ConnectionError, which is NOT a subclass of
        # the builtin ConnectionError, so it must be named explicitly.
        # retry()'s first positional parameter is ``args``, so the exception
        # goes in ``exc=``; ``raise`` makes the control flow explicit.
        raise self.retry(exc=e)
    except MyException as e:
        do_some_cleanup()
        raise self.retry(exc=e)  # or re-raise / give up instead of retrying
Keep track of results only if you really need them
# Don't store task return values by default; a task that needs its result can
# opt back in (e.g. @app.task(ignore_result=False)).
CELERY_IGNORE_RESULT: bool = True
Flower (web-based monitoring for Celery workers and tasks) - https://github.com/mher/flower
# Flower builds its view from task events, so start workers with -E (as above).
pip install flower
celery flower --port=5555 # http://localhost:5555/