====== Futurs a Java ====== {{tag> #FpInfor #Dam #DamMp09 #DamMp09Uf2 #DamMp09Uf02}} ---- ==== Exemples ==== {{ ::futurs_java_exemple.zip |}} ==== Futurs ==== La clase ‘Futur’ de Java, representa el resultat futur d’un càlcul asíncron. És a dir, un resultat o unes dades que obtindrem en un futur i cal esperar sense aturar el funcionament normal del programa. És habitual fer servir futurs en càlculs molt complexes o quan s’esperen dades d’un servidor que pot trigar en respondre. En realitat es fan amb fils paral·lels (threads), però la implementació ens allibera de definir-los un a un ==== Futurs, un sol Futur amb “newSingleThreadExecutor” ==== **ExecutorService**: crea un thread automàticament per executar la tasca que ha de retornar un futur **.submit()**: permet fer el càlcul en paral·lel i retornar el Futur quan ha acabat **.shutdown()**: atura el thread paral·lel En aquest exemple es força una espera d’1 segon per simular una operació lenta import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class NumeroQuadratCalcul { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future calculate(Integer input) { return executor.submit(() -> { Thread.sleep(1000); return input * input; }); } public void shutdown () { executor.shutdown(); } } import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class NumeroQuadratMain { public static void main(String args[]) { NumeroQuadratCalcul obj = new NumeroQuadratCalcul(); Future future = obj.calculate(7); while (!future.isDone()) { System.out.println("Calculant..."); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { Integer result = future.get(); System.out.println("Resultat: " + result); obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } Quan demanem el càlcul (lent), rebem un futur enlloc de la dada Hem d’esperar a que el Futur estigui disponible “**.isDone()**” Quan ja està llest, afagem la dada resultant amb “**.get()**” ==== Futurs, aturar un futur amb .cancel() ==== A vegades esperar un futur triga més del què podem acceptar, per aturar-lo podem fer “.cancel()” En aquest exemple es crea un futur que triga entre 2 i 4 segons a acabar, el programa principal l’atura si triga més de 3. import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class EsperaSegonsCalcul { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future calculate(Integer input) { return executor.submit(() -> { long millis = (new Random()).nextInt(3000) + 1000; System.out.println("Trigarà: " + (millis / 1000.0) + " milisegons"); Thread.sleep(millis); return input * input; }); } public void shutdown () { executor.shutdown(); } } import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class EsperaSegonsMain { public static void main(String args[]) { EsperaSegonsCalcul obj = new EsperaSegonsCalcul(); Future future = obj.calculate(7); long start = System.currentTimeMillis(); long finish = 0; while (!future.isDone()) { finish = System.currentTimeMillis(); double segons = (finish - start) / 1000.0; System.out.println("Esperant ... " + segons + " segons"); if (segons > 2) { // Si esperem més de 2 segons, ja no volem el resultat future.cancel(true); } try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { if (future.isCancelled()) { System.out.println("El resultat ha trigat massa"); } else { Integer result = future.get(); System.out.println("Resultat: " + result); } obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } ==== Futurs, multiples futurs amb ‘newFixedThreadPool’ ==== Per executar múltiples futurs alhora, cal iniciar “executor” amb “newfixedThreadPool(X)” on X representa el número de fils que funcionen en paral·lel import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class MultiplesFilsCalcul { private ExecutorService executor = Executors.newFixedThreadPool(4); public Future calculate(Integer input) { return executor.submit(() -> { long millis = (new Random()).nextInt(4000) + 500; System.out.println("Calcular " + input + " trigarà " + (millis / 1000.0) + " milisegons"); Thread.sleep(millis); return input * input; }); } public void shutdown () { executor.shutdown(); } } I podrem crear un “Futur” per cada un dels fils que tenim disponibles. Cada un d’aquests futurs pot trigar diferent temps a executar-se. import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class MultiplesFilsMain { public static void main(String args[]) { MultiplesFilsCalcul obj = new MultiplesFilsCalcul(); Future f0 = obj.calculate(1); Future f1 = obj.calculate(2); Future f2 = obj.calculate(4); Future f3 = obj.calculate(8); String txt = ""; while (!f0.isDone() || !f1.isDone() || !f2.isDone() || !f3.isDone()) { // Esperar a que estiguin llestos txt = String.format( "Estats: f0 %s, f1 %s, f2 %s, f3 %s", f0.isDone() ? "llest" : "calculant", f1.isDone() ? "llest" : "calculant", f2.isDone() ? "llest" : "calculant", f3.isDone() ? "llest" : "calculant" ); System.out.println(txt); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } try { txt = String.format( "Resultat: f0=%s, f1=%s, f2=%s, f3=%s", f0.get(), f1.get(), f2.get(), f3.get() ); System.out.println(txt); obj.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } ==== Futurs, Completable Future ==== **CompletableFuture** permet definir els processos sense necessitat de ‘**executors**’, bloquejant accions futures fins que han acabat els processos o fins i tot encadenant futurs. Es declaren amb: **.runAsync** quan no tenen valor de retorn **.supplyAsync** quan tenen valor de retorn import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; public class CompletableMain { public static void main(String args[]) { // Declarar els processos asíncrons al codi directament CompletableFuture futureA = CompletableFuture.runAsync(getRunnable("A")); CompletableFuture futureB = CompletableFuture.runAsync(getRunnable("B")); // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora // tots han de retornar el mateix tipus (en aquest cas String) System.out.println("Esperant futurs"); List> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); futureList.forEach(CompletableFuture::join); System.out.println("Futurs llestos"); } static Runnable getRunnable (String type) { return new Runnable () { @Override public void run () { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futur" + type + " trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - " + type + " " + cnt); cnt = cnt - 1; } System.out.println("future" + type + " acabat"); } catch (InterruptedException e) { e.printStackTrace(); } } }; } } A més, permet fer una llista d’objectes tipus ‘CompletableFuture’ i esperar a que tots han acabat l’execució per continuar. Tots han de retornar el mateix tipus, en aquest cas ‘String’ System.out.println("Esperant resultats"); List> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); System.out.println("Resultats llestos"); ==== Futurs, CompletableFuture amb retorn i sense retorn ==== Quan un Futur gestionat per CompletableFuture no té retorn, fem servir ‘.**runAsync**’ i li passem una funció que retorni un objecte **Runnable** sobreescrivint la runció ‘**run**’ que executa el càlcul CompletableFuture f0 = CompletableFuture.runAsync(getRunnable("ABC")); static Runnable getRunnable (String cadena) { return new Runnable () { @Override public void run() { ... En canvi quan ha de retornar valors, fem servir ‘.**supplyAsync**’ i li passem una funció que retorni un objecte **Supplier** sobreescrivint la funció ‘**get**’ que executa el càlcul CompletableFuture f0 = CompletableFuture.supplyAsync(getSupplier("ABC")); static Supplier getSupplier (String cadena) { return new Supplier () { @Override public String get() { ... ==== Futurs, CompletableFuture amb expressions lambda ==== CompletableFuture permet definir els processos paral·lels directament al codi, a través del què s'anomena expressions ‘lambda’ que s’escriuen amb () -> { } import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; public class CompletableLambdaMain { public static void main(String args[]) { // Declarar els processos asíncrons al codi directament CompletableFuture futureA = CompletableFuture.supplyAsync(() -> { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futurA trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - A " + cnt); cnt = cnt - 1; } } catch (InterruptedException e) { e.printStackTrace(); } return "Done A"; }); CompletableFuture futureB = CompletableFuture.supplyAsync(() -> { try { long millis = (new Random()).nextInt(4000) + 500; System.out.println("futurB trigarà " + (millis / 1000.0) + " milisegons"); int cnt = 10; while (cnt > 0) { Thread.sleep(millis / 10); System.out.println(" - B " + cnt); cnt = cnt - 1; } } catch (InterruptedException e) { e.printStackTrace(); } return "Done B"; }); // Posar els 'CompletableFuture' en una llista per esperar-los tots alhora System.out.println("Esperant resultats"); List> futureList = new ArrayList<>(); futureList.add(futureA); futureList.add(futureB); futureList.forEach(CompletableFuture::join); System.out.println("Resultats llestos"); // Mostrar resultats try { String resultA = futureA.get(); String resultB = futureB.get(); System.out.println("Resultat A = " + resultA + ", Resultat B = " + resultB); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } ==== CompletableFuture i dades compartides ==== Cada futur en realitat és un thread que es pot executar en paral·lel a altres futurs. Si aquests futurs accedeixen a les mateixes dades, s’ha de tenir algun mètode de bloqueig, per tal que els altres threads no modifiquin les dades simultàniament i per tant aquestes siguin errònies. Aquest bloqueig es pot aconseguir amb "ReentrantLock", tal i com mostra el següent exemple. En aquest codi es veu com les dades sense bloqueig només poden comptar fins a 10 perquè els fils es trepitgen entre ells, però les que implementen el bloqueig aconsegueixen comptar fins a 50. import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; public class FutMain { // Vector de dades private static int dadesSns = 0; private static int dadesBqg = 0; // Bloquejador de recursos private static ReentrantLock mutex = new ReentrantLock(); // Main public static void main(String[] args) { // Creacio de futurs ArrayList> futureList = new ArrayList<>(); for (int cnt = 0; cnt < 5; cnt++){ futureList.add(CompletableFuture.runAsync((getRunnable(cnt)))); } // Execucio dels futurs System.out.println("Esperant resultats"); futureList.forEach(CompletableFuture::join); // Mostrar resultats System.out.println("Resultats:"); System.out.println("Dades sense bloqueig: " + dadesSns); System.out.println("Dades amb bloqueig: " + dadesBqg); } //Codi executat pels futurs static Runnable getRunnable (int pos) { return new Runnable () { @Override public void run () { // Modificar les dades sense bloqueig for (int cnt = 0; cnt < 10; cnt++) { int val = dadesSns; try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} dadesSns = val + 1; } try { // Bloquejem els recursos per assegurar la seva exclusivitat mutex.lock(); // Modificar les dades amb bloqueig for (int cnt = 0; cnt < 10; cnt++) { int val = dadesBqg; try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();} dadesBqg = val + 1; } } finally { mutex.unlock(); } } }; } }