==================== Reactive Programming ==================== .. toctree:: :maxdepth: 1 .. contents:: Librerie :local: :depth: 1 .. _introduction-reactive-programming: Introduzione ============ I concetti sui quali la programmazione reattiva si basa sono: - incentrata sul concetto di flussi di dati e la propagazione del cambiamento. - Permette di gestire facilmente flussi asincroni di dati ed eventi (asynchronous streams). - Fortemente correlata alla programmazione ad eventi. La programmazione reattiva è un paradigma di programmazione costruito sulla nozione del cambiamento dei valori nel tempo e la propagazione dei cambiamenti. Essa facilita lo sviluppo di applicazioni event-driven permettendo allo sviluppatore di esprimere programmi in termini di che cosa devono fare, consentendo al linguaggio di gestire automaticamente quando farlo. In questo paradigma, i cambiamenti di stato sono efficientemente e automaticamente propagati attraverso la rete delle dipendenze computazionali dal sottostante modello di esecuzione. .. code-block:: Text var1 = 1 var2 = 2 var3 = var1 + var2 Nella programmazione reattiva il valore della variabile **var3** è sempre mantenuto aggiornato cioè, viene automaticamente ricalcolato dopo ogni cambiamento di **var1** o di **var2**. Questa è una nozione chiave della programmazione reattiva. I valori cambiano nel tempo e quando ciò avviene ogni espressione che dipende da questi viene automaticamente ricalcolata. Flussi asincroni di dati e eventi --------------------------------- La programmazione reattiva permette di gestire facilmente flussi asincroni di dati ed eventi. Si possono creare flussi di dati da qualsiasi cosa come, dal tap su un bottone, da variabili, dall’input di un utente, dai feed ecc... Vengono, inoltre, fornite diverse funzioni per combinare, creare o interrogare questi flussi. Esempi di operazioni che posso fare con gli stream: passarlo in input o unirlo ad un altro, ottenerne uno nuovo con solo gli eventi che rispettano un determinato predicato di selezione, rappresentare i suoi elementi in un modo diverso in un altro stream ecc.. Essi rappresentano un altro aspetto chiave della programmazione reattiva. Consideriamo un esempio concreto di flusso di eventi e, in particolare, il click su un bottone che genera un Observable. .. image:: img/reative_pic.png :scale: 60 % Lo stream è una sequenza di eventi ordinati nel tempo che può o non terminare. Anche una lista o una collezione di dati può essere considerata come un stream: un stream finito e deterministico. Esso può emettere tre differenticose: un nuovo valore, un errore o un segnale che indica il fatto che è completo cioè che dallo stream non verranno emessi più valori. Nel nostro esempio il segnale "completo" potrà essere emesso quando la finestra che contiene il bottone del quale stiamo gestendo l’evento click viene chiusa. Questi eventi vengono catturati in modo asincrono definendo delle funzioni che verranno eseguite rispettivamente quando un nuovo valore viene emesso, quando avviene un errore o quando lo stream termina. Viene detto sottoscrittore o osservatore colui che rimane in ascolto di nuovi valori mentre, osservabile lo stream di dati o eventi (pattern Observer). Non è possibile modificare gli stream poichè sono immutabili: ogni operazione, infatti, ne produce uno nuovo. Reagire ad eventi nel classico modo imperativo porta alla scrittura di codice ingombrante, difficile da comprendere e, talvolta, è molto semplice commettere errori poiché la coordinazione fra evento e i cambiamenti ai dati è responsabilità del programmatore il quale deve fare tutto questo manualmente in porzioni di codice separate che possono manipolare gli stessi dati e che potrebbero essere eseguite in qualsiasi ordine nel tempo. Usando le tradizionali soluzioni le applicazioni interattive sono costruite sulla nozione di callbacks asincrone ma la loro gestione non è affatto facile poiché si hanno frammenti di codice isolato del quale non si conosce l’ordine di esecuzione (il flusso di controllo del programma "salta" fra i vari event handlers, non ha un ordine specificato dal programmatore). La programmazione reattiva prevede astrazioni per rappresentare gli eventi e il cambiamento dei valori nel tempo, liberando il programmatore dalla gestione delle dipendenza fra questi. Caratteristiche dei linguaggi di programmazione reattivi -------------------------------------------------------- Passiamo ora ad una breve descrizione delle principali caratteristiche dei linguaggi di programmazione reattivi e, in particolare, verranno analizzate: - le astrazioni base - il modello di valutazione (evaluation model) - le operazioni di sollevamento (lifting operations) - linguaggi multi-direzionali ( multidirectionality) - problemi tecnici come i glitch (glitch avoidance) - il supporto per il distribuito (support for distribution) Astrazioni base --------------- **COMPORTAMENTI (BEHAVIORS)** Comportamento è il termine utilizzato per esprimere il cambiamento dei valori nel tempo. Il comportamento cambia continuamente ed, un suo esempio base, potrebbe essere il tempo stesso. In un linguaggio di programmazione reattivo il comportamento può essere espresso come semplice funzione del tempo. Altri esempi di comportamento: - finestra di un browser - popup in un'app - cursore del mouse **EVENTI(EVENTS)** Quando si parla di eventi ci si riferisce ad uno stream, potenzialmente infinito, di cambiamenti ai valori. A differenza del comportamento che continuamente cambia nel tempo, gli eventi avvengono in istanti ben precisi. Esempi di eventi sono: - pressione di un bottone della tastiera - click del mouse - tap su un bottone Modelli di valutazione ---------------------- L'immagine mostra modelli di valutazione pull-based and push-based a confronto .. image:: img/modelli_valutazione.png :scale: 60 % Il modello di valutazione di un linguaggio di programmazione reattivo riguarda il modo con cui i cambiamenti sono propagati attraverso il grafo delle dipendenze. Dal punto di vista del programmatore la propagazione dei cambiamenti avviene automaticamente (un cambiamento ad un valore dovrebbe essere automaticamente propagato ad ogni sua dipendenza computazionale). Quando avviene un evento da una sorgente di eventi, le computazioni dipendenti devono essere notificate del cambiamento e dovono essere nuovamente valutate. A livello di linguaggio occorre stabilire chi da origine alla propagazione dei cambiamenti: è la sorgente che dovrebbe spingere “push” nuovi dati alle sue dipendenze(consumatori) o sono le dipendenze che dovrebbero tirare “pull” dati dalla sorgente di eventi(produttore). Esistono due modelli di valutazione: **PULL-BASED** Nel modello pull-based le computazioni che necessitano di valori hanno bisogno di "tirarli" dalla sorgente. La propagazione è guidata dalla domanda di un nuovo valore (deman driven). Questo modello è abbastanza flessibile poiché le computazioni che richiedono dei valori posso “tirarli” quando necessitano e non a ogni cambiamento di questi. Si potrebbero, però, verificare latenze significative fra il momento in cui si verifica un evento e il momento in cui avviene la relativa reazione. **PUSH-BASED** Nel modello push-based, quando la sorgente ha nuovi dati, questi vengono "spinti" ad ogni loro dipendenza computazionale. La propagazione è guidata dalla disponibilità di nuovi dati (data driven) piuttosto che dalla domanda. I linguaggi che implementano un modello push-based hanno bisogno di una efficiente soluzione al problema di computazioni superflue poiché ad ogni cambiamento della sorgente ha luogo una nuova rivalutazione di tutte le dipendenze. Esistono linguaggi di programmazione reattiva che adottano entrambi i modelli e quindi possono usufruire dei vantaggi offerti dal modello push-based(efficienza e bassa latenza) e dal modello pull-based (la flessibilità di richiamare valori all’occorrenza). Glitch ------ Una proprietà che occorre considerare in un linguaggio reattivo è la sua capacità di evitare glitch, in questo contesto, vengono chiamati così gli aggiornamenti inconsistenti di espressioni che possono avvenire durante la propagazione dei cambiamenti. Questo fenomeno si potrebbe verificare nel caso in cui, un calcolo viene eseguito prima della valutazione di tutte le sue dipendenze cioè vengono combinati valori aggiornati e non. Ciò può succedere solo in linguaggi che adottano un modello di valutazione push-based. Consideriamo un esempio nella programmazione reattiva: .. code-block:: Text (1) var1 = 1 (2) var2 = var1 * 1 (3) var3 = var1 + var2 In questo esempio, il valore della variabile **var2** sarà sempre uguale al valore di **var1** e, il valore di **var3** sarà sempre uguale al doppio di **var1**. Se il valore di **var1** cambia in 2 ci si aspetta che **var2** assuma il valore di 2 e **var3** di 4. In alcune implementazioni reattive potrebbe accadere che la valutazione dell’espressione (3) avvenga prima della (2) e che **var3** assuma momentaneamente il valore incorretto di 3. Solo dopo la rivalutazione dell’espressione (2) verrà nuovamente ricalcolata la (3) portando **var3** ad un valore consistente. .. image:: img/glitch.png :scale: 60 % Questi aggiornamenti inconsistenti e superflui vengono chiamati glitch. Molti linguaggi di programmazione reattivi per eliminare tale fenomeno organizzano le espressioni in un grafo topologico ordinato: questo garantisce che ogni espressione venga valutata dopo che tutte le sue dipendenza sono già stata ricalcolate. Inoltre, una efficiente implementazione reattiva dovrebbe evitare rivalutazioni non necessarie di valori che non cambiano; riprendendo l’esempio sopra, se **var1** inizialmente uguale ad 1 viene aggiornata allo stesso valore non dovrebbe avvenire la rivalutazione delle espressioni che da essa dipendono. Operazioni di sollevamento -------------------------- Quando la programmazione reattiva viene incorporata in un linguaggio di programmazione, come libreria o come estensione del linguaggio stesso, occorre prevedere un meccanismo per convertire gli operatori e i metodi che il programmatore è abituato ad utilizzare in modo che possano operare con le astrazioni base della RP. Quando parliamo di operazione di sollevamento, lifting operation, ci riferiamo proprio alla conversione di un operatore ordinario in una sua variante che è in grado di lavorare con valori che possono cambiare nel tempo e, quindi, al processo che permette di passare al mondo della programmazione reattiva. Lifiting è un’operazione che può essere formalizzata con la seguente definizione, assumiamo che la funzione prenda un solo parametro in ingresso: .. code-block:: Text lif t : f(T) → flif ted(Behaviour < T >) Nella definizione, **T** non è un tipo di comportamento mentre, Behavior è il comportamento di un valore di tipo **T**. La funzione **f**, definita per operare su valori senza comportamento viene trasformata in una funzione che è possibile applicare ad un comportamento. Al tempo **i** il valore di una funzione di sollevamento **f** richiamata fornendole in ingresso il comportamento di un valore di tipo **T** può essere definito come segue: .. code-block:: Text flif ted(Behaviour < T >) → f(T i) Dove **Ti** denota il valore del comportamento al tempo **i**. Ci sono almeno tre principali tipo di operazioni di sollevamento. **LIFTING IMPLICITO** In un’operazione di sollevamento implicita quando un tradizionale operatore di un linguaggio di programmazione è applicato ad un comportamento, questo automaticamente viene convertito in un operatore in grado di lavorare con il comportamento stesso. .. code-block:: Text f(b1) → flif ted(b1) **LIFTING ESPLICITO** In questo modello, il linguaggio fornisce al programmatore una serie di primitive che possono essere utilizzate per sollevare l’operazione ordinaria e quindi, per permettergli di lavorare con valori che cambiano nel tempo. .. code-block:: Text lif t(f)(b1) → flif ted(b1) **LIFTING MANUALE** Il linguaggio di programmazione non prevede operatori di sollevamento; il programmatore manualmente deve acquisire il valore corrente, che nel frattempo può essere cambiato, usando gli operatori ordinari del linguaggio. .. code-block:: Text f(b1) → f(currentvalue(b1)) Multidirezionalità ------------------ Un’altra proprietà dei linguaggi di programmazione reattivi è la direzione di propagazione dei cambiamenti. I cambiamenti, infatti, possono avvenire in modalità unidirezionale o multirezionale. Un linguaggio è multidirezionale quando cambiamenti a valori derivati sono propagati anche ai valori dai quali questi derivano. Per esempio, riportiamo la formula che permette di convertire la temperatura fra Fahrenheit e Celsius: .. code-block:: Text F = (C * 1.8) +32 In questo caso, ad ogni cambiamento di **F** , **C** verrà automaticamente aggiornata e viceversa. Supporto per il distribuito --------------------------- Questa proprietà riguarda i linguaggi di programmazione reattivi che supportano la scrittura di programma reattivi distribuiti. I linguaggi con questa caratteristica, permettono di avere dipendenze computazionali e dipendenze fra dati distribuiti fra più nodi. .. _rx-java: RxJava ====== Questa libreria ci offre un modello di programmazione dove possiamo lavorare con gli eventi derivanti dalla UI o da chiamate asincrone alla stessa maniera con cui operiamo sulle collezioni e sugli stream di Java 8. Possiamo trasformare gli eventi usando funzioni come map, flatmap, filter, ecc. Però al posto di costruire del codice che risponde agli eventi, trattiamo questi eventi come un flusso che viene modificato e gestito man mano che si genera ad opera della classe **Observable**. Quando ci serve avere un oggetto da un metodo, definiamo il nostro metodo in questo modo: .. code-block:: Java T getObject() mentre se ci serve una collezione scriveremo .. code-block:: Java Iterable getCollection() Ora se ci spostiamo nel campo dell’esecuzione asincrona e vogliamo gestire un oggetto che sarà disponibile successivamente, andiamo a realizzare un metodo con questo stile: .. code-block:: Java Future getObject() Ci manca quindi l’ultima combinazione: una collezione i cui elementi saranno disponibili in futuro ma non necessariamente tutti nello stesso istante. La soluzione di RxJava per questo caso è proprio la classe Observable: .. code-block:: Java Observable getCollection() .. image:: img/rxjava-observable.png :scale: 40 % Un **Observable** è un oggetto che emette zero o più elementi per poi terminare con successo oppure durante il flusso degli elementi si interrompe a causa di un errore. Dall’altra parte abbiamo gli osservatori di tipo **Observer** che consumano l’elemento e gestiscono la terminazione dell’**Observable** (oppure il suo errore). Per gestire correttamente un **Observable**, un **Observer** implementa i metodi: - **onNext** - **onComplete** - **onError** Questo pattern è molto simile all’observer “classico” in quanto lo estende ma ci sono delle differenze importanti. In generale l’**Observable** non emette elementi se non c’è alcun **Observer** (ma è possibile realizzare anche hot **Observable** dove questa regola non vale). Inoltre, l’**Observer** viene notificato quanto l’**Observable** smette di produrre elementi perché termina o perché fallisce. Questo implica anche che non è necessario rimuovere l’**Observer** dall’**Observable** quando quest’ultimo è stato consumato perché avviene automaticamente. Definire un **Observable** è molto semplice: qualsiasi collezione può essere trasformata in **Observable**: .. code-block:: Java List cityList = new ArrayList<>(); cityList.add("Berlin"); cityList.add("Roma"); cityList.add("Madrid"); cityList.add("Wien"); Observable cities = Observable.from(cityList); **RxJava** mette a disposizione anche dei metodi di utilità per semplificarci la vita, come il metodo **just** che costruisce un **Observable** partendo da una lista di oggetti. Un modo alternativo di costruire un **Observable** è questo: .. code-block:: Java Observable moreCities = Observable.just("Zurich", "London", "Paris"); Possiamo anche generare **Observable** combinandone di esistenti. Ad esempio, posso fondere i due **Observable** precedenti con l’operatore **concatWith**: .. code-block:: Java Observable allCities = cities.concatWith(moreCities); Sotto il *marble diagram* per il metodo **concatWith**: .. image:: img/concat.png :scale: 60 % La freccia a destra indica l’avanzamento del tempo, abbiamo due **Observable** quindi due frecce. Il risultato del metodo è un nuovo **Observable** composto dagli elementi del primo seguiti dagli elementi del secondo mantenendo l’ordine. Gli **Observable** indicati terminano nell’istante indicato dalla stanghetta verticale. L’**Observable** è pronto, implementiamo l’interfaccia Observer per fare qualcosa con la lista delle città: ci servono tre metodi. .. code-block:: Java Observer traveller = new Observer() { @Override public void onCompleted() { System.out.println("My trip is finished"); } @Override public void onError(Throwable e) { System.out.println("I won't complete my trip!"); } @Override public void onNext(String t) { System.out.println("I've just visited " + t); } }; L’unica cosa che resta ora da fare è associare l’Observable al suo Observer tramite il metodo **subscribe** .. code-block:: Java allCities.subscribe(traveller); Il risultato prodotto è quello riportato qui sotto .. code-block:: Text I've just visited Zurich I've just visited London I've just visited Paris I've just visited Berlin I've just visited Roma I've just visited Madrid I've just visited Wien My trip is finished Se non vogliamo costruire un **Observer**, possiamo anche limitarci a passare il comportamento che vogliamo applicare solo nel caso onNext: basta implementare l’interfaccia **Action1** .. code-block:: Java Action1 weather = new Action1(){ @Override public void call(String city) { System.out.println("The weather is sunny in " + city); } }; allCities.subscribe(weather); per ottenere questo risultato: .. code-block:: Text The weather is sunny in Zurich The weather is sunny in London The weather is sunny in Paris The weather is sunny in Berlin The weather is sunny in Roma The weather is sunny in Madrid The weather is sunny in Wien Per fortuna con le lambda di Java 8, possiamo scrivere tutto in maniera più compatta e risparmiare un sacco di codice. .. code-block:: Java allCities.subscribe(city -> System.out.println("The weather is sunny in " + city)); Un po’ di trasformazioni ------------------------ Abbiamo visto le due operazioni indispensabili: la creazione di un Observable e la sottoscrizione di un Observer. Prima che gli elementi arrivino all’Observer possiamo applicare gli operatori che vogliamo in un modo del tutto simile a quelli che applichiamo su di uno stream di Java 8. Facciamo un piccolo esempio: .. code-block:: Java allCities.map(city -> city.toUpperCase()) .filter(city -> city.length() > 5) .take(3) .subscribe(city -> System.out.println("I LOVE " + city)); Guardiamo in dettaglio un operatore alla volta, il metodo **map** trasforma in maiuscolo gli elementi emessi dall’Observable. Dell’Observable risultante non prendiamo tutti gli elementi ma solo quelli che soddisfano la condizione passata a **filter**, cioè solo le città con un nome più lungo di cinque caratteri. Infine, non usiamo tutte le stringhe a disposizione ma solo le prime tre (metodo **take**) e finalmente raggiungono l’Observer che li stampa su console ottenendo questo risultato: .. code-block:: Text I LOVE BERLIN I LOVE MADRID I LOVE WIEN Facciamo un esempio diverso in modo da introdurre altri operatori: .. code-block:: Java cities.zipWith(moreCities, (String s1, String s2) -> "From " + s1 + " to " + s2) .doOnNext(s -> System.out.println("#Debug: "+ s)) .map(s -> s.toLowerCase()) .subscribe(System.out::println); Anche il metodo **zipWith** unisce due Observable, ma a differenza di **concat**, prende gli elementi emessi da due Observable e li combina a due a due secondo una funzione passata come parametro. Qui usiamo **zipWith** per generare una nuova stringa che comprende due città. Si noti che siccome cities ha quattro elementi mentre moreCities ne ha tre, il risultato sarà lungo quanto il più corto dei due Observable. Il metodo **doOnNext** consente di applicare un’azione intermedia sull’Observable, nel nostro caso lo usiamo per debuggare su console il risultato del metodo zip. Eseguendo questo esempio otteniamo quindi: .. code-block:: Text #Debug: From Zurich to Berlin from zurich to berlin #Debug: From London to Roma from london to roma #Debug: From Paris to Madrid from paris to madrid Gestione dei thread ------------------- Partiamo dalla lista di città .. code-block:: Java Observable allCities = Observable.just("Zurich", "London", "Paris", "Berlin", "Roma", "Madrid", "Wien"); in più introduciamo un semplicissimo log sulla console che ci servirà per sapere qual è il thread corrente che sta usando l’Observable: .. code-block:: Java public static void logStringWithThread(String string) { System.out.printf("%-26.26s: %-10.10s%n", Thread.currentThread().getName(), string); } public static Action1 debugString = string -> logStringWithThread(string); Usiamo **doOnNext** per stampare la stringa corrente e conoscere il thread in esecuzione attraverso la funzione **debugString**, in più eseguiremo due banali trasformazioni sulla lista di città: .. code-block:: Java allCities .doOnNext(debugString) .map(s -> s + "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString); Il risultato è riportato qui sotto. .. code-block:: Text main : Zurich main : Zurich- main : -Zurich- main : London main : London- main : -London- main : Paris main : Paris- .... I log ci indicano che, a partire dalla generazione dell’elemento fino ad arrivare all’Observer, siamo sempre nel main thread. E’ vero che RxJava si esprime al suo meglio quando lavoriamo in modalità asincrona, ma nulla indica che questo sia il comportamento di default. Abbiamo visto proprio in questo esempio che l’Observable inizia ad emettere elementi usando il thread che lo genera, in questo caso il principale. Non dobbiamo però neanche fare l’assunzione opposta, cioè che un’esecuzione sincrona resti sempre tale. Guardiamo questo esempio in cui chiediamo all’Observable di ritardare di mezzo secondo l’emissione di un elemento tramite l’operatore **delay**: .. code-block:: Java allCities .doOnNext(debugString) .delay(500, TimeUnit.MILLISECONDS) .map(s -> s + "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString); il risultato dell’esecuzione è il seguente. .. code-block:: Text main : Zurich main : London main : Paris main : Berlin main : Roma main : Madrid main : Wien RxComputationThreadPool-1 : Zurich- RxComputationThreadPool-1 : -Zurich- RxComputationThreadPool-1 : London- RxComputationThreadPool-1 : -London- Come si vede, l’operatore delay fa cambiare thread usandone uno preso da un thread pool definito da RxJava. Non solo l’Observable diventa asincrono ma anche il suo Observer. Dato che questo codice viene eseguito in una semplice applicazione da riga di comando, se non mettiamo uno sleep come ultima riga di codice, rischiamo di non vedere il risultato perché l’applicazione termina prima. Quindi, come complemento dell’indicazione di prima, possiamo dire questo: è sempre bene gestire l’Observable in modalità asincrona, anche se non lo stiamo forzando esplicitamente. Il primo metodo che possiamo usare per cambiare il thread dell’Observable è il metodo **subscribeOn**: ci consente di passare in input uno **Scheduler**, cioè, secondo la documentazione, un oggetto capace di eseguire un’unità di lavoro. Non vogliamo entrare nel dettaglio dello Scheduler, ci basta sapere che usandolo andiamo a “pescare” un thread da uno specifico pool e su questo eseguiamo l’Observable. RxJava mette a disposizione alcuni Scheduler predefiniti. Per il nostro primo esempio useremo **Schedulers.computation** che è pensato per svolgere lavori di calcolo. Se vogliamo gestire attività input/output useremo lo Scheduler **Schedulers.io**. Cambiamo, quindi, il codice in questo modo: .. code-block:: Java allCities .subscribeOn(Schedulers.computation()) .doOnNext(debugString) .map(s -> s+ "-") .doOnNext(debugString) .map(s -> "-" + s) .subscribe(debugString); e osserviamo il seguente risultato: .. code-block:: Text RxComputationThreadPool-1 : Zurich RxComputationThreadPool-1 : Zurich- RxComputationThreadPool-1 : -Zurich- RxComputationThreadPool-1 : London RxComputationThreadPool-1 : London- RxComputationThreadPool-1 : -London- RxComputationThreadPool-1 : Paris ... L’Observable viene gestito in maniera asincrona dall’inizio alla fine utilizzando un unico thread preso dal thread pool di **Schedulers.computation**. Due cose importanti da tenere presenti su questo metodo. La prima è che se lo si usa più volte cambiando Scheduler, le chiamate successive non hanno effetto. Quindi se, dopo il primo **map**, scrivessimo **subscribeOn(Schedulers.io())** questa istruzione verrebbe ignorata. Seconda cosa: **subscribeOn** può essere invocato in qualsiasi punto della catena, il suo effetto non cambia. Complementare a **subscribeOn** è il metodo **observeOn** che modifica il thread su cui l’Observable sta lavorando nel punto in cui viene inserito e ci permette di cambiare Scheduler, anche più volte. Vediamolo in azione: .. code-block:: Java allCities .doOnNext(debugString) .observeOn(Schedulers.computation()) .map(s -> "-" + s) .doOnNext(debugString) .map(s -> s + "-") .subscribe(debugString); Dopo aver emesso l’elemento e averlo stampato la prima volta in console, cambiamo thread spostandolo sullo **Schedulers.computation** e continuiamo il flusso dati ottenendo questo output: .. code-block:: Text main : Zurich main : London main : Paris main : Berlin main : Roma RxComputationThreadPool-3 : -Zurich main : Madrid RxComputationThreadPool-3 : -Zurich- main : Wien RxComputationThreadPool-3 : -London ... Prima ancora che il thread principale termini, l’esecuzione asincrona ha inizio e quindi i due thread stanno lavorando in concorrenza. Possiamo usare **subscribeOn** e **observeOn** assieme? Certamente, facciamo un esempio in cui usiamo tre thread diversi usando **observeOn** due volte. .. code-block:: Java allCities .subscribeOn(Schedulers.computation()) .doOnNext(debugString) .map(s -> "-" + s) .observeOn(Schedulers.io()) .doOnNext(debugString) .map(s -> s + "-") .observeOn(Schedulers.computation()) .subscribe(debugString); .. code-block:: Text RxComputationThreadPool-4 : Zurich RxComputationThreadPool-4 : London RxComputationThreadPool-4 : Paris RxComputationThreadPool-4 : Berlin RxComputationThreadPool-4 : Roma RxCachedThreadScheduler-1 : -Zurich RxComputationThreadPool-4 : Madrid RxComputationThreadPool-3 : -Zurich- RxCachedThreadScheduler-1 : -London RxComputationThreadPool-4 : Wien RxComputationThreadPool-3 : -London- ... Senza dare alcuna parametrizzazione aggiuntiva, l’effetto che otteniamo è quello di avere tre thread che lavorano in pipeline: ogni città passa in ordine attraverso i thread gestiti dagli **Scheduler**. Notiamo che da **Scheduler.computation** vengono estratti due thread.