Learning Observables the hard (easy) way
ReactiveX is the new black.
October 22, 2020 | 4 min. read
Observables (ReactiveX/Observer pattern) seem to be showing up everywhere I look. Or maybe I have just been looking for it. Either way. Iāve felt like I needed to learn to use them. Even though I havenāt had a problem requiring Observables. Well, I guess not even really knowing how it worked meant I had no idea when I should opt for it. So I did the next best thing: I shoe-horned it into a problem I had. And now that Iām through with it, I have a grasp of observables enough that I wanted to write down the process. For posterity. This will focus on rxjs (the Reactive library for JS) but the Observable pattern is implemented in a lot of major languages, see here.
The real problem that led me down the Observable rabbit hole: While working on the revamp from python to JS of my LastFM based project (š RIP. Cause of death: needlessly complicated code). I needed to make 0-100s of API call to LastFM, to pull in all of the historical data that any user scrobbled to the service. This only matters for the first run per user but I still had to deal with it; Now I wanted to deploy this app to Vercel. Itās free for pet projects! But Vercel only provides serverless functions, so I had to design the āAPI callingā workflow around a serverless API that cannot run for more than 10s. The set up I wanted to create is a portal of some sort that shows the progress of the download (5%, 10% ā¦). The actual pulling and saving to the database process have to happen on the backend (API keys and whatnot). But because of the serverless max run time, I definitely canāt make 100 calls in 10s.
So, I came up with a not-so-pretty solution. I have an endpoint that specifies how many API calls (pages) need to be made to get all the latest scrobbles. And for each page, the UI calls the backend to request a āpullā for that specific page. So instead of invoking the backend function to pull all the data at once - which will timeout. I call the backend n+1 times, (n=number of pages). Invocations are ācheaperā than run times. And the part that needs Observables? Well, it turns out showing the progress of the download is harder than it seems.
And this is where Obsevables shine. In fact the table on the rxjs
page for observables hints at it - observables are the solution to the āmultiple promisesā scenario. You have a bunch (or infinite) number of promises and want to receive the result when any of them resolve, and keep receiving the result until exhausted.
I was surprised this is not a very straightforward issue to solve natively in JavaScript. But, Promise
only has .all()
and .allSettled()
- they both wait for all []Promise
to resolve before resolving themselves. all()
even exits on the first error!
Whereas with an observable, you can dispatch multiple async calls, subscribe, and then listen for each resolved/emitted/errored value in a function.
This is the implementation I came up with:
export function saveScrobbles(username, totalPages, fromTime) {
let currentPage = totalPages;
const promises = [];
while (currentPage > 0) {
const p = fetch(`/api/scrobbles/${username}`, {...});
promises.push(p);
currentPage -= 1;
}
return from(promises).pipe(mergeAll());
}
from()
is anrxjs
function for converting values and promises to observables;pipe()
is a sort of helper function, for easily chaining operators. So if a, b, and c are operators that need to be applied to result of an observable o$, and they need to be applied in the order c(b(a(x))). this can expressed in a pipe as.pipe(a(), b(), c())
where each function gets the result of the preceding function call.rxjs
has a lot of operators, infact, thatās the real sell of the project Think of RxJS as Lodash for events. It facilitates writing in a functional way; you canpipe
afilter
into adebounce
into amap
intomerge
quite easily.
The saveScrobbles
function returns an observable, and the subscription looks like:
const pages$ = saveScrobbles(username, totalPages, from);
...
pages$.subscribe({
next: () => {
setCompletedPages((prev) => prev + 1);
},
complete: () => {...},
});
Once an observable is called with.subscribe()
, the next()
function is called every time a new value is returned. Technically, the signature is next(value)
but I didnāt use the returned value here.
The complete
function is called when all values have been emitted. This is equivalent to what you would normally have access to when using Promise.all()
or Promise.allSettled()
.
Thereās also an error
function whenever an error occurs while getting a value - not used here.
One very important property of observables that I am not even taking advantage of here, is that there can be multiple subscribers. This is another part of what makes observables powerful. Itās not possible to Promise.all()
more than once. But page$.subscribe
can be called multiple times from different places (and the next
, error
and complete
will be called individually).
So, completedPages
is the number of pages updated, and since thereās already access to the total number of pages. Itās trivial to show the percentage completed.
And thatās it, a very simple to use Reactivity/Observables.
I should reiterate this is barely scratching the surface of whatās possible with observables, more a basic intro for understanding what they are.
Also, I know there are better solutions to this problem. For one, a regular web-server can make all the calls and have an endpoint that gets polled to get the progress state. Or, using a Websocket for instant progress pings - I think this is preferable since it will avoid the polling logic - which could lead to a lot of GET calls as well.
Some good references:
- Netflix Engineering video video on rxjs.
- rxjs.
- ReactiveX - Intro