Taula de continguts

Futurs a Java

, , , ,

Exemples

futurs_java_exemple.zip

Futurs

La clase ‘Futur’ de Java, representa el resultat futur d’un càlcul asíncron.

És a dir, un resultat o unes dades que obtindrem en un futur i cal esperar sense aturar el funcionament normal del programa.

És habitual fer servir futurs en càlculs molt complexes o quan s’esperen dades d’un servidor que pot trigar en respondre.

En realitat es fan amb fils paral·lels (threads), però la implementació ens allibera de definir-los un a un

Futurs, un sol Futur amb “newSingleThreadExecutor”

ExecutorService: crea un thread automàticament per executar la tasca que ha de retornar un futur

.submit(): permet fer el càlcul en paral·lel i retornar el Futur quan ha acabat

.shutdown(): atura el thread paral·lel

En aquest exemple es força una espera d’1 segon per simular una operació lenta

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class NumeroQuadratCalcul {
 
   private ExecutorService executor =        
           Executors.newSingleThreadExecutor();
 
    public Future<Integer> calculate(Integer input) {
 
       return executor.submit(() -> {
           Thread.sleep(1000);
           return input * input;
       });
   }
   public void shutdown () { executor.shutdown(); }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
 
public class NumeroQuadratMain {
    public static void main(String args[]) {
        NumeroQuadratCalcul obj = new NumeroQuadratCalcul();
        Future<Integer> future = obj.calculate(7);
 
        while (!future.isDone()) {
            System.out.println("Calculant...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) { e.printStackTrace(); }
        }
 
        try {
            Integer result = future.get();
            System.out.println("Resultat: " + result);
            obj.shutdown();
        } catch (InterruptedException e) { e.printStackTrace();
        } catch (ExecutionException e) { e.printStackTrace(); }
    }
}

Quan demanem el càlcul (lent), rebem un futur enlloc de la dada

Hem d’esperar a que el Futur estigui disponible “.isDone()

Quan ja està llest, afagem la dada resultant amb “.get()

Futurs, aturar un futur amb .cancel()

A vegades esperar un futur triga més del què podem acceptar, per aturar-lo podem fer “.cancel()”

En aquest exemple es crea un futur que triga entre 2 i 4 segons a acabar, el programa principal l’atura si triga més de 3.

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class EsperaSegonsCalcul {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
 
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            long millis = (new Random()).nextInt(3000) + 1000;
            System.out.println("Trigarà: " + (millis / 1000.0) + " milisegons");
            Thread.sleep(millis);
            return input * input;
        });
    }
 
    public void shutdown () { executor.shutdown(); }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
 
public class EsperaSegonsMain {
    public static void main(String args[]) {
        EsperaSegonsCalcul obj = new EsperaSegonsCalcul();
        Future<Integer> future = obj.calculate(7);
        long start = System.currentTimeMillis();
        long finish = 0;
 
        while (!future.isDone()) {
            finish = System.currentTimeMillis();
            double segons = (finish - start) / 1000.0;
            System.out.println("Esperant ... " + segons + " segons");
            if (segons > 2) {
                // Si esperem més de 2 segons, ja no volem el resultat
                future.cancel(true);
            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) { e.printStackTrace(); }
        }
 
        try {
            if (future.isCancelled()) {
                System.out.println("El resultat ha trigat massa");
            } else {
                Integer result = future.get();
                System.out.println("Resultat: " + result);
            }
            obj.shutdown();
        } catch (InterruptedException e) { e.printStackTrace();
        } catch (ExecutionException e) { e.printStackTrace(); }
    }  
}

Futurs, multiples futurs amb ‘newFixedThreadPool’

Per executar múltiples futurs alhora, cal iniciar “executor” amb “newfixedThreadPool(X)” on X representa el número de fils que funcionen en paral·lel

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class MultiplesFilsCalcul {
    private ExecutorService executor = Executors.newFixedThreadPool(4);
 
    public Future<Integer> calculate(Integer input) {        
        return executor.submit(() -> {
            long millis = (new Random()).nextInt(4000) + 500;
            System.out.println("Calcular " + input + " trigarà " + (millis / 1000.0) + " milisegons");
            Thread.sleep(millis);
            return input * input;
        });
    }
    public void shutdown () { executor.shutdown(); }
}

I podrem crear un “Futur” per cada un dels fils que tenim disponibles. Cada un d’aquests futurs pot trigar diferent temps a executar-se.

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
 
public class MultiplesFilsMain {
 
    public static void main(String args[]) {
        MultiplesFilsCalcul obj = new MultiplesFilsCalcul();
        Future<Integer> f0 = obj.calculate(1);
        Future<Integer> f1 = obj.calculate(2);
        Future<Integer> f2 = obj.calculate(4);
        Future<Integer> f3 = obj.calculate(8);
        String txt = "";
 
        while (!f0.isDone() || !f1.isDone() || !f2.isDone() || !f3.isDone()) {
            // Esperar a que estiguin llestos
            txt = String.format(
                "Estats: f0 %s, f1 %s, f2 %s, f3 %s", 
                f0.isDone() ? "llest" : "calculant", 
                f1.isDone() ? "llest" : "calculant",
                f2.isDone() ? "llest" : "calculant",
                f3.isDone() ? "llest" : "calculant"
            );
            System.out.println(txt);
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) { e.printStackTrace(); }
        }    
 
        try {
            txt = String.format(
                "Resultat: f0=%s, f1=%s, f2=%s, f3=%s", 
                f0.get(), f1.get(), f2.get(), f3.get()
            );
            System.out.println(txt);
            obj.shutdown();
        } catch (InterruptedException e) { e.printStackTrace();
        } catch (ExecutionException e) { e.printStackTrace(); }
    }
}

Futurs, Completable Future

CompletableFuture permet definir els processos sense necessitat de ‘executors’, bloquejant accions futures fins que han acabat els processos o fins i tot encadenant futurs. Es declaren amb:

.runAsync quan no tenen valor de retorn

.supplyAsync quan tenen valor de retorn

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
 
public class CompletableMain {
    public static void main(String args[]) {
 
        // Declarar els processos asíncrons al codi directament
        CompletableFuture<Void> futureA = CompletableFuture.runAsync(getRunnable("A"));
        CompletableFuture<Void> futureB = CompletableFuture.runAsync(getRunnable("B"));
 
        // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora
        // tots han de retornar el mateix tipus (en aquest cas String)
        System.out.println("Esperant futurs");
        List<CompletableFuture<Void>> futureList = new ArrayList<>();
        futureList.add(futureA);
        futureList.add(futureB);
        futureList.forEach(CompletableFuture::join); 
        System.out.println("Futurs llestos");
    }
 
    static Runnable getRunnable (String type) {
        return new Runnable () {
            @Override
            public void run () {
                try {
                    long millis = (new Random()).nextInt(4000) + 500;
                    System.out.println("futur" + type + " trigarà " + (millis / 1000.0) + " milisegons");
                    int cnt = 10;
                    while (cnt > 0) {
                        Thread.sleep(millis / 10);
                        System.out.println(" - " + type + " " + cnt);
                        cnt = cnt - 1;
                    }
                    System.out.println("future" + type + " acabat");
 
                } catch (InterruptedException e) { e.printStackTrace(); }
            }
        };
    }
}

A més, permet fer una llista d’objectes tipus ‘CompletableFuture’ i esperar a que tots han acabat l’execució per continuar.

Tots han de retornar el mateix tipus, en aquest cas ‘String’

System.out.println("Esperant resultats");
List<CompletableFuture<String>> futureList 
                                       = new ArrayList<>();
futureList.add(futureA);
futureList.add(futureB);
System.out.println("Resultats llestos");

Futurs, CompletableFuture amb retorn i sense retorn

Quan un Futur gestionat per CompletableFuture no té retorn, fem servir ‘.runAsync’ i li passem una funció que retorni un objecte Runnable sobreescrivint la runció ‘run’ que executa el càlcul

CompletableFuture<Void> f0 = CompletableFuture.runAsync(getRunnable("ABC"));
static Runnable getRunnable (String cadena) {
       return new Runnable () {
           @Override
           public void run() {
...

En canvi quan ha de retornar valors, fem servir ‘.supplyAsync’ i li passem una funció que retorni un objecte Supplier<tipus> sobreescrivint la funció ‘get’ que executa el càlcul

CompletableFuture<String> f0 = CompletableFuture.supplyAsync(getSupplier("ABC"));
static Supplier<String> getSupplier (String cadena) {
       return new Supplier<String> () {
           @Override
           public String get() {
 
...

Futurs, CompletableFuture amb expressions lambda

CompletableFuture permet definir els processos paral·lels directament al codi, a través del què s'anomena expressions ‘lambda’ que s’escriuen amb () → { }

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
 
public class CompletableLambdaMain {
    public static void main(String args[]) {
 
        // Declarar els processos asíncrons al codi directament
        CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
            try {
                long millis = (new Random()).nextInt(4000) + 500;
                System.out.println("futurA trigarà " + (millis / 1000.0) + " milisegons");
                int cnt = 10;
                while (cnt > 0) {
                    Thread.sleep(millis / 10);
                    System.out.println(" - A " + cnt);
                    cnt = cnt - 1;
                }                    
            } catch (InterruptedException e) { e.printStackTrace(); }
            return "Done A";
        });
 
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> {
            try {
                long millis = (new Random()).nextInt(4000) + 500;
                System.out.println("futurB trigarà " + (millis / 1000.0) + " milisegons");
                int cnt = 10;
                while (cnt > 0) {
                    Thread.sleep(millis / 10);
                    System.out.println(" - B " + cnt);
                    cnt = cnt - 1;
                }                    
            } catch (InterruptedException e) { e.printStackTrace(); }
            return "Done B";
        });
 
        // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora
        System.out.println("Esperant resultats");
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        futureList.add(futureA);
        futureList.add(futureB);
        futureList.forEach(CompletableFuture::join); 
        System.out.println("Resultats llestos");
 
        // Mostrar resultats
        try {
            String resultA = futureA.get(); 
            String resultB = futureB.get(); 
            System.out.println("Resultat A = " + resultA + ", Resultat B = " + resultB);
        } catch (InterruptedException e) { e.printStackTrace();
        } catch (ExecutionException e) { e.printStackTrace(); }
    }
}

CompletableFuture i dades compartides

Cada futur en realitat és un thread que es pot executar en paral·lel a altres futurs.

Si aquests futurs accedeixen a les mateixes dades, s’ha de tenir algun mètode de bloqueig, per tal que els altres threads no modifiquin les dades simultàniament i per tant aquestes siguin errònies.

Aquest bloqueig es pot aconseguir amb «ReentrantLock», tal i com mostra el següent exemple.

En aquest codi es veu com les dades sense bloqueig només poden comptar fins a 10 perquè els fils es trepitgen entre ells, però les que implementen el bloqueig aconsegueixen comptar fins a 50.

import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
 
public class FutMain {
 
    // Vector de dades
    private static int dadesSns = 0;
    private static int dadesBqg = 0;
 
    // Bloquejador de recursos
    private static ReentrantLock mutex = new ReentrantLock();
 
    // Main
    public static void main(String[] args) {
 
        // Creacio de futurs
        ArrayList<CompletableFuture<Void>> futureList = new ArrayList<>();
        for (int cnt = 0; cnt < 5; cnt++){
            futureList.add(CompletableFuture.runAsync((getRunnable(cnt))));
        }
 
        // Execucio dels futurs
        System.out.println("Esperant resultats");
        futureList.forEach(CompletableFuture::join); 
 
        // Mostrar resultats
        System.out.println("Resultats:");
        System.out.println("Dades sense bloqueig: " + dadesSns);
        System.out.println("Dades amb bloqueig: " + dadesBqg);
    }
 
    //Codi executat pels futurs
    static Runnable getRunnable (int pos) {
        return new Runnable () {
            @Override public void run () {
 
                // Modificar les dades sense bloqueig
                for (int cnt = 0; cnt < 10; cnt++) {
                    int val = dadesSns;
                    try {Thread.sleep(10);} 
                    catch (InterruptedException e) {e.printStackTrace();}
                    dadesSns = val + 1;
                }
 
                try {
                    // Bloquejem els recursos per assegurar la seva exclusivitat
                    mutex.lock();
 
                    // Modificar les dades amb bloqueig 
                    for (int cnt = 0; cnt < 10; cnt++) {
                        int val = dadesBqg;
                        try {Thread.sleep(10);} 
                        catch (InterruptedException e) {e.printStackTrace();}
                        dadesBqg = val + 1;
                    }
 
                } finally {
                    mutex.unlock();
                }
            }
        };
    }
}