Best practices

Deni Bertovic / @denibertovic

What is Celery?

Distributed Task Queue

Task Queue?

Also know as job queue - means async/background task processing


Workers (the things processing the tasks) can be on different machines.

Broker? AMQP?

Central component that has a list of all tasks

  1. RabbitMQ
  2. Redis
  3. ...

So it kinda looks like this...

How does it fit with Django?

  1. Avoid long running requests
  2. Good abstraction for async/message passing

How to install?

                    # install
                    pip install celery

                    # celeryconfig.py
                    RABBITMQ_PORT = '5672'
                    RABBITMQ_USER = 'user'
                    RABBITMQ_PASS = 'password'

                    BROKER_URL = 'amqp://{0}:{1}@localhost:{2}//'.format(
                    CELERY_IMPORTS = ('mymodule.tasks',)

                    # and run it
                    celery worker -E -l INFO -n myWorker_1


How to use?

                    # in tasks.py
                    app = Celery('myapp')
                    ## app.config_from_object('django.conf:settings')
                    ## app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

                    def regular_py_function(self, x, y):
                        print("Doing stuff...")

How to use?

        # calling it
        from tasks import *
        regular_py_function.delay(x=1, y=2)

        # or
        regular_py_function.apply_async((1, 2), retry=True)

        # or even better
        from celery.execute import send_task
        send_task("tasks.regular_py_function", args=[1, 2], kwargs={})

Let's talk about best practices!


Don't use a Database as a broker.

It's a bad idea! Even in development.


Use more Queues (ie. not just the default one)


        Queue('default', Exchange('default'), routing_key='default'),
        Queue('for_task_A', Exchange('for_task_A'), routing_key='for_task_A'),
        Queue('for_task_B', Exchange('for_task_B'), routing_key='for_task_B'),

        'my_taskA': {'queue': 'for_task_A', 'routing_key': 'for_task_B'},
        'my_taskB': {'queue': 'for_task_B', 'routing_key': 'for_task_B'},



Use priority workers

Some tasks are just more important than others.

            celery worker -E -l INFO -n workerA -Q for_task_A
            celery worker -E -l INFO -n workerB -Q for_task_B


Use error handling

            app.task(bind=True, default_retry_delay=10, max_retries=5)
            def awesome_task(self, x):
                    print("Doing some network IO")
                    r = requests.get('https://httpbin.org/get')
                except ConnectionError as e:
                except MyException as e:
                    self.retry(e) # or don't


Keep track of results only if you really need them

                        CELERY_IGNORE_RESULT = True


Flower - https://github.com/mher/flower

            pip install flower
            celery flower --port=5555 # http://localhost:5555/
  1. Task progress and history.
  2. Ability to show task details (arguments, start time, runtime, etc)
  3. Graphs and statistics
  4. Remote Control

Thanks for listening!