Programowanie reaktywne

Programowanie reaktywne najłatwiej opisać posługując się przykładem. Załóżmy, że mamy dwa obiekty A i B, które są ze sobą powiązane - obiekt B potrzebuje do poprawnego działania stanu obiektu A. Gdy stan obiektu A ulega zmianie należy zaktualizować informacje, które posiada obiekt B.

W programowaniu interaktywnym to obiekt A musi wiedzieć jakie inne obiekty od niego należą i co należy zrobić aby je poinformować o zmianie swojego stanu. Wraz z rozrastaniem się aplikacji, ilość obiektów zależnych od A będzie rosła i w końcu utrzymanie stanie się bardzo trudne.

Na pomoc przychodzi programowanie reaktywne. Teraz obiekt A wystawia specjalny interfejs, przez który informuje o zmianie swojego stanu. To obiekt B musi skorzystać z tego interfejsu i zaktualizować swój stan. Zwiększenie ilości obiektów zależnych od A nie powoduje zmian jego implementacji.

Opisana powyżej komunikacja obiektów A i B przez specjalny interfejs jest zastosowaniem wzorzeca obserwatora.

everything is a stream

W przypadku aplikacji wszystkie występujące zdarzenia mogą być traktowane jako strumień zdarzeń. Użytkownik klika przycisk - zdarzenie. Kursor myszy jest umieszczany w pewnym obszarze - zdarzenie. Zmienia się czas na zegarze - zdarzenie. Taki strumień zdarzeń jest dostawcą danych.

W programowaniu reaktywnym wszyscy dostawcy danych są ujednoliceni i przedstawiani jako obserwowalny strumień. Strumień to sekwencja uporządkowanych zdarzeń zachodzących w czasie.

Aby pobrać wartość ze strumienia, trzeba go zasubskrybować. Obiekt subskrybowalny to dowolny obiekt z danymi, który implementuje wzorzec obserwatora.

W przypadku obserwowalnego strumienia zdarzeń najprościej nie myśleć o nim jak o strumieniach lecz jak o tablicach. Można na nich wykonać operacje typowe dla tablic takie jak mapowanie czy filtrowanie.

RxJS

RxJS (Reactive Extensions for JavaScript) jest reaktywną biblioteką, która umożliwia zgrabne łączenie kodu asynchronicznego i opartego na zdarzeniach. Zapewnia ona wysoki poziom abstrakcji i wiele operacji dających dodatkowe możliwości.

Think of RxJS as Lodash for events.

Jest ona rozwijana przez Microsoft ze wsparciem społeczności.

Observable

Typ Rx.Observable reprezentuje obserwowalny strumień. Dzięki niemu możemy pracować z dowolnym typem danych w ten sam sposób ponieważ są one przekształcane w strumień. Typ Rx.Observable łączy świat programowania funkcyjnego i reaktywnego.

Observables wypełniają brakujący element tabeli pull-push.

Strategia pull zakłada, że producent jest pasywny i zwraca dane gdy konsument o to poprosi. Producent nie wie kiedy będzie działał, pełna kontrola leży po stronie konsumenta.

Strategia push zakłada, że to producent jest aktywny i zwraca dane kiedy chce. Konsument jest stroną pasywną i jedynie reaguje na nadejście nowych informacji.

Przykład 3.1
Rx.Observable.from([1, 2, 3, 4])
   .subscribe(item => { 
      console.log(`item: ${item}`); 
   });
// -> 'item: 1'
// -> 'item: 2'
// -> 'item: 3'
// -> 'item: 4'

Metoda Rx.Observable.from() pozwala stworzyć strumień z tablicy, który emituje jej elementy jako kolejne zdarzenia. Aby pobrać elementy ze strumienia korszystamy z metody Observable.subscribe() i przekazujemy do niej callback nazywany też Observer-em.

Metoda Observable.subscribe() przyjmuje jako parametr trzy funkcje.

Przykład 3.2
let observer = {
   next: item => { console.log(`next: ${item}`); },
   error: err => { console.log(`error: ${err}`); },
   complete: () => { console.log(`complete`) }
};

Rx.Observable.from([1, 2, 3, 4])
   .subscribe(observer);
// -> 'next: 1'
// -> 'next: 2'
// -> 'next: 3'
// -> 'next: 4'
// -> 'complete'

Pierwsza z funkcji - next - odpowiada za obsługę kolejnych elementów strumienia.

Druga - error - za zakończenie przetwarzania spowodowane błędem.

Trzecia - complete -za zakończenie przetwarzania strumienia.

Możemy podać dowolną ilość funkcji, jednak zawsze trzeba pamiętać o ich kolejności.

Operatory

Biblioteka RxJS zapewnia zbiór operatorów do przetwarzania strumieni, część z nich znamy już z przetwarzania kolekcji - map, filter.

Mapowanie strumienia

Operator Rx.Observable.map() pozwala na mapowanie wartości elementów strumienia podobnie jak metoda Array.map().

Przykład 3.3
Rx.Observable.from([1, 2, 3, 4])
   .map(item => item * item)
   .subscribe(observer);
// -> 'next: 1'
// -> 'next: 4'
// -> 'next: 9'
// -> 'next: 16'
// -> 'complete'
Filtrowanie strumienia

Metoda Rx.Observable.filter() pozwala na filtrowanie elementów strumienia, podobnie jak metoda Array.filter().

Przykład 3.4
Rx.Observable.from([1, 2, 3, 4])
   .filter(item => item % 2 === 0)
   .subscribe(observer);
// -> 'next: 2'
// -> 'next: 4'
// -> 'complete'
Operator Rx.Observable.do()

Pozwala na wykonanie zadanej akcji bez wypływania na elementy strumienia. Możemy go wykorzystać do wypisania wartości elementu na przykład podczas debugowania.

Przykład 3.5
Rx.Observable.from([1, 2, 3, 4])
   .do(item => { console.log(`do: ${item}`); })
   .subscribe(observer);
// -> 'do: 1'
// -> 'next: 1'
// -> 'do: 2'
// -> 'next: 2'
// -> 'do: 3'
// -> 'next: 3'
// -> 'do: 4'
// -> 'next: 4'
// -> 'complete'
Scalanie strumieni

Operator Rx.Observable.merge() pozwala scalić dwa strumienie zdarzeń w jeden, podobnie jak metoda Array.concat().

Przykład 3.6
let stream1$ = Rx.Observable.from([1, 2, 3, 4]);
let stream2$ = Rx.Observable.from(['a', 'b', 'c']);

stream1$
   .merge(stream2$)
   .subscribe(observer);
// -> 'next: 1'
// -> 'next: 2'
// -> 'next: 3'
// -> 'next: 4'
// -> 'next: a'
// -> 'next: b'
// -> 'next: c'
// -> 'complete'
Ograniczanie strumienia

Operator Rx.Observable.take() pozwala ograniczyć ilość elementów pobieranych ze strumienia i jego zakończenie.

Przykład 3.7
Rx.Observable.from([1, 2, 3, 4])
   .take(2)
   .subscribe(observer);
// -> 'next: 1'
// -> 'next: 2'
// -> 'complete'
Obsługa zdarzeń

Obserwowalne strumienie idealnie nadają się do obsługi asynchronicznych akcji oraz zdarzeń.

Aby stworzyć strumienień ze zdarzeń skorzystamy z metody Rx.Observable.fromEvent(). Przyjmuje ona dwa parametry: element oraz typ zdarzenia.

Utworzymy strumień ze zdarzeń mousemove, które są wyzwalane na elemencie window.

Przykład 3.8
Rx.Observable.fromEvent(window, 'mousemove')
   .subscribe(observer);

Jednak interesują nas jedynie te ruchy myszy, które odbywają się w kwadracie 200 na 200 pikseli w lewym górnym rogu ekranu. Skorzystamy z mapowania i filtrowania strumieni aby ograniczyć zdarzenia tylko do tego obszaru.

Przykład 3.9
Rx.Observable.fromEvent(window, 'mousemove')
   .map(event => ({x: event.clientX, y: event.clientY}))
   .filter(position => position.x < 200 && position.y < 200)
   .subscribe(observer);
Obsługa błędów

Zawsze musimy się liczyć, że coś może pójść nie po naszej myśli. Aby zabezpieczyć się przed błędami, które mogą pojawić się w czasie przetwarzania stumienia wykorzystamy metodę Rx.Observable.catch().

Przykład 3.10
Rx.Observable.fromEvent(window, 'mousemove')
   .map(event => ({
      x: event.clientX, 
      y: event.clientY, 
      z: event.client.zIndex
   }))
   .catch(error => { 
      console.log(`catched error: ${error.message}`); 
      return Rx.Observable.of({x: 0, y:0, z: 0}); 
   })
   .subscribe(observer);

Pozwala ona na obsłużenie błędu i przekazanie do subskrybenta innej wartości lub lepszego komuniaktu o błędzie.

Tworzenie obiektów typu Rx.Observable

Poznaliśmy już kilka sposobów na utworzenie strumienia. Rx.Observable.from() pozwala utworzyć strumień z obiektu iterowalnego. Rx.Observable.fromEvent() ze zdarzeń a Rx.Observable.of() pojedynczej wartości. BibliotekaRxJS` dostarcza jeszcze wiele przydatnych metod pozwalających na tworzenie strumieni.

Przyjrzyjmy się jak samemu utworzyć strumień. Pozwala na to metoda Rx.Observable.create(). Przyjmuje ona jako parametr funkcję, która jest odpowiedzialna za generowanie elementów strumienia i jego ewentualne zakończenie - nazywaną subscribe.

Przykład 3.11
Rx.Observable.create(observer => {
   observer.next(1);
   observer.next(2);
   observer.next(3);
   observer.next(4);
   observer.complete();
})
.subscribe(observer);

Funkcja subscribe przyjmuje jako argument obiekt observer, który podobnie jak omawiany wcześniej Observer, posiada trzy metody:

  • observer.next() - emituje kolejny element strumienia,
  • observer.complete() - zamyka strumień,
  • observer.error() - zamyka strumień zwracając błąd błędem.
Jednoczesne wykonywanie strumieni

Podczas obsługi asynchronicznych akcji, na przykład komunikacji z serwerem, zdarza się, że chcemy jednocześnie wykonać kilka strumieni. Pozwala na to metoda Rx.Observable.forkJoin(), która subskrybuje kilka strumieni, agreguje ostatnie zwrócone przez nie wartości i kończy działanie kiedy wszystkie strumienie zostaną zakończone.

Przykład 3.12
let stream1$ = Rx.Observable.create(observer => {
   observer.next(1);
   observer.complete();
});

let stream2$ = Rx.Observable.create(observer => {
   setTimeout(() => {
      observer.next(2);
      observer.complete();
   }, 1000);
});

Rx.Observable
   .forkJoin(stream1$, stream2$)
   .subscribe(observer);
Chain-owanie strumieni

Podczas komunikacji z serwerem często zdarza się tak, że jedno zapytanie jest zależne od drugiego - wymaga zwróconej przez nie wartości. Możemy dokonać chain-owania strumieni wykorzystując metodę Rx.Observable.flatMap().

Przykład 3.13
let stream1$ = Rx.Observable.of(3);

let stream2$ = (count) => Rx.Observable.create(observer => {
   for(let i = 0; i < count; i += 1) {
      observer.next(i);
   }
   observer.complete();
});

stream1$
   .flatMap(count => stream2$(count))
   .subscribe(observer);

Observables w akcji

Pobieranie danych na podstawie wartości pola

Załóżmy, że mamy pole input, w którym znajdzie się zapytanie. Chcemy nasłuchiwać na jego zmiany i wykonać akcję tylko gdy długość zapytania jest równa lub większa niż 3.

Przykład 3.14
let queryInput = document.querySelector('#query');

Rx.Observable.fromEvent(queryInput, 'input')
   .map(event => event.target.value)
   .filter(value => value && value.length >= 3)
   .subscribe(observer);

Gdy zapytanie jest odpowiednie pobieramy dane z API wykorzystując operator Rx.Observble.ajax().

Przykład 3.15

function search (query) {
   let id = query.split('')
      .map(c => c.charCodeAt(0))
      .reduce((c, acc) => acc + c, 0) % 100 + 1;

   return Rx.Observable
      .ajax(`https://jsonplaceholder.typicode.com/posts/${id}`);
}

let queryInput = document.querySelector('#query');

Rx.Observable.fromEvent(queryInput, 'input')
   .map(event => event.target.value)
   .filter(value => value && value.length >= 3)
   .flatMap(value => search(value))
   .map(response => response.response)
   .subscribe(observer);

Powstaje jednak pewny problem. Kiedy szybko wpisujemy zapytanie to wysyłanych jest wiele żądań i nie wiadomo w jakiej kolejności wrócą. Gdy wpisujemy zapytanie interesuje nas wynik ostatniego z żądań, poprzednie są zbędne i warto byłoby je anulować.

Przykład 3.16
Rx.Observable.fromEvent(queryInput, 'input')
   .map(event => event.target.value)
   .filter(value => value && value.length >= 3)
   .switchMap(value => search(value))
   .map(response => response.response)
   .subscribe(observer);

Operator Rx.Observable.switchMap() zwróci nam wynik ostatniego z zapytań. Dodatkowo anuluje poprzednie żądania dzięki czemu nie będziemy niepotrzebnie obciążać serwera.

Obsługa kilkania

Załóżmy, że mamy przycisk Click me! i chcemy reagować na kliknięcia w niego.

Przykład 3.17
let clickMeBtn = document.querySelector('#click-me');

let clicks$ = Rx.Observable.fromEvent(clickMeBtn, 'click');

clicks$.subscribe(observer);

Jednak interesują nas tylko potrójne kliknięcia.

Przykład 3.18
let clicks$ = Rx.Observable.fromEvent(clickMeBtn, 'click');

clicks$
   .bufferCount(3)
   .subscribe(observer);

Operator Rx.Observable.bufferCount() pozwala na buforowanie określonej liczby zdarzeń i zwraca je jako tablicę.

Jednak to jeszcze nie to o co nam chodziło. Potrójne kliknięcie musi odbyć się w krótkim przedziale czasu, na przykład w przeciągu 400 milisekund.

Przykład 3.19
let clicks$ = Rx.Observable.fromEvent(clickMeBtn, 'click');

clicks$
   .bufferWhen(() => clicks$.delay(400))
   .filter(events => events.length >= 3)
   .subscribe(observer);

Operator Rx.Observable.bufferWhen() zbie­ra wszyst­kie klik­nię­cia aż do mo­men­tu gdy prze­ka­za­na funk­cja coś wy­emi­tu­je. Ta ro­bi to dopie­ro po 400ms po klik­nię­ciu. Dzięki temu udało nam się wykryć potrójne kliknięcia, które miały miejsce w przeciągu maksymalnie 400 milisekund.


Źródła

results matching ""

    No results matching ""