PROJET AUTOBLOG


Sam & Max: Python, Django, Git et du cul

Site original : Sam & Max: Python, Django, Git et du cul

⇐ retour index

Mise à jour

Mise à jour de la base de données, veuillez patienter...

Files de tâches et tâches récurrentes avec Celery

samedi 27 juillet 2013 à 09:57

Quand on a à traiter des choses bloquantes, avec des dépendances, des flux complexes ou des actions répétitives, créer des files d’attente peut se révéler très judicieux.

Par exemple lancer la génération d’un gros zip sur le clic d’un utilisateur, télécharger plein fichiers en parallèle pour son site de cul, lancer des calculs sur plusieurs machines et récupérer le résultat, encoder des videos en arrière plan, etc.

Le problème, c’est que fabriquer des files d’attente à la main, ça mène généralement à une grosse galère. La première boîte dans laquelle j’ai travaillé avait tout un système de queues à base de PHP + SQL fait à la main qui tapait dans du MySQL, c’était pas marrant du tout

Je fais une pause, et je note que le potentiel de jeux de mots sur cet article est fortement élevé. Mais je resterai fort.

Locking, priorité, dépendance, asynchronicité, concurrence, sérialisation, encoding, stockage, accessibilité, load balancing… Toutes ces problématiques sont bien vicieuses et chronophages. Il vaut mieux utiliser une lib solide et éprouvée.

Je resterai fort.

Kombu est une telle lib, mais elle est lourde et complexe à utiliser. J’avais fais le choix de la prendre pour un gros projet avec Max, je le regrette sur le long terme : c’est dur à maintenir et à faire évoluer. Le code est vraiment pas sympa.

Heureusement il existe une bibliothèque qui se met au dessus de Kombu pour et nous expose juste les fonctionnalités que l’on souhaite : celery.

Fort.

Celery est simple pour démarrer, mais très puissant si on rentre dans le détail, et croyez moi, le détail, on peut y rentrer très très profondément.

F…

Installation

Qui dit file d’attente, dit stockage. Il faut bien mettre les tâches quelque part et communiquer avec ce quelque part. En base de données ? Dans un gestionnaire de messages ? En mémoire ? Dans un cache ?

Celery résout le problème en proposant la même interface, quelque soit le support. Actuellement, on peut utiliser :

Dans notre exemple, nous allons le faire avec Redis car :

Pour ceux qui ont pas redis, c’est généralement dans les dépôts. Par exemple sur Ubuntu :

sudo apt-get install redis-server

Il n’y a rien à faire de plus, ça tourne, c’est configuré avec des valeurs par défaut qui sont saines. Je vous l’ai dis, redis, c’est fantastiquement bien foutu.

Ensuite on install celery et la lib d’accès à redis en Python qui porte un nom très original :

pip install celery redis

Ca devrait compiler un peu, et comme hab avec les extensions en C, assurez vous d’avoir un compilateur et les headers en place comme indiqué dans l’article sur pip.

Ensuite on peut créer ses tâches. Créez un module, par exemple tasks.py :

import urllib2
 
from collections import Counter
 
from celery import Celery
 
# Configuration de celery. Ceci peut aussi se faire dans un fichier de config.
# Ici on dit à celery que pour le module 'tasks', on va utiliser redis
# comme broker (passeur de massage) et comme result backend (stockage du
# resultat des tâches).
celery = Celery('tasks', broker='redis://localhost', backend='redis://localhost')
 
 
# Et voici notre première tâche. C'est une fonction Python normale, décorée
# avec un decorateur de celery. Elle prend une URL, et calcule le nombre
# de lettre "e" qu'il y a dans la page.
@celery.task
def ecount(url):
    return Counter(urllib2.urlopen(url).read())['e']

On lance ensuite le processus celery dans un terminal (en production, mettez ça dans supervisord ou systemd pour que ça démarre automatiquement) :

[test] sam ~/Bureau/celery_test $ celery -A tasks worker -B --loglevel=info

 -------------- celery@sam v3.0.21 (Chiastic Slide)
---- **** -----
--- * ***  * -- Linux-3.2.0-48-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> broker:      redis://localhost:6379//
- ** ---------- .> app:         tasks:0x2a2fa50
- ** ---------- .> concurrency: 4 (processes)
- *** --- * --- .> events:      OFF (enable -E to monitor this worker)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery:      exchange:celery(direct) binding:celery

[Tasks]
  . tasks.ecount

[2013-07-26 13:22:21,631: INFO/Beat] Celerybeat: Starting..
[2013-07-26 13:04:51,274: WARNING/MainProcess] celery@sam ready.
[2013-07-26 13:04:51,280: INFO/MainProcess] consumer: Connected to redis://localhost:6379//.

-A précise le module à importer, -B démarre le beat (on verra ça plus tard), worker dit à celery que démarrer des processus de consommation de files d’attente (par défaut 4 qui travaillent en parallèle), et --loglevel=info va nous permettre d’avoir un affichage verbeux pour comprendre ce qui se passe.

Votre file d’attente est prête, et frétille d’impatience.

Lancer une tâche

A partir de là, vous pouvez envoyer des tâches dans la file d’attente, depuis n’importe où :

Plusieurs programmes peuvent envoyer plein de tâches, en même temps, et elles vont se loger dans la file d’attente, sans bloquer le programme qui les a envoyé.

Par exemple, depuis le shell :

>>> from tasks import ecount
>>> res = ecount.delay('http://danstonchat.com')

Ceci ne bloque pas mon shell, la ligne s’exécute immédiatement. La fonction ecount n’est pas appelée depuis le shell, elle est dans la file d’attente et sera appelée par un des processus (les fameux ‘worker’) qui consomment la queue. Du côté de la file, on peut voir dans le log :

[2013-07-26 14:18:08,609: INFO/MainProcess] Got task from broker: tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df]
[2013-07-26 14:18:09,070: INFO/MainProcess] Task tasks.ecount[599a52ea-ef6b-4499-981d-cd17fab592df] succeeded in 0.446974039078s: 1242

On a donc notre tâche qui a bien été traitée.

On peut récupérer le résultat dans le shell :

>>> res.state
'PENDING'

Ah… La tâche n’est pas encore terminée. Et un peu plus tard :

>>> res.state
'SUCCESS'
>>> res.result
1242

Lancer une tâche est bien entendu peu intéressant, les listes d’attente sont vraiment sympa quand on a plein de tâches à lancer, par plein de processus différents :

results = [ecount.delay(url) for url in ('http://google.com', 'http://sametmax.com', 'http://sebsauvage.com', 'http://multiboards.com', 'http://0bin.net', 'http://danstonchat.com')]
[2013-07-26 14:25:46,646: INFO/MainProcess] Got task from broker: tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724]
[2013-07-26 14:25:46,649: INFO/MainProcess] Got task from broker: tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2]
[2013-07-26 14:25:46,650: INFO/MainProcess] Got task from broker: tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e]
[2013-07-26 14:25:46,652: INFO/MainProcess] Got task from broker: tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d]
[2013-07-26 14:25:46,653: INFO/MainProcess] Got task from broker: tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246]
[2013-07-26 14:25:46,654: INFO/MainProcess] Got task from broker: tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7]
[2013-07-26 14:25:47,144: INFO/MainProcess] Task tasks.ecount[bbe46b1b-4719-4c42-bd2f-21e4d72e613e] succeeded in 0.479865789413s: 27
[2013-07-26 14:25:47,242: INFO/MainProcess] Task tasks.ecount[5d072a7b-29f8-4ea6-8d92-6a4c1740d724] succeeded in 0.578661203384s: 609
[2013-07-26 14:25:47,501: INFO/MainProcess] Task tasks.ecount[fc63f5db-8ade-4383-b719-c3d6390ca246] succeeded in 0.35736989975s: 263
[2013-07-26 14:25:47,645: INFO/MainProcess] Task tasks.ecount[8434e21d-79ea-4559-a90e-92e2bc2b9dc7] succeeded in 0.403187036514s: 1270
[2013-07-26 14:25:47,815: INFO/MainProcess] Task tasks.ecount[8fb35186-66e2-4eae-a40c-fc42e500ab9d] succeeded in 1.14100408554s: 23
[2013-07-26 14:25:49,010: INFO/MainProcess] Task tasks.ecount[402f6a4f-6b35-4f62-a786-9a5ba27707d2] succeeded in 2.34633708s: 3158

Car du coup on sait que ces multiples tâches ne vont pas bloquer le processus en cour, mais qu’en plus la charge sera répartie sur le nombre de workers qu’on a décidé au départ, ni plus (surcharge du serveur), ni moins (traitement trop lent).

Comment je sais quand une tâche est terminée ?

On peut attendre que la tâche soit terminée :

>>> print res.wait()
9999

Mais ce n’est pas vraiment le but. On cherche avant tout à ce que les tâches soient non bloquantes, et exécutées dans un processus à part voir potentiellement distribuées sur plusieurs serveurs.

Par ailleurs, Celery n’est pas un remplacement d’un système de traitement asynchrone comme Tornado ou NodeJS, il n’est pas fait pour envoyer des réponses asynchrones à l’utilisateur. Il est fait pour faire des tâches en background, répartir la charge et ordonner le traitement. Bien entendu, on peut faire communiquer un système asynchrone avec celery comme ici ou ici, mais c’est une autre histoire.

Concentrons nous sur les tâches.

La question de “Comment je sais quand une tâche est terminée ?” est souvent traduisible par “comment je réagis à une tâche pour lancer du code quand elle s’est terminée sans erreur ?”.

Et là, il y une solution toute simple :

res = tache1.s(arg1, arg2) | tache2.s() | tache3.s(arg1)

Ceci va créer une chaîne de tâches. Quand la première se termine, la deuxième se lance en recevant le résultat de la première en argument.

s() fabrique une sous-tâche, c’est à dire une tâche à envoyer dans la file plus tard avec des arguments pré-enregistrés. Dans notre exemple, celery va lancer tache1 avec deux arguments, puis si ça marche, va appeler tache2 en lui passant le résultat de tache1 comme argument, puis si ça marche, va appeler tache3 avec le résultat de tache2 en premier argument et arg1 en second argument.

En fait, celery vient avec tout un tas d’outils pour exécuter des tâches dépendantes les unes des autres : par groupes, par chaînes, par morceaux, etc. Mais de toute façon, vous pouvez appeler une tâche… à l’intérieur d’une autre tâche. Donc parti de là vous pouvez faire pas mal de choses.

Comment je fais pour faire une tâche récurrente ?

C’est là qu’intervient le “beat” dont j’ai parlé tout à l’heure. Avec cette option, celery va vérifier toutes les secondes si il n’y a pas une tâche répétitive à lancer, et la mettre dans une file d’attente, à la manière d’un cron.

Il suffit de définir une tâche comme periodic_task pour qu’elle soit lancée régulièrement.

import smtplib
 
from celery.schedules import crontab
from celery.decorators import periodic_task
 
# va executer la tâche à 5h30, 13h30 et 23h30 tous les lundi
# run_every accepte aussi un timedelta, pour par exemple dire "toutes les 10m"
@periodic_task(run_every=crontab(hour='5,13,23', minute=30, day_of_week='monday')
def is_alive():
    """
        Vérifie que le blog est toujours en ligne, et si ce n'est pas le cas,
        envoie un mail en panique.
    """
    if urllib2.urlopen('http://sametmax.com').code != 200:
        mail = 'lesametlemax__AT__gmail.com'.replace('__AT__', '@')
        server = smtplib.SMTP('smtp.gmail.com:587')
        server.starttls()
        server.login('root', 'admin123')
        server.sendmail(mail, mail, msg)
        server.quit()

Il y a de bons exemples sur la syntaxe sur crontab() dans la doc.

D’une manière générale, la doc de Celery est très très riche, donc plongez vous dedans si cet article ne répond pas à vos besoins, car si ça peut être mis dans une file, ça peut être fait par Celery.

Note de fin

Celery n’autoreload pas le code, donc redémarrez les workers à chaque fois que vous modifiez vos tasks.

Attention aussi aux tâches récurrentes, la suivante peut se lancer avant que la précédente soit terminée. C’est à vous de faire des tâches idempotentes, ou alors de mettre en place un système de locking.

flattr this!

Error happened! 0 - count(): Argument #1 ($value) must be of type Countable|array, null given In: /var/www/ecirtam.net/autoblogs/autoblogs/autoblog.php:428 http://www.ecirtam.net/autoblogs/autoblogs/sametmaxcom_a844ada43a979e3b1395ab9acb6afafb84340999/?Files-de-t%C3%A2ches-et-t%C3%A2ches-r%C3%A9currentes-avec-Celery #0 /var/www/ecirtam.net/autoblogs/autoblogs/autoblog.php(999): VroumVroum_Blog->update() #1 /var/www/ecirtam.net/autoblogs/autoblogs/sametmaxcom_a844ada43a979e3b1395ab9acb6afafb84340999/index.php(1): require_once('...') #2 {main}