La Prog Fonctionnelle, Partie I bis
La Prog Fonctionnelle, Annexe: MapReduce
Faisons maintenant une petite parenthèse a propos de MapReduce, le framework théorique utilisé pour l'exécution de grosses requêtes dans Hadoop ou MongoDB.
Dans MapReduce, la structure de données de base est le dictionnaire (aka hash, aka table associative). Une requête MapReduce se compose de deux fonctions, un mapper et un reducer.
Le mapper sera d'abord exécuté sur toutes les paires clef-valeur de l'input, et retourne (éventuellement) une (ou plusieurs) nouvelle(s) paire(s) clef-valeur.
Ensuite, pour chaque clef dans les résultats fournis par le mapper, le reducer reçoit toutes les valeurs correspondantes. Il émet ensuite une valeur finale, et c'est cette valeur qui est stockée, avec la clef correspondante, dans l'output.
Une contrainte supplémentaire est que le reducer doit être une fonction associative (oui oui, comme l'associativité de l'addition que vous avez vu a l'école) et commutative. Le résultat de reduce([reduce([a,d]), reduce([b, reduce([c,e]), f])), par exemple, doit être identique à reduce([a,b,c,d,e,f]).
Enfin, le mapper comme le reducer ne peuvent pas avoir d'effets de bord. Interdites donc les variables globales ou la communication avec l'extérieur.
Prenons un exemple extrêmement simple: le comptage des mots dans une collection de documents (ce que ferait un moteur de recherche pour trouver les mots les plus populaires.)
Notre input est donc une collection de documents, on peut imaginer
Notre mapper va compter les mots, et émettre la valeur 1 pour chaque mot trouvé:
Notre reducer va calculer la somme des valeurs:
On soumettra la requête a la base, avec quelque chose du genre
Oui oui, mais quel intérêt ?
Il se trouve que ce modèle est particulièrement adapté pour exécuter, sur des gros clusters, des requêtes de type analytique, qui ont besoin de lire une table entière, pas juste accéder à un index.
On peut stocker les données sur un énorme cluster de machines relativement modestes. Chaque machine est responsable de son propre disque, et peut le lire entièrement en un temps raisonnable. Pour augmenter la puissance du cluster, on ajoute simplement des machines, mais leur taille ne change pas.
Quand un système pareil grandit, le coût des communications réseau devient de plus en plus élevé, alors que les coûts d'I/O ou de calcul ne changent pas. Les coûts réseau finissent donc par dominer le temps d'exécution de la requête.
Les tables clef-valeur sont stockées distribuées sur toutes les machines; chaque machine est responsable d'un petit sous-ensemble de clefs. L'allocation se fait en utilisant une fonction de hachage, de cette manière n'importe quel noeud sait instantanément sur quel autre machine se trouve une clef donnée.
Comme les mappers s'exécutent clef par clef et ne peuvent pas communiquer entre eux, ils peuvent tous être exécutés en parallèle par toutes les machines du cluster, sans aucun problème.
Les résultats collectés, peuvent ensuite passer optionnellement par une phase reduce locale, ce qui permet de réduire la taille de l'output intermédiaire. La aussi, tout est fait complètement en parallèle.
Arrive ensuite la phase du shuffle: chaque machine examine son output intermédiaire, et le transmet, en fonction de la nouvelle clef produite par le mapper, à la machine responsable de cette clef. Conséquemment, chaque machine reçoit de toutes les autres les résultats intermédiaires. C'est l'étape qui est limitée par le réseau.
Enfin, chaque machine exécute un reducer final sur les clefs collectées, avant de placer le résultat dans l'output. La aussi, cette phase se fait entièrement en parallèle.
Un petit exemple visuel
Au départ:
Après l'exécution du mapper, et du premier reducer:
Après le shuffle (la répartition des clefs est choisie par hash, donc sans logique apparente, mais ici j'ai choisi un pseudo-ordre alphabétique):
Et après l'exécution du reducer final:
Exercice
Ecrivez une fonction MapReduce qui calcule le mot le plus courant de toute la collection
Fin de la parenthèse, et retour à un aspect plus théorique de la programmation !
Faisons maintenant une petite parenthèse a propos de MapReduce, le framework théorique utilisé pour l'exécution de grosses requêtes dans Hadoop ou MongoDB.
Dans MapReduce, la structure de données de base est le dictionnaire (aka hash, aka table associative). Une requête MapReduce se compose de deux fonctions, un mapper et un reducer.
Le mapper sera d'abord exécuté sur toutes les paires clef-valeur de l'input, et retourne (éventuellement) une (ou plusieurs) nouvelle(s) paire(s) clef-valeur.
Ensuite, pour chaque clef dans les résultats fournis par le mapper, le reducer reçoit toutes les valeurs correspondantes. Il émet ensuite une valeur finale, et c'est cette valeur qui est stockée, avec la clef correspondante, dans l'output.
Une contrainte supplémentaire est que le reducer doit être une fonction associative (oui oui, comme l'associativité de l'addition que vous avez vu a l'école) et commutative. Le résultat de reduce([reduce([a,d]), reduce([b, reduce([c,e]), f])), par exemple, doit être identique à reduce([a,b,c,d,e,f]).
Enfin, le mapper comme le reducer ne peuvent pas avoir d'effets de bord. Interdites donc les variables globales ou la communication avec l'extérieur.
Prenons un exemple extrêmement simple: le comptage des mots dans une collection de documents (ce que ferait un moteur de recherche pour trouver les mots les plus populaires.)
Notre input est donc une collection de documents, on peut imaginer
Code :
input = {'0001': 'blah bli blu', '0002': 'YOU LOST THE GAME', ... }
Notre mapper va compter les mots, et émettre la valeur 1 pour chaque mot trouvé:
Code :
def mapper(key, value):
# key = ID du document, value = contenu du document
for word in value.split():
emit(word, 1) # emission de la paire clef-valeur en sortie
Notre reducer va calculer la somme des valeurs:
Code :
def reducer(key, values):
return sum(values)
On soumettra la requête a la base, avec quelque chose du genre
Code :
output = runMapReduce(input, mapper, reducer)
Oui oui, mais quel intérêt ?
Il se trouve que ce modèle est particulièrement adapté pour exécuter, sur des gros clusters, des requêtes de type analytique, qui ont besoin de lire une table entière, pas juste accéder à un index.
On peut stocker les données sur un énorme cluster de machines relativement modestes. Chaque machine est responsable de son propre disque, et peut le lire entièrement en un temps raisonnable. Pour augmenter la puissance du cluster, on ajoute simplement des machines, mais leur taille ne change pas.
Quand un système pareil grandit, le coût des communications réseau devient de plus en plus élevé, alors que les coûts d'I/O ou de calcul ne changent pas. Les coûts réseau finissent donc par dominer le temps d'exécution de la requête.
Les tables clef-valeur sont stockées distribuées sur toutes les machines; chaque machine est responsable d'un petit sous-ensemble de clefs. L'allocation se fait en utilisant une fonction de hachage, de cette manière n'importe quel noeud sait instantanément sur quel autre machine se trouve une clef donnée.
Comme les mappers s'exécutent clef par clef et ne peuvent pas communiquer entre eux, ils peuvent tous être exécutés en parallèle par toutes les machines du cluster, sans aucun problème.
Les résultats collectés, peuvent ensuite passer optionnellement par une phase reduce locale, ce qui permet de réduire la taille de l'output intermédiaire. La aussi, tout est fait complètement en parallèle.
Arrive ensuite la phase du shuffle: chaque machine examine son output intermédiaire, et le transmet, en fonction de la nouvelle clef produite par le mapper, à la machine responsable de cette clef. Conséquemment, chaque machine reçoit de toutes les autres les résultats intermédiaires. C'est l'étape qui est limitée par le réseau.
Enfin, chaque machine exécute un reducer final sur les clefs collectées, avant de placer le résultat dans l'output. La aussi, cette phase se fait entièrement en parallèle.
Un petit exemple visuel
Au départ:
- Machine 1:
Code :0001: I LIKE TURTLES
0002: I LOST THE GAME
- Machine 2:
Code :0003: I LIKE THE GAME
0004: I LOST MY KEYS
Après l'exécution du mapper, et du premier reducer:
- Machine 1:
Code :I: 2
GAME: 1
LIKE: 1
LOST: 1
THE: 1
TURTLES: 1
- Machine 2:
Code :I: 2
GAME: 1
KEYS: 1
LIKE: 1
LOST: 1
MY: 1
THE: 1
Après le shuffle (la répartition des clefs est choisie par hash, donc sans logique apparente, mais ici j'ai choisi un pseudo-ordre alphabétique):
- Machine 1:
Code :I: 2
I: 2
GAME: 1
GAME: 1
KEYS: 1
LIKE: 1
LIKE: 1
- Machine 2:
Code :LOST: 1
LOST: 1
MY: 1
THE: 1
THE: 1
TURTLES: 1
Et après l'exécution du reducer final:
- Machine 1:
Code :I: 4
GAME: 2
KEYS: 1
LIKE: 1
- Machine 2:
Code :LOST: 2
MY: 1
THE: 2
TURTLES: 1
Exercice
Ecrivez une fonction MapReduce qui calcule le mot le plus courant de toute la collection
Fin de la parenthèse, et retour à un aspect plus théorique de la programmation !