====== Futurs a Java ======
{{tag> #FpInfor #Dam #DamMp09 #DamMp09Uf2 #DamMp09Uf02}}
----
==== Exemples ====
{{ ::futurs_java_exemple.zip |}}
==== Futurs ====
La clase ‘Futur’ de Java, representa el resultat futur d’un càlcul asíncron.
És a dir, un resultat o unes dades que obtindrem en un futur i cal esperar sense aturar el funcionament normal del programa.
És habitual fer servir futurs en càlculs molt complexes o quan s’esperen dades d’un servidor que pot trigar en respondre.
En realitat es fan amb fils paral·lels (threads), però la implementació ens allibera de definir-los un a un
==== Futurs, un sol Futur amb “newSingleThreadExecutor” ====
**ExecutorService**: crea un thread automàticament per executar la tasca que ha de retornar un futur
**.submit()**: permet fer el càlcul en paral·lel i retornar el Futur quan ha acabat
**.shutdown()**: atura el thread paral·lel
En aquest exemple es força una espera d’1 segon per simular una operació lenta
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class NumeroQuadratCalcul {
private ExecutorService executor =
Executors.newSingleThreadExecutor();
public Future calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
public void shutdown () { executor.shutdown(); }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class NumeroQuadratMain {
public static void main(String args[]) {
NumeroQuadratCalcul obj = new NumeroQuadratCalcul();
Future future = obj.calculate(7);
while (!future.isDone()) {
System.out.println("Calculant...");
try {
Thread.sleep(300);
} catch (InterruptedException e) { e.printStackTrace(); }
}
try {
Integer result = future.get();
System.out.println("Resultat: " + result);
obj.shutdown();
} catch (InterruptedException e) { e.printStackTrace();
} catch (ExecutionException e) { e.printStackTrace(); }
}
}
Quan demanem el càlcul (lent), rebem un futur enlloc de la dada
Hem d’esperar a que el Futur estigui disponible “**.isDone()**”
Quan ja està llest, afagem la dada resultant amb “**.get()**”
==== Futurs, aturar un futur amb .cancel() ====
A vegades esperar un futur triga més del què podem acceptar, per aturar-lo podem fer “.cancel()”
En aquest exemple es crea un futur que triga entre 2 i 4 segons a acabar, el programa principal l’atura si triga més de 3.
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class EsperaSegonsCalcul {
private ExecutorService executor = Executors.newSingleThreadExecutor();
public Future calculate(Integer input) {
return executor.submit(() -> {
long millis = (new Random()).nextInt(3000) + 1000;
System.out.println("Trigarà: " + (millis / 1000.0) + " milisegons");
Thread.sleep(millis);
return input * input;
});
}
public void shutdown () { executor.shutdown(); }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class EsperaSegonsMain {
public static void main(String args[]) {
EsperaSegonsCalcul obj = new EsperaSegonsCalcul();
Future future = obj.calculate(7);
long start = System.currentTimeMillis();
long finish = 0;
while (!future.isDone()) {
finish = System.currentTimeMillis();
double segons = (finish - start) / 1000.0;
System.out.println("Esperant ... " + segons + " segons");
if (segons > 2) {
// Si esperem més de 2 segons, ja no volem el resultat
future.cancel(true);
}
try {
Thread.sleep(300);
} catch (InterruptedException e) { e.printStackTrace(); }
}
try {
if (future.isCancelled()) {
System.out.println("El resultat ha trigat massa");
} else {
Integer result = future.get();
System.out.println("Resultat: " + result);
}
obj.shutdown();
} catch (InterruptedException e) { e.printStackTrace();
} catch (ExecutionException e) { e.printStackTrace(); }
}
}
==== Futurs, multiples futurs amb ‘newFixedThreadPool’ ====
Per executar múltiples futurs alhora, cal iniciar “executor” amb “newfixedThreadPool(X)” on X representa el número de fils que funcionen en paral·lel
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MultiplesFilsCalcul {
private ExecutorService executor = Executors.newFixedThreadPool(4);
public Future calculate(Integer input) {
return executor.submit(() -> {
long millis = (new Random()).nextInt(4000) + 500;
System.out.println("Calcular " + input + " trigarà " + (millis / 1000.0) + " milisegons");
Thread.sleep(millis);
return input * input;
});
}
public void shutdown () { executor.shutdown(); }
}
I podrem crear un “Futur” per cada un dels fils que tenim disponibles. Cada un d’aquests futurs pot trigar diferent temps a executar-se.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class MultiplesFilsMain {
public static void main(String args[]) {
MultiplesFilsCalcul obj = new MultiplesFilsCalcul();
Future f0 = obj.calculate(1);
Future f1 = obj.calculate(2);
Future f2 = obj.calculate(4);
Future f3 = obj.calculate(8);
String txt = "";
while (!f0.isDone() || !f1.isDone() || !f2.isDone() || !f3.isDone()) {
// Esperar a que estiguin llestos
txt = String.format(
"Estats: f0 %s, f1 %s, f2 %s, f3 %s",
f0.isDone() ? "llest" : "calculant",
f1.isDone() ? "llest" : "calculant",
f2.isDone() ? "llest" : "calculant",
f3.isDone() ? "llest" : "calculant"
);
System.out.println(txt);
try {
Thread.sleep(300);
} catch (InterruptedException e) { e.printStackTrace(); }
}
try {
txt = String.format(
"Resultat: f0=%s, f1=%s, f2=%s, f3=%s",
f0.get(), f1.get(), f2.get(), f3.get()
);
System.out.println(txt);
obj.shutdown();
} catch (InterruptedException e) { e.printStackTrace();
} catch (ExecutionException e) { e.printStackTrace(); }
}
}
==== Futurs, Completable Future ====
**CompletableFuture** permet definir els processos sense necessitat de ‘**executors**’, bloquejant accions futures fins que han acabat els processos o fins i tot encadenant futurs. Es declaren amb:
**.runAsync** quan no tenen valor de retorn
**.supplyAsync** quan tenen valor de retorn
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class CompletableMain {
public static void main(String args[]) {
// Declarar els processos asíncrons al codi directament
CompletableFuture futureA = CompletableFuture.runAsync(getRunnable("A"));
CompletableFuture futureB = CompletableFuture.runAsync(getRunnable("B"));
// Posar els 'CompletableFuture' en una llista per esperar-los tots alhora
// tots han de retornar el mateix tipus (en aquest cas String)
System.out.println("Esperant futurs");
List> futureList = new ArrayList<>();
futureList.add(futureA);
futureList.add(futureB);
futureList.forEach(CompletableFuture::join);
System.out.println("Futurs llestos");
}
static Runnable getRunnable (String type) {
return new Runnable () {
@Override
public void run () {
try {
long millis = (new Random()).nextInt(4000) + 500;
System.out.println("futur" + type + " trigarà " + (millis / 1000.0) + " milisegons");
int cnt = 10;
while (cnt > 0) {
Thread.sleep(millis / 10);
System.out.println(" - " + type + " " + cnt);
cnt = cnt - 1;
}
System.out.println("future" + type + " acabat");
} catch (InterruptedException e) { e.printStackTrace(); }
}
};
}
}
A més, permet fer una llista d’objectes tipus ‘CompletableFuture’ i esperar a que tots han acabat l’execució per continuar.
Tots han de retornar el mateix tipus, en aquest cas ‘String’
System.out.println("Esperant resultats");
List> futureList
= new ArrayList<>();
futureList.add(futureA);
futureList.add(futureB);
System.out.println("Resultats llestos");
==== Futurs, CompletableFuture amb retorn i sense retorn ====
Quan un Futur gestionat per CompletableFuture no té retorn, fem servir ‘.**runAsync**’ i li passem una funció que retorni un objecte **Runnable** sobreescrivint la runció ‘**run**’ que executa el càlcul
CompletableFuture f0 = CompletableFuture.runAsync(getRunnable("ABC"));
static Runnable getRunnable (String cadena) {
return new Runnable () {
@Override
public void run() {
...
En canvi quan ha de retornar valors, fem servir ‘.**supplyAsync**’ i li passem una funció que retorni un objecte **Supplier** sobreescrivint la funció ‘**get**’ que executa el càlcul
CompletableFuture f0 = CompletableFuture.supplyAsync(getSupplier("ABC"));
static Supplier getSupplier (String cadena) {
return new Supplier () {
@Override
public String get() {
...
==== Futurs, CompletableFuture amb expressions lambda ====
CompletableFuture permet definir els processos paral·lels directament al codi, a través del què s'anomena expressions ‘lambda’ que s’escriuen amb () -> { }
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableLambdaMain {
public static void main(String args[]) {
// Declarar els processos asíncrons al codi directament
CompletableFuture futureA = CompletableFuture.supplyAsync(() -> {
try {
long millis = (new Random()).nextInt(4000) + 500;
System.out.println("futurA trigarà " + (millis / 1000.0) + " milisegons");
int cnt = 10;
while (cnt > 0) {
Thread.sleep(millis / 10);
System.out.println(" - A " + cnt);
cnt = cnt - 1;
}
} catch (InterruptedException e) { e.printStackTrace(); }
return "Done A";
});
CompletableFuture futureB = CompletableFuture.supplyAsync(() -> {
try {
long millis = (new Random()).nextInt(4000) + 500;
System.out.println("futurB trigarà " + (millis / 1000.0) + " milisegons");
int cnt = 10;
while (cnt > 0) {
Thread.sleep(millis / 10);
System.out.println(" - B " + cnt);
cnt = cnt - 1;
}
} catch (InterruptedException e) { e.printStackTrace(); }
return "Done B";
});
// Posar els 'CompletableFuture' en una llista per esperar-los tots alhora
System.out.println("Esperant resultats");
List> futureList = new ArrayList<>();
futureList.add(futureA);
futureList.add(futureB);
futureList.forEach(CompletableFuture::join);
System.out.println("Resultats llestos");
// Mostrar resultats
try {
String resultA = futureA.get();
String resultB = futureB.get();
System.out.println("Resultat A = " + resultA + ", Resultat B = " + resultB);
} catch (InterruptedException e) { e.printStackTrace();
} catch (ExecutionException e) { e.printStackTrace(); }
}
}
==== CompletableFuture i dades compartides ====
Cada futur en realitat és un thread que es pot executar en paral·lel a altres futurs.
Si aquests futurs accedeixen a les mateixes dades, s’ha de tenir algun mètode de bloqueig, per tal que els altres threads no modifiquin les dades simultàniament i per tant aquestes siguin errònies.
Aquest bloqueig es pot aconseguir amb "ReentrantLock", tal i com mostra el següent exemple.
En aquest codi es veu com les dades sense bloqueig només poden comptar fins a 10 perquè els fils es trepitgen entre ells, però les que implementen el bloqueig aconsegueixen comptar fins a 50.
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
public class FutMain {
// Vector de dades
private static int dadesSns = 0;
private static int dadesBqg = 0;
// Bloquejador de recursos
private static ReentrantLock mutex = new ReentrantLock();
// Main
public static void main(String[] args) {
// Creacio de futurs
ArrayList> futureList = new ArrayList<>();
for (int cnt = 0; cnt < 5; cnt++){
futureList.add(CompletableFuture.runAsync((getRunnable(cnt))));
}
// Execucio dels futurs
System.out.println("Esperant resultats");
futureList.forEach(CompletableFuture::join);
// Mostrar resultats
System.out.println("Resultats:");
System.out.println("Dades sense bloqueig: " + dadesSns);
System.out.println("Dades amb bloqueig: " + dadesBqg);
}
//Codi executat pels futurs
static Runnable getRunnable (int pos) {
return new Runnable () {
@Override public void run () {
// Modificar les dades sense bloqueig
for (int cnt = 0; cnt < 10; cnt++) {
int val = dadesSns;
try {Thread.sleep(10);}
catch (InterruptedException e) {e.printStackTrace();}
dadesSns = val + 1;
}
try {
// Bloquejem els recursos per assegurar la seva exclusivitat
mutex.lock();
// Modificar les dades amb bloqueig
for (int cnt = 0; cnt < 10; cnt++) {
int val = dadesBqg;
try {Thread.sleep(10);}
catch (InterruptedException e) {e.printStackTrace();}
dadesBqg = val + 1;
}
} finally {
mutex.unlock();
}
}
};
}
}