Reactive programming has gained significant popularity in recent years, and RxJS (Reactive Extensions for JavaScript) has emerged as a powerful library for implementing reactive programming in JavaScript applications. At the core of RxJS lie observables and operators, which enable developers to work with asynchronous data streams in a concise and powerful manner. In this article, we'll explore the fundamental concepts of RxJS, including observables, subscriptions, and observers, and delve into the details of some essential operators such as map, tap, filter, and take.

Understanding The Process

An observable represents a stream of data that can be observed over time. Think of it as a sequence of values emitted over time, whether those values are user inputs, HTTP responses, or any other asynchronous events. Observables are crucial in the reactive paradigm as they allow developers to model and work with these asynchronous data streams in a consistent and declarative way.

Understanding Observables - Manufacturing Line Analogy

A good way to understand observables is to picture a manufacturing assembly line that you oversee, where your job is to keep production efficient and organized. The assembly line represents the flow of data in an Angular application that uses RxJS observables.

A manufacturing assembly line has a Conveyor Belt (Observable), Workers (Observers), Production Stations (Operators), Quality Control (Error Handling), Start and Stop Buttons (Subscribe, Unsubscribe) and Speed Control (Schedulers).

Conveyor Belt (Observable)

The RxJS Observable is like a conveyor belt. It's a continuous, dynamic channel through which different items (data) move. On the conveyor belt, items come in and out in a seamless flow, much like data emitted and subscribed to in an observable stream. In an observable, data can come from a server, from a service, or from user input, or it can be generated by RxJS itself. One way to generate an observable is the of() function provided by RxJS:


  import { of } from 'rxjs';

  // Emits the numbers 1 to 8 in order, then completes.
  const observable = of(1, 2, 3, 4, 5, 6, 7, 8);
  

The of() function creates an observable from the arguments you pass in. Each argument is emitted, in order, as a next notification, and then the observable completes automatically.

Observables generated from arguments passed in of() RxJS method
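As a minimal sketch of that behavior, assuming RxJS 7 is installed, the snippet below records every notification that of() delivers. The events array is a name introduced here purely for illustration.

```typescript
import { of } from 'rxjs';

// Illustrative log of every notification the observer receives.
const events: string[] = [];

of(1, 2, 3).subscribe({
  next: value => events.push(`next:${value}`),
  complete: () => events.push('complete'),
});

// of() delivers its values synchronously, so by the time subscribe()
// returns, all notifications have already been recorded.
events.push('after subscribe');

console.log(events);
// [ 'next:1', 'next:2', 'next:3', 'complete', 'after subscribe' ]
```

Each argument shows up as one next notification, followed by a single complete notification.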

Workers (Observers)

Workers at different stations observe the items passing through and may perform additional actions based on what they see. Similarly, in Angular, components or services can act as observers, subscribing to the observable stream and reacting to the emitted data.

An Observer is a consumer of values delivered by an Observable. It is an object with three optional methods: next, error, and complete. The next method is called when a new value is emitted by the Observable, the error method handles any error that occurs during the observable's lifecycle (Quality Control), and the complete method is called when the observable completes, indicating that no more values will be emitted.

    
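    import { Observer } from 'rxjs';

    // Typing the object lets TypeScript infer the callback parameter types.
    const observer: Observer<number> = {
      next: x => console.log('Observer got a next value: ' + x),
      error: err => console.error('Observer got an error: ' + err),
      complete: () => console.log('Observer got a complete notification'),
    };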
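To see the three callbacks in action, here is a small sketch, assuming RxJS 7. The failingLine observable and the log array are hypothetical names made up for this illustration. It shows that an error notification ends the stream: nothing sent after it reaches the observer, and complete is never called.

```typescript
import { Observable } from 'rxjs';

// Hypothetical source: emits two values, fails, then tries to emit again.
const failingLine = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.error(new Error('station jammed'));
  subscriber.next(3); // ignored: the stream has already ended with an error
});

const log: string[] = [];

failingLine.subscribe({
  next: value => log.push(`next:${value}`),
  error: (err: Error) => log.push(`error:${err.message}`),
  complete: () => log.push('complete'),
});

console.log(log);
// [ 'next:1', 'next:2', 'error:station jammed' ]
```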

What Is A Subscription?

When you subscribe to an observable, you create a connection between the observer (the entity interested in the data) and the observable (the data source). The subscription is essentially the link between the observer and the observable, establishing the means for the observer to receive and react to the emitted values.


    observable.subscribe(observer);        
	

An Observer may also be partial: you don't need to include all three callbacks (next, error and complete). If you only need next, you can pass it directly to subscribe() as an arrow function.


    observable.subscribe(x => console.log('Observer got a next value: ' + x));        
	

You can also leave the subscribe arguments blank and not pass any callbacks.


    observable.subscribe();        
  

The subscription still works and the observable still runs, but nothing is displayed because no callback handles the emitted values, unlike the console.log() we placed in the next callback earlier.

Production Stations (Operators)

The conveyor belt passes through various production stations, each responsible for a specific task in the manufacturing process. Similarly, operators in RxJS, such as tap, map, or filter, act as production stations modifying or processing the data as it moves through the observable stream.

An RxJS operator is a function that handles the data flowing out of an observable.

RxJS operators are widely used in Angular applications, where they let developers transform and manipulate asynchronous data streams in a declarative and concise way, which makes the data flow easier to manage and control.

There are two types of RxJS Operators: Pipeable Operators and Creation Operators. We will only focus on Pipeable Operators in this article.

RxJS Pipeable Operators are functions that take an Observable as input and return another Observable. This is a pure operation: the previous Observable stays unmodified.

To use pipeable operators, you pass them as arguments to the observable's pipe() method.


    observable.pipe().subscribe();
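The idea that a pipeable operator is simply a function from one Observable to another can be sketched as follows. This assumes RxJS 7.2 or later (so that map can be imported from 'rxjs'); double, source, doubled and the two result arrays are hypothetical names used only for this illustration.

```typescript
import { Observable, map, of } from 'rxjs';

// Hypothetical custom station: a function that receives an Observable
// and returns a new one, built here on top of the map operator.
function double(source: Observable<number>): Observable<number> {
  return source.pipe(map(x => x * 2));
}

const source = of(1, 2, 3);
const doubled = source.pipe(double); // a new Observable; source is untouched

const fromSource: number[] = [];
const fromDoubled: number[] = [];

source.subscribe(x => fromSource.push(x));
doubled.subscribe(x => fromDoubled.push(x));

console.log(fromSource);  // [ 1, 2, 3 ]
console.log(fromDoubled); // [ 2, 4, 6 ]
```

Subscribing to the original observable still yields the original values, which is what "the previous Observable stays unmodified" means in practice.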
  

There are many categories of RxJS operators (Transformation Operators, Filtering Operators, Error Handling Operators, Utility Operators, and so on), but in this article we will focus on only four essential operators.

1. Tap Operator:

The tap operator allows you to perform side effects on each emitted value without modifying the value itself. It's often used for debugging, logging, or performing actions that don't change the data but provide insights into the observable's behavior.

Following our example above with console.log() passed as the next callback, we can get the same effect with tap(). We pass tap() to pipe() and give it an arrow function that calls console.log().


    observable.pipe(tap( x => console.log('Observer got a next value: ' + x))).subscribe();
  

To use RxJS in Angular, you first need a component and an observable of some kind. In this example we create a series of numbers with the of() function.

You can subscribe to RxJS Observables within the lifecycle hooks, usually in ngOnInit. An important part of working with observables is to unsubscribe when you finish processing the data. You can do this by implementing OnDestroy in your component, which we discuss later in this article.

You have to import everything you use in your component: the lifecycle hook interfaces come from '@angular/core', and RxJS functions such as of() and tap() come from 'rxjs', for example import { of, tap } from 'rxjs';. Importing operators such as tap directly from 'rxjs' requires RxJS 7.2 or later; older versions import them from 'rxjs/operators'.

Example of using Tap operator in Angular Component:

  import { Component, OnInit } from '@angular/core';
  import { of, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit {
    observable = of(1, 2, 3, 4, 5, 6, 7, 8);

    ngOnInit() {
      this.observable
        .pipe(
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }
  }
  

2. Map Operator:

The map operator transforms the values emitted by an observable using a provided function. It takes each emitted value, applies the transformation logic, and then emits the modified value. This is particularly useful when you need to transform data before it reaches the observer.

Example of using Map operator:

  import { Component, OnInit } from '@angular/core';
  import { map, of, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit {
    observable = of(1, 2, 3, 4, 5, 6, 7, 8);

    ngOnInit() {
      this.observable
        .pipe(
          map(x => x * 2),
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }
  }
  

3. Filter Operator:

The filter operator allows you to selectively emit values based on a provided condition. It acts as a gatekeeper, allowing only values that meet the specified criteria to pass through the observable.

In the following example we add a condition that checks whether each emitted number is evenly divisible by 2. The resulting observable is a new sequence that contains only the values that pass the condition.

Example of using Filter operator:

  import { Component, OnInit } from '@angular/core';
  import { filter, of, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit {
    observable = of(1, 2, 3, 4, 5, 6, 7, 8);

    ngOnInit() {
      this.observable
        .pipe(
          filter(x => x % 2 === 0),
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }
  }
  

4. Take Operator:

The take operator limits the number of emitted values from an observable. It ensures that only the specified number of values is observed before automatically completing the observable.

The take operator accepts a number as its argument and emits at most that many values.

Example of using Take operator:

  import { Component, OnInit } from '@angular/core';
  import { of, take, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit {
    observable = of(1, 2, 3, 4, 5, 6, 7, 8);

    ngOnInit() {
      this.observable
        .pipe(
          take(2),
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }
  }
  

In this example, only the first two values (1 and 2) are emitted; after that, the observable completes.
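Operators can be chained in a single pipe(), and the order of the stations matters. The sketch below, assuming RxJS 7.2 or later, runs the same three operators in two different orders. The names numbers, filterFirst and takeFirst are introduced here for illustration only.

```typescript
import { filter, map, of, take } from 'rxjs';

const numbers = of(1, 2, 3, 4, 5, 6, 7, 8);

// Order A: filter, then transform, then limit.
const filterFirst: number[] = [];
numbers
  .pipe(
    filter(x => x % 2 === 0), // keeps 2, 4, 6, 8
    map(x => x * 10),         // 20, 40, 60, 80
    take(2)                   // stops after the first two results
  )
  .subscribe(x => filterFirst.push(x));

// Order B: limit first, then filter and transform.
const takeFirst: number[] = [];
numbers
  .pipe(
    take(2),                  // only 1 and 2 get past this station
    filter(x => x % 2 === 0), // keeps 2
    map(x => x * 10)          // 20
  )
  .subscribe(x => takeFirst.push(x));

console.log(filterFirst); // [ 20, 40 ]
console.log(takeFirst);   // [ 20 ]
```

Each value travels through the stations from top to bottom, so placing take() before or after filter() changes which values reach the observer.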

Preventing Infinite Loops - unsubscribe()

The examples so far used a finite observable created with of(), to which we passed a small set of numbers. But let's say you have to work with an infinite flow of data, such as one generated by a timer. Let's create such an observable with the interval() function from RxJS:


  import { Component, OnInit } from '@angular/core';
  import { interval, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit {
    // observable = of(1, 2, 3, 4, 5, 6, 7, 8);
    observable = interval(1000);

    ngOnInit() {
      this.observable
        .pipe(
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }
  }
  

The interval() function emits sequential numbers, starting at 0, at the interval you specify (in milliseconds) as an argument. We passed 1000, so a new number is emitted every second.

If you test this code, you will see in the console that a new number is generated every second, and this goes on for as long as the browser window is open, even if you navigate to other sections of your Angular application. Imagine you have two or three such subscriptions in each component: in a very short time they generate a huge amount of useless data that degrades performance and takes up valuable resources.

That's why it is so important to unsubscribe from the observable as soon as you have finished working with the data.

To be able to unsubscribe, we have to do things a little differently and store the subscription in a variable, so that we can unsubscribe at the end.


  import { Component, OnDestroy, OnInit } from '@angular/core';
  import { Subscription, interval, tap } from 'rxjs';

  @Component({
    selector: 'app-home',
    templateUrl: './home.component.html',
    styleUrls: ['./home.component.css']
  })
  export class HomeComponent implements OnInit, OnDestroy {
    
    observable = interval(1000);
    sub!: Subscription;

    ngOnInit() {
      // Keep a reference to the subscription so it can be closed later.
      this.sub = this.observable
        .pipe(
          tap(x => console.log('Observer got a next value: ' + x))
        )
        .subscribe();
    }

    ngOnDestroy() {
      this.sub.unsubscribe();
      console.log('Unsubscribed');
    }
  }
  

In the example above, when Angular destroys the component and calls ngOnDestroy, the component unsubscribes from the observable and data is no longer emitted to this observer. Note that a component is destroyed, for example, when you navigate away from it to another component rendered in the same <router-outlet>.
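When a component opens several subscriptions, one possible way to close them together is to collect them in a parent Subscription. The sketch below assumes RxJS 7 and runs outside Angular; the variable names are hypothetical, and in a real component the unsubscribe() call would live in ngOnDestroy().

```typescript
import { Subscription, interval } from 'rxjs';

// Parent subscription that collects every subscription we open.
const subscriptions = new Subscription();

const everySecond = interval(1000).subscribe(x => console.log('1s tick: ' + x));
const everyHalfSecond = interval(500).subscribe(x => console.log('0.5s tick: ' + x));

subscriptions.add(everySecond);
subscriptions.add(everyHalfSecond);

// Both timers are still active at this point.
const openBefore = !everySecond.closed && !everyHalfSecond.closed;

// Closing the parent closes every subscription that was added to it.
subscriptions.unsubscribe();

console.log(openBefore);             // true
console.log(everySecond.closed);     // true
console.log(everyHalfSecond.closed); // true
```

Because unsubscribe() runs before the first timer fires, no tick is ever logged, and both underlying timers are cleared.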

RxJS operators provide a powerful toolkit for working with asynchronous data streams in a reactive manner. Understanding observables, subscriptions, and observers lays the foundation for effective use of these operators. The map, tap, filter, and take operators, among others, enable developers to manipulate, filter, and control the flow of data within their applications, leading to more maintainable and expressive code. As you delve deeper into RxJS, these concepts and operators will become essential tools in your toolkit for building reactive and efficient JavaScript applications.
