La clase ‘Futur’ de Java, representa el resultat futur d’un càlcul asíncron.
És a dir, un resultat o unes dades que obtindrem en un futur i cal esperar sense aturar el funcionament normal del programa.
És habitual fer servir futurs en càlculs molt complexes o quan s’esperen dades d’un servidor que pot trigar en respondre.
En realitat es fan amb fils paral·lels (threads), però la implementació ens allibera de definir-los un a un
ExecutorService: crea un thread automàticament per executar la tasca que ha de retornar un futur
.submit(): permet fer el càlcul en paral·lel i retornar el Futur quan ha acabat
.shutdown(): atura el thread paral·lel
En aquest exemple es força una espera d’1 segon per simular una operació lenta
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class NumeroQuadratCalcul { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { Thread.sleep(1000); return input * input; }); } public void shutdown () { executor.shutdown(); } }
import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class NumeroQuadratMain { public static void main(String args[]) { NumeroQuadratCalcul obj = new NumeroQuadratCalcul(); Future<Integer> future = obj.calculate(7); while (!future.isDone()) { System.out.println("Calculant..."); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { Integer result = future.get(); System.out.println("Resultat: " + result); obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
Quan demanem el càlcul (lent), rebem un futur enlloc de la dada
Hem d’esperar a que el Futur estigui disponible “.isDone()”
Quan ja està llest, afagem la dada resultant amb “.get()”
A vegades esperar un futur triga més del què podem acceptar, per aturar-lo podem fer “.cancel()”
En aquest exemple es crea un futur que triga entre 2 i 4 segons a acabar, el programa principal l’atura si triga més de 3.
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class EsperaSegonsCalcul { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { long millis = (new Random()).nextInt(3000) + 1000; System.out.println("Trigarà: " + (millis / 1000.0) + " milisegons"); Thread.sleep(millis); return input * input; }); } public void shutdown () { executor.shutdown(); } }
import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class EsperaSegonsMain { public static void main(String args[]) { EsperaSegonsCalcul obj = new EsperaSegonsCalcul(); Future<Integer> future = obj.calculate(7); long start = System.currentTimeMillis(); long finish = 0; while (!future.isDone()) { finish = System.currentTimeMillis(); double segons = (finish - start) / 1000.0; System.out.println("Esperant ... " + segons + " segons"); if (segons > 2) { // Si esperem més de 2 segons, ja no volem el resultat future.cancel(true); } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { if (future.isCancelled()) { System.out.println("El resultat ha trigat massa"); } else { Integer result = future.get(); System.out.println("Resultat: " + result); } obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
Per executar múltiples futurs alhora, cal iniciar “executor” amb “newfixedThreadPool(X)” on X representa el número de fils que funcionen en paral·lel
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class MultiplesFilsCalcul { private ExecutorService executor = Executors.newFixedThreadPool(4); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { long millis = (new Random()).nextInt(4000) + 500; System.out.println("Calcular " + input + " trigarà " + (millis / 1000.0) + " milisegons"); Thread.sleep(millis); return input * input; }); } public void shutdown () { executor.shutdown(); } }
I podrem crear un “Futur” per cada un dels fils que tenim disponibles. Cada un d’aquests futurs pot trigar diferent temps a executar-se.
import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class MultiplesFilsMain { public static void main(String args[]) { MultiplesFilsCalcul obj = new MultiplesFilsCalcul(); Future<Integer> f0 = obj.calculate(1); Future<Integer> f1 = obj.calculate(2); Future<Integer> f2 = obj.calculate(4); Future<Integer> f3 = obj.calculate(8); String txt = ""; while (!f0.isDone() || !f1.isDone() || !f2.isDone() || !f3.isDone()) { // Esperar a que estiguin llestos txt = String.format( "Estats: f0 %s, f1 %s, f2 %s, f3 %s", f0.isDone() ? "llest" : "calculant", f1.isDone() ? "llest" : "calculant", f2.isDone() ? "llest" : "calculant", f3.isDone() ? "llest" : "calculant" ); System.out.println(txt); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { txt = String.format( "Resultat: f0=%s, f1=%s, f2=%s, f3=%s", f0.get(), f1.get(), f2.get(), f3.get() ); System.out.println(txt); obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
CompletableFuture permet definir els processos sense necessitat de ‘executors’, bloquejant accions futures fins que han acabat els processos o fins i tot encadenant futurs. Es declaren amb:
.runAsync quan no tenen valor de retorn
.supplyAsync quan tenen valor de retorn
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; public class CompletableMain { public static void main(String args[]) { // Declarar els processos asíncrons al codi directament CompletableFuture<Void> futureA = CompletableFuture.runAsync(getRunnable("A")); CompletableFuture<Void> futureB = CompletableFuture.runAsync(getRunnable("B")); // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora // tots han de retornar el mateix tipus (en aquest cas String) System.out.println("Esperant futurs"); List<CompletableFuture<Void>> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); futureList.forEach(CompletableFuture::join); System.out.println("Futurs llestos"); } static Runnable getRunnable (String type) { return new Runnable () { @Override public void run () { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futur" + type + " trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - " + type + " " + cnt); cnt = cnt - 1; } System.out.println("future" + type + " acabat"); } catch (InterruptedException e) { e.printStackTrace(); } } }; } }
A més, permet fer una llista d’objectes tipus ‘CompletableFuture’ i esperar a que tots han acabat l’execució per continuar.
Tots han de retornar el mateix tipus, en aquest cas ‘String’
System.out.println("Esperant resultats"); List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); System.out.println("Resultats llestos");
Quan un Futur gestionat per CompletableFuture no té retorn, fem servir ‘.runAsync’ i li passem una funció que retorni un objecte Runnable sobreescrivint la runció ‘run’ que executa el càlcul
CompletableFuture<Void> f0 = CompletableFuture.runAsync(getRunnable("ABC"));
static Runnable getRunnable (String cadena) { return new Runnable () { @Override public void run() { ...
En canvi quan ha de retornar valors, fem servir ‘.supplyAsync’ i li passem una funció que retorni un objecte Supplier<tipus> sobreescrivint la funció ‘get’ que executa el càlcul
CompletableFuture<String> f0 = CompletableFuture.supplyAsync(getSupplier("ABC"));
static Supplier<String> getSupplier (String cadena) { return new Supplier<String> () { @Override public String get() { ...
CompletableFuture permet definir els processos paral·lels directament al codi, a través del què s'anomena expressions ‘lambda’ que s’escriuen amb () → { }
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableLambdaMain { public static void main(String args[]) { // Declarar els processos asíncrons al codi directament CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futurA trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - A " + cnt); cnt = cnt - 1; } } catch (InterruptedException e) { e.printStackTrace(); } return "Done A"; }); CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futurB trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - B " + cnt); cnt = cnt - 1; } } catch (InterruptedException e) { e.printStackTrace(); } return "Done B"; }); // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora System.out.println("Esperant resultats"); List<CompletableFuture<String>> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); futureList.forEach(CompletableFuture::join); System.out.println("Resultats llestos"); // Mostrar resultats try { String resultA = futureA.get(); String resultB = futureB.get(); System.out.println("Resultat A = " + resultA + ", Resultat B = " + resultB); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
Cada futur en realitat és un thread que es pot executar en paral·lel a altres futurs.
Si aquests futurs accedeixen a les mateixes dades, s’ha de tenir algun mètode de bloqueig, per tal que els altres threads no modifiquin les dades simultàniament i per tant aquestes siguin errònies.
Aquest bloqueig es pot aconseguir amb «ReentrantLock», tal i com mostra el següent exemple.
En aquest codi es veu com les dades sense bloqueig només poden comptar fins a 10 perquè els fils es trepitgen entre ells, però les que implementen el bloqueig aconsegueixen comptar fins a 50.
import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; public class FutMain { // Vector de dades private static int dadesSns = 0; private static int dadesBqg = 0; // Bloquejador de recursos private static ReentrantLock mutex = new ReentrantLock(); // Main public static void main(String[] args) { // Creacio de futurs ArrayList<CompletableFuture<Void>> futureList = new ArrayList<>(); for (int cnt = 0; cnt < 5; cnt++){ futureList.add(CompletableFuture.runAsync((getRunnable(cnt)))); } // Execucio dels futurs System.out.println("Esperant resultats"); futureList.forEach(CompletableFuture::join); // Mostrar resultats System.out.println("Resultats:"); System.out.println("Dades sense bloqueig: " + dadesSns); System.out.println("Dades amb bloqueig: " + dadesBqg); } //Codi executat pels futurs static Runnable getRunnable (int pos) { return new Runnable () { @Override public void run () { // Modificar les dades sense bloqueig for (int cnt = 0; cnt < 10; cnt++) { int val = dadesSns; try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} dadesSns = val + 1; } try { // Bloquejem els recursos per assegurar la seva exclusivitat mutex.lock(); // Modificar les dades amb bloqueig for (int cnt = 0; cnt < 10; cnt++) { int val = dadesBqg; try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} dadesBqg = val + 1; } } finally { mutex.unlock(); } } }; } }