V4RaceMachine

Commençons par le commencement. Jusqu'à il y a peu, le multithreading était un domaine dans lequel peu osaient se lancer, et pour cause : c'est un domaine assez complexe à maîtriser où les erreurs sont sanctionnées violemment. Le spectre du deadlock a hanté tous ceux qui s'y sont penchés. Et lorsqu'il était utilisé, le multithreading servait finalement assez peu à paralléliser réellement les tâches, mais plutôt à permettre, de façon assez simple, d'obtenir des traitements asynchrones. Ce qui était déjà un progrès :) (Je pense au SwingWorker tout à coup...) Les choses se sont simplifiées avec l'arrivée de Java 5 et du package java.util.concurrent fournissant un ensemble de commodités pour s'abstraire un peu de la complexité initiale du multithreading. Les pools de thread, les queues, ... ont apporté un peu d'air frais. L'ExecutorService a simplifié l'execution en parallèle de tâches en s'affranchissant de la manipulation directe des Threads. Cette classe propose un pool de threads qui seront utilisés pour executer une liste de taches lui étant soumises. L'objectif est atteint : ça fonctionne bien et de façon simple. Mais cette solution souffre de quelques limitations parmi lesquelles :

  • tout va bien si les tâches sont indépendantes : une tâche s'execute et une fois terminée, le thread l'ayant executée est libre pour traiter la suivante... mais si une tâche doit attendre le résultat d'une autre, qui s'execute en parallèle, le thread qu'elle monopolise n'est pas rendu au pool tant que l'autre tâche n'est pas terminée et que la tâche courante n'a pas exploité le retour de l'autre.
  • apprécier la taille optimale du pool est toujours une tâche délicate
  • le nombre de tâches parallèlisables souhaitées ne peut pas être déraisonnablement grand, car limité par le nombre de threads que cela induit. Trop de threads tue le thread ! La concurrence induite par un grand nombre de threads sur les ressources devient nuisible à la rapidité globale d'execution.

C'est là que la JSR 166y, conduite par Doug Lea, apporte un souffle nouveau... ou plutôt apportera car le développement est toujours en cours (NOTE : vous pouvez d'ores et déjà tester les fonctionnalités avec le jar jsr166y.jar). Le principe de base est la subdivision des traitements en plus petites unités de travail. Les traitements à un niveau de subdivision seront lancés en parallèle et les résultats obtenus en retour, agrégés. Ce principe est appelé fork/join. Chaque tâche, si sa taille est au dessus d'un certain seuil, sera subdivisée en sous-tâches (fork) qui seront exécutées individuellement et parallèlement (ou subdivisées encore) puis agrège (join) les résultats de chacune. Le comportement ici diffère de celui observable avec l'ExecutorService en cela que lors des périodes d'attente, le thread n'est plus monopolisé pour rien mais est affecté à l'execution d'une autre unité de travail.

Prenons, pour illustrer ce fonctionnement, l'exemple proposé par Doug Lea : la résolution par recursivité (NOTE : l'approche récursive n'est pas la plus performante, juste pratique pour la demo) de la suite de Fibonacci (dont la forme récurrente est : u(n+2) = u(n+1) + u(n), u(1) = u(2) = 1). Voici l'approche fork/join en java :

class Fibonacci extends RecursiveAction {
[...]

    volatile int number;
    volatile int result;

[...]

    public void compute() {
        int n = number;
        if (n <= threshold) { // seuil de granularité sous lequel on ne parallèlise plus
            result = seqFib(n);
        } else {
            final Fibonacci f1 = new Fibonacci(n - 1); // subdivision
            final Fibonacci f2 = new Fibonacci(n - 2);
            forkJoin(f1, f2); // invocation en parallèle des sous-taches (fork & join)
            result = f1.result + f2.result; // agregation des resultats
        }
    }

    public int seqFib(int n) {
        if (n <= 1) {
            return n;
        } else {
            return seqFib(n - 1) + seqFib(n - 2);


        }
    }

    public static void main(String[] args) {
        int groupSize = 2; // for example, try Runtime.getRuntime().availableProcessors() too
        ForkJoinPool pool = new ForkJoinPool(groupSize);
        Fibonacci f = new Fibonacci(44); // for example
        pool.invoke(f);
        int result = f.getAnswer();
        System.out.println("Answer: " + result);
    }

[...]
}

L'exemple montre le fonctionnement de subdivision au delà d'un seuil ('threshold' dans le code), librement définissable, puis la parallélisation de l'execution (via l'appel de la méthode forkJoin(..) de la classe RecursiveAction fournie par l'API fork/join) et enfin la consolidation des résultats. La méthode main(..) permet de voir le dimensionnement du pool d'execution (groupSize, 2 threads ici, la methode availableProcessors(..) de la classe Runtime peut très bien fixer cette valeur) en charge de l'execution des tâches parallèlisables. Ce dimensionnement du pool ne doit pas être le simple fruit du hasard : il est optimal de définir comme nombre de threads le nombre de 'cores' disponibles (NOTE : ce nombre peut être inférieur au nombre physique) sur le serveur executant le code en question. Définir un nombre inférieur n'exploiterait pas toute la puissance disponible tandis qu'un nombre supérieur n'apporterait pas ou peu de gain : les threads en surnombre ne pouvant s'executer sans interruption d'un voisin actif (rappelons que le but du fork/join est de faire travailler tout le monde à 100% sans temps d'attente).

Chaque thread va être responsable de sa propre liste de tâches à traiter. Cependant, comme il peut avoir terminé son travail avant les autres threads du pool, il va pouvoir aller aider ses semblables. Voici comment un thread choisit une tâche. Une liste des tâches est implémentée dans chaque thread sous la forme d'une double-ended queue (appelée aussi 'deque'). La règle qui régit les accès des threads à cette structure est la suivante : Lors d'une opération de 'fork', les tâches créées par le thread sont empilées en tête de sa deque. Lorsqu'un thread est en attente d'une autre tâche pour une opération de type 'join', il va suspendre son traitement courant pour aller dépiler une nouvelle tâche depuis la tête de la deque. Si aucune autre tâche n'est présente dans sa propre deque, il ira 'voler' une tâche à un autre thread, mais cette fois par la queue de la deque. Ce mécanisme de consommation de tâches entre threads est appelé 'work stealing'. Le fait d'accéder par la queue à la deque de son voisin a plusieurs avantages : cela évite des problèmes de contention sur la tête : le voleur attaque la queue pendant que le volé s'occupe de la tête : les risques de contention sur une extrémité sont faibles. Comme voler une tâche est une opération qui ne survient qu'en cas d'épuisement de sa propre deque, la contention sur la queue de ces collections est aussi limitée naturellement car les occurences de vols sont moins fréquentes que les accès à la tête des deque. Autre avantage d'accéder à la queue de la deque du voisin : les tâches en queue de collections sont plus susceptibles d'être subdivisables car leur granularité est potentiellement plus importante que celles en tête (rappelez vous : on empile les sous-tâches sur la tête). De ce fait, voler une grosse tâche donnera suffisament de travail au voleur pour ne pas revenir de si tôt piquer chez le voisin.

Cette approche 'work stealing' est ainsi garante d'une répartition simple et efficace du travail entre threads, assez équitable en terme de charge et ne nécessitant que peu de synchronisation sur les ressources (du fait de la contention très limitée).

La parallèlisation des traitements peut être utilisée dans ces scénarii bien plus fréquents que l'évaluation de la suite de Fibonacci. On peut citer parmi les tâches classiques pouvant tirer des bénéfices immédiats de cette approche fork/join : les tris, les recherches, les calculs comme des moyennes, determination de maximum, minimum... sur des collections d'éléments.

Pour réaliser ces traitements sur des collections, l'API fournit une entité apportant un niveau d'abstraction des concepts de parallèlisation encore plus important. Cette dernière va prendre en charge la subdivision, les déclenchements d'executions et les consolidations de résultats de façon transparente. La classe en question se nomme ParallelArray et va contenir un ForkJoinExecutor ainsi qu'un tableau de données sur lequel porteront les opérations. Voici un exemple d'utilisation. Soit people un tableau d'objets de type Person décrivant une population d'individus, nous allons trouver celui qui présente le plus fort indice de masse corporelle dans une cible donnée : les hommes de 18 à 25 ans.

// Creation du pool de threads et initialisation du ParallelArray :
final ForkJoinPool pool = new ForkJoinPool(2);
final ParallelArray<Person> peopleArray = ParallelArray.createFromCopy(people, pool);

// Creation du filtre: les hommes entre 18 et 25 ans
final Ops.Predicate<Person> isYoungMan = new Ops.Predicate<Person>() {
    public boolean op(final Person p) {
         return p.getGender() == Person.MALE && p.getAge() > 18 && p.getAge() <= 25;
    }
};

// Creation de l'opération de calcul de l'IMC
final Ops.ObjectToDouble<Person> retrieveBMI = new Ops.ObjectToDouble<Person>() {
    public double op(Person p) {
        return p.getWeight() / (p.getHeight() * p.getHeight());
    }
};

SummaryStatistics summary = peopleArray.withFilter(isYoungMan)
                                 .withMapping(retrieveBMI)
                                 .summary();

System.out.println("Personne à plus haut IMC : " + peopleArray.get(summary.indexOfMax()).getName());

Cet exemple montre la simplicité d'utilisation de la classe ParallelArray. Nulle part, sauf à sa création (puisqu'on y injecte un ForkJoinExecutor), la notion de parallelisme n'apparait et pourtant sous le capot, les chevaux sont prêt à bondir.

Pour conclure, l'arrivée prochaine de Java 7 (quelqu'un a une date au fait ? mi 2009 ?) va ouvrir les portes, grâce à cette gestion de la parallélisation, à des améliorations sensibles des performances dans un certain nombre de programmes. Les gains obtenus peuvent être extremement élevés sur des serveurs dotés de beaucoup de core (Des tests menés par Doug Lea montrent des gains allant de x15 à x28 sur un serveur doté de 30 processeurs). La bonne nouvelle aussi derrière cela est qu'il n'est pas exclu que ces modifications soient, au fur et à mesure, remontées certaines parties existantes de Java (java.util.Arrays, ...), offrant "gratuitement" aux utilisateurs des gains sensibles. Enfin, il existe des approches, plus complexes, de gestion de la parallèlisation (algorithmes parallèles à grain adaptatif) qui permettent encore d'améliorer ce concept et qui finiront peut être implémentées dans des librairies tierces.

http://jcp.org/en/jsr/detail?id=166

http://gee.cs.oswego.edu/dl/concurrent/dist/docs/index.html