Fiddling with RxJs Streams
In this post I’ll touch on the emergence of JavaScript reactive streams. We’ll look at a cool online interactive tool that is useful for checking behavior of stream operators. I’ll finish by showing six RxJs JSFiddle examples that you can run and modify.
Background
Rich applications, such as modern SPAs, deal in state changes due to events. We’re accustomed to having each interested event listener self-register a callback with an event emitter. That’s the observer pattern. This pattern is dynamic and more workable than hard-coding each event emitter to invoke foreign callbacks.
Nowadays a complex application may have a morass of cooperating event handlers that have intermittent life cycles. This is hard to reason about. It can lead to memory leaks from dangling listeners, intermittent navigation-instigated rogue behaviors, or a triangle-of-doom handler tangle in source code.
Promises mitigate many problems by flattening callback source. They are a uniform result container. I’ve written about promises. A settled promise eventually contains a future data result or an error value. That promise is immutable. It’s one-shot: valid for only a single event.
We often deal with successive sequences of events. Think of mouse position movement; it produces a stream of cooperating events that we may need to handle. Imagine implementing a drag-drop for example.
Reactive Stream
A reactive stream elevates the observer pattern to a uniform framework. It’s been called “The observer pattern done right.” It provides a pushed stream of items that we can transform, merge with other streams, or reduce to a single value. Functional programming techniques are in-play. Among them are reliance on first-class functions (passed as data), high-order functions (parameter functions; return value function), and lambdas (anonymous functions).
Event stream processing resembles the iterator pattern familiar in data streams. Events are divas, however. They occur unpredictably. Thus, event streams use a push model. Iterators use a pull model. Three nutshell concepts are:
- Reactive (adjective – medieval Latin): responds to a stimulus in a particular manner.
- Stream (noun – middle English): a steady succession (as of items or events).
- Push versus pull – event streams versus data streams.
The following diagram shows a reactive stream pattern. An Observable
wraps an item that is an event source. An Observable.subscribe
triggers an intermittent stream of discrete items. It is the stream’s listener. An Observable
can wrap almost any kind of data, including promises, or nothingness.
A set of well-known operators can tap into its output stream. Those are higher-order functions that often take a parameter lambda, object, or a primitive. Each operator takes a value from the stream, returning a value onto the stream. Together, they resemble a monadic chain process. An operator’s parameter function could internally start a new stream or combine two streams.
ReactiveX
It’s difficult to roll your own reactive stream framework. There are several packages available across platforms and languages. I focused on ReactiveX. See http://reactivex.io/intro.html. The introduction explains it nicely. It’s an umbrella project that has a JavaScript binding named “Reactive Extensions,” or RxJs.
I used the RxJs framework in examples that follow. Refer to https://github.com/Reactive-Extensions/RxJS/blob/master/readme.md.
A side-benefit of ReactiveX technology is that it cleans up event listeners. Programmers that use traditional event listeners must be ever-mindful of disengaging unneeded listeners, or these will be memory leaks.
The reactive mode of programming could be difficult to ease into. The next section of this article shows an interactive reference tool that help to visualize various operator functions. Afterward, I’ll show six JsFiddle snippets that cover disparate use cases.
Marble Diagrams
Applying operators for transforming, filtering, and merging streams is a distinct way of reasoning about an event solution. For reference, try this API reference: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md.
Some of us can learn in a visually interactive manner. Look at the interactive visualization tool at http://rxmarbles.com/.
Want to see what really happens when you operate on streams? For example, a marble diagram of the merge operator allows you to drag event marbles of two input streams to instantly see the output stream. Read on …
Combining
Here’s the merge
combining operator. The two topmost input streams apply merge operator to produce the bottom output stream.
Below, I’ve repositioned the “40” and the “1” event marbles of the second input stream.
See the stable ordering of the bottom output stream afterward? Apparently, Egon, of Ghostbusters, never tried crossing reactive streams.
Transforming
The well-known transformational map
operator occurs in other functional libraries that process streams, collections, or plain old arrays. Recall each
or forEach
from lodash and underscore.
Additionally, there is the JS Array prototype’s forEach
, or the Immutable.js map
function. All, including our RxJs map operator, pass a collection or stream to a caller-supplied function, creating a new output sequence.
Here’s the map marble diagram. For each member of the input, its function parameter returns ten times its numeric input parameter. In the end, it transforms the input stream, producing a new stream consisting of each input multiplied by ten. It preserves ordering.
Below, I moved the “2” event marble to the extreme left to show that ordering remains.
Filtering
We’ll see a takeUntil
filtering operator in a later drag-drop example. Look at that operator’s marble diagram. The second input stream conditionally filters the first stream’s members into the output stream. The second JS stream’s earliest false “0” item halts output. Note the vertical bar on the bottom output that shows the end of the output stream.
Below, I’ve moved the rightmost middle stream “0” item leftward to just after the “2” item of the upper stream. The takeUntil
stops replicating the upper stream there, now producing a result stream containing only “1” and “2”. Again RxJs preserved the ordering.
JSFiddle Examples
Each example depends only upon the RxJs library and JQuery. I dropped back to ES5 syntax in the online JSFiddle code because Safari doesn’t support arrow functions. The following examples do use ES6 syntax. See https://babeljs.io/docs/learn-es2015/ for succinct examples of ES6 and ES2015.
Example 1: A Stream of Nothing
Recall that an Observable
represents a push-based collection. In RxJS, an Observable can front any number of legal JavaScript entity or entities. Let’s lean on the boundary. How about an empty Observable?
Yes, but what use is a stream of nothing? Well, it’s an edge case. Consider a dynamic application that might need to handle a source of any number of indeterminate dynamic items, including an undefined JS value. We can handle that.
Refer to JSFiddle https://jsfiddle.net/mauget/3u4po86u/. It produces an Observable
by mumbling an Rx.Observable.empty()
incantation. Nothing happens until the Observable suffers a subscribe
function call. An RxJs subscribe can take three optional functional parameters:
onNext
onError
onCompletion
The onError
and onCompletion
calls are mutually exclusive. The framework invokes one or the other, not both. For effect I used a window.setTimeout
to delay invoking the subscribe until three seconds pass. When the timer pops, I call subscribe. The stream immediately awakens to push the “empty” item. The onCompletion
logs a message and updates the DOM with the message.
const delay = 3, // seconds source = Rx.Observable.empty(), // Dormant until subscription occurs msg1 = `Will subscribe to empty source in ${delay} seconds ... `; $('body').append(msg1); // HTML output console.log(msg1); // deb log output - window.setTimeout( () => { const msg2 = `Subscribing ... here comes the data ...`; $('body').append(` ${msg2}`); console.log(msg2); // Kick the stream source.subscribe( v => console.log(`onNext: ${v}`), // arg1 fn v => console.log(`onError: ${v}`), // arg2 fn v => { // arg3 fn const msg3 = `yes ... received the expected "${v}" from the stream.`; $('body').append(` ${msg3}`); console.log(msg3); } ); }, delay * 1000 );
Example 2: Time Interval
Example 1 had a standard window.setTimer
that provided a timed interval as a demo prop.
You object: “Isn’t a timer expiration an event? I thought RxJs is about observing events!” Yup. In that hello-world kind of snippet, I didn’t want to confuse a trivial stream example by involving a second stream.
Now you’re ready, Grasshopper! Let’s observe timer expirations via RxJs. An Observable
stream of timer expirations will generate a dynamically increasing trip counter of values. Refer to the JSFiddle at https://jsfiddle.net/mauget/t03fcr8y/. It generated the before and after panels here.
Look at the code below. Notice the RxJs interval
, skip
, and take
operators that sequentially process the stream input items into an output stream? Those govern the pace, content, and extent of the product stream.
The interval operator is the heart of the code. See https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/interval.md. This is a reworked example from RxJs samples. At each delayed invocation the interval call injects a numeric value from a monotonically-increasing zero-origin set.
The skip(0)
operator omits the zeroth item. The take(10)
limits processing to ten output members.
The subscribe
call starts the stream. Its onNext
functional parameter calculates a current odometer value from each emitted stream item. The onError
and onCompletion
functional parameters log their respective calls.
// distance = rate * time const R = 60; // mph let t = 0; // We'll step up at intervals const source = Rx.Observable .interval(500) // 1/2 second interval .skip(1) // Skip zeroth .take(10); // Take next 10 // Kick off ... const subscription = source.subscribe( t => { console.log('hours: ' + t); $('#mph').html(R); $('#hours').html(t); $('#miles').html(R * t); // Do the math, display it }, err => console.log('Error: ' + err), () => console.log('Completed'));
Example 3: Interactive Character / Word Counter
Let’s maintain a dynamic count of words and non-whitespace characters. They’re entered or pasted live into an HTML textarea element. Backspace or delete keys decrement the counts. I count whatever is held in the textarea at each event instant.
Refer to https://jsfiddle.net/mauget/o6rprdhx/. It’s another reworked RxJs sample. It defines an Observable
of a DOM textarea keyup event. The Observable
fronts a push-based collection consisting of a single object that holds the character count and word count of the textarea. The snippet generates the contents from a jQuery map
function chained to an event selector. The map callback uses regular expressions to count current non-whitespace characters and words held within the textarea.
Nothing happens until the subscribe
call on the Observable. The subscribe
’s onNext
function parameter populates the result, much like a traditional event callback. I omitted onError
and onCompletion
callback functions. A pushed Observable-to-subscribe event pulse occurs at each keyup. The onNext
carries out an instant update of the two counts in the UI and console log.
// #A. DOM elements-of-interest from DOM query selectors const $txtInput = $('#txtInput'), $charCount = $('#charCount'), $wordCount = $('#wordCount'); // #B. Observable of keyup triggered at textInput -- it's a monad Rx.Observable.fromEvent($txtInput.get(0), 'keyup') .map(e => { return { cc: (e.target.value.match(/[^\s]/g) || []).length, wc: (e.target.value.match(/\b\S+\b/g) || []).length } }) // #C. Kick off the stream .subscribe(v => { console.log(v); $charCount.html(` Chars: ${v.cc}`); $wordCount.html(` Words: ${v.wc}`); });
Example 4: Drag and Drop
Here is a simple drag-drop example. It uses cooperating Observables on the events: mousedown, mousemove, and mouseup. These collectively implement dragging and dropping a DOM item anywhere within a panel.
Refer to the JsFiddle at https://jsfiddle.net/mauget/9bhp0qu8/. Try dragging the fruit. This snippet, too, grew out of an RxJs example. It creates three kinds of mouse action observables. The code enables a user to reposition a draggable fruit image on a panel.
The subscribe to the mousedown Observable fruit starts the action. The event item on the fruit chains to a flatMap
operator chained to a takeUntil
mouseup operator. A flatMap
is a common function encountered in functional programming. It’s a map
function that flattens its input sequences into a single output sequence.
(function(dragable) { // Observe three major kinds of mouse events: // mousedown, mouseup on the dragable; mousemove over the document. const mousedown = Rx.Observable.fromEvent(dragable, 'mousedown'), mousemove = Rx.Observable.fromEvent(document, 'mousemove'), mouseup = Rx.Observable.fromEvent(dragable, 'mouseup'); // A. Detect mouse down; // B. While down, listen for mouse move and mouse up; // C. When mouse up: stop listening for moves and mouse up. const mousedrag = mousedown.flatMap( md => { // Capture mouse down offset position const startX = md.offsetX, startY = md.offsetY; // B1. Track pos differentials using mousemove, until mouseup return mousemove.map( mm => { mm.preventDefault(); return { left: mm.clientX - startX, top: mm.clientY - startY }; // C. stop the drag when mousup }).takeUntil(mouseup); }) // Subscribe to mousemove's mousedrag stream: // B2. Update draggable's position from mousedrag stream of events mousedrag.subscribe( pos => { dragable.style.top = pos.top + 'px'; dragable.style.left = pos.left + 'px'; }); })($('#dragable').get(0));
Example 5: AJAX Promise – A Set of Quantum-Random Numbers
Let’s display a set of profoundly random numbers requested from a REST service.
Refer to JSFiddle https://jsfiddle.net/mauget/b0jfmge6/. In this example, we access the random number REST service via AJAX. We can make an Observable
for almost anything. We’ll observe a promise. An AJAX call can return a promise to provide asynchronous behavior that we need to drive a UI from a latent network response. The stream will push the promise.
To review, a promise is either pending or settled. If it settles to a success
state, it forever returns the data it contains from that success. Thus you can call then
on that promise whenever you desire, as often as you desire, but it will always return its settled success or reject value for its lifespan.
Our example observes a button click. The click Observable
, onNext
, displays a working
banner. A chained flatMap
operator calls getNumbers
to create a second Observable
from a promise. That promise is an immediate result of an AJAX call to the REST service. This is easier to understand by looking at the snippet.
The flatMap
operator gates the settled then
object payload of the promise Observable
onto the button click stream as a flat sequence of numbers. A subscribe on the button click stream displays the random numbers.
I kick off activity by triggering a refresh button click at the bottom of the code. Notice what happens if you click the button repeatedly? You see the identical random number sequence repainted!
I claimed the sequence is random, so why does it repeat? Answer: because the promise stream already resolved that settled one-shot AJAX promise. There is no second network invocation.
const $button = $('#button'), $result = $('#result'); // AJAX fn: returns observable promise const getNumbers = count => { return Rx.Observable.fromPromise( // Get promise of: {"type":"uint8","length":1,"data":[99],"success":true} $.ajax({ url: `https://qrng.anu.edu.au/API/jsonI.php?length=${count}&type=uint8` }) .promise()); }; // Observe button click, subscribe to result Rx.Observable.fromEvent($button.get(0), 'click') .map(e => { $result.html('<span class="working">(working)</span>'); return e.target.value; }).flatMap(getNumbers(300)) .subscribe(d => { let sep = ''; console.log(d); if (d.success) { $result.empty(); d.data.forEach(v => { $result.append(sep + v); sep = ', '; }); } }); $button.trigger('click');
Sidebar
As a physics major diverted to computer programming, I’m compelled to speak about our data.
The numbers originate from a vacuum chamber in Canberra, Australia. A vacuum that emits random numbers! The count of particles and photons is zero in a perfect dark classical vacuum. At the quantum level, however, a vacuum is weird. A lab at the Australian National University measures field variations from continually materializing and dematerializing virtual particle pairs. See https://en.wikipedia.org/wiki/Virtual_particle.
A REST service streams those values. Statistical tests show that the sequence is profoundly random.
Example 6: AJAX Promise – A Quilt of Quantum-Random Colors
My wife is a quilter. I couldn’t resist converting example 5 output into a random patch quilt.
Refer to JSFiddle https://jsfiddle.net/mauget/9c2ecLqs/. The code is largely that of example 5, aside from presentation logic in the subscribe
call. The subscribe callback forces each sequence item into a number of the interval 0 … 255. It combines each group of three into one CSS RGB value. Finally, it emits each RGB value into a grid. Again, notice that the refresh button causes an identical quilt to render after the first rendering. That settled promise remains immutable and happy.
const valCount = 1020; const $button = $('#button'), $result = $('#result'); // AJAX fn: returns observable promise const getNumbers = count => { return Rx.Observable.fromPromise( // Get promise of: {"type":"uint8","length":1,"data":[99],"success":true} $.ajax({ url: `https://qrng.anu.edu.au/API/jsonI.php?length=${count}&type=uint8`}) .promise()); }; // Listen for button clicks, do service request, subscribe to result Rx.Observable.fromEvent($button.get(0), 'click') .map(e => { $result.html('<span class="working">(working)</span>'); return e.target.value; }) .flatMap( getNumbers( valCount ) ) .subscribe(data => { console.log(data); if (data.success) { $result.empty(); let cols = 0 const c = []; // rgb value data.data.forEach( v => { c.push( v % 256 ); if (c.length > 2 ) { const rgb = `rgb(${c.pop()},${c.pop()},${c.pop()})`; $result.append( `<span style="background-color:${rgb}"> </span>`); if ( ++cols > 33 ) { $result.append(` `); cols = 0; } } }); } }); $button.trigger('click');
Wrap-Up
Reactive streams are a unified way of dealing with asynchronous events. They’re a higher-level implementation of the observer pattern. They rely on functional programming. An Observable
pushes events onto a stream. Operators modify it. A subscribe operator starts, handles, and ends stream processing.
This paradigm may require a shift of your thinking for programming in an asynchronous environment. Marble diagrams help to understand many functional operators used in ReactiveX.
We saw six operational RxJs snippets that support varying use cases. You could run and modify them on JSFiddle for familiarization. Reactive streaming libraries exist across platforms and operating systems. ReactiveX is fairly unified across the board, so concepts from this article could apply beyond JavaScript.
Reference: | Fiddling with RxJs Streams from our WCG partner Lou Mauget at the Keyhole Software blog. |