🏭 RxJS Operators: The Magic Factory Workers
Imagine a toy factory where raw materials flow through different machines. Each machine does ONE special job: painting, shaping, sorting, or packaging. RxJS operators are exactly like these machines! Data flows in, gets transformed, and flows out ready for use.
🌊 The River of Data
Think of your app as a river. Data flows through it like water. Observables are the river itself. Operators are the special machines along the riverbank that:
- Create new streams of water
- Transform the water (maybe add color!)
- Filter out the dirty bits
- Combine multiple streams into one
- Handle problems when the flow breaks
Let's meet each type of factory worker!
🏭 1. Creation Operators - The Spring Makers
Creation operators start a new stream of data from scratch. They're like opening a tap!
of() - The Instant Deliverer
Wraps values and sends them one by one, then says "done!"
import { of } from 'rxjs';
of('🍎', '🍊', '🍋').subscribe(
  fruit => console.log(fruit)
);
// Output: 🍎 🍊 🍋
from() - The Array Unpacker
Takes an array, another iterable, or a Promise and sends each item separately.
import { from } from 'rxjs';
from([1, 2, 3]).subscribe(
num => console.log(num)
);
// Output: 1 2 3
interval() - The Ticking Clock
Sends an increasing counter (0, 1, 2, ...) every X milliseconds, forever, until you unsubscribe.
import { interval } from 'rxjs';
interval(1000).subscribe(
count => console.log(count)
);
// Output: 0 1 2 3 ... (every second)
timer() - The Delayed Start
Waits for an initial delay, then ticks at a fixed period. With only the first argument it fires once and completes, like setting an alarm.
import { timer } from 'rxjs';
timer(2000, 1000).subscribe(
count => console.log(count)
);
// Waits 2s, then: 0 1 2 ... (every 1s)
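fromEvent() - The Event Listener
Turns DOM events into a stream.
import { fromEvent } from 'rxjs';
// Assumes the page has at least one <button> element
const button = document.querySelector('button')!;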
fromEvent(button, 'click').subscribe(
() => console.log('Clicked!')
);
🔄 2. Transformation Operators - The Shape Shifters
These operators change what's flowing through the stream.
map() - The Translator
Changes each value into something new.
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
map(n => n * 10)
).subscribe(console.log);
// Output: 10 20 30
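pluck() - The Property Picker
Grabs one property from objects. Note: pluck() is deprecated in RxJS 7 and slated for removal in RxJS 8; map() with a property access does the same job.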
import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';
of(
{ name: 'Ana', age: 25 },
{ name: 'Bob', age: 30 }
).pipe(
pluck('name')
).subscribe(console.log);
// Output: Ana Bob
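For codebases where pluck() is no longer available, here is a minimal sketch of the same property pick written with map(). The `Person` interface and the `names` array are illustrative assumptions, not part of RxJS.

```typescript
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical shape of the objects flowing through the stream
interface Person {
  name: string;
  age: number;
}

const people: Person[] = [
  { name: 'Ana', age: 25 },
  { name: 'Bob', age: 30 },
];

// Collected here only so the result is easy to inspect
const names: string[] = [];

from(people).pipe(
  map(person => person.name) // same result as pluck('name')
).subscribe(name => {
  names.push(name);
  console.log(name);
});
// Output: Ana Bob
```

A side benefit of map() is that the property name is type-checked, while pluck() takes it as a plain string.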
scan() - The Running Total Keeper
Like reduce(), but shows every step.
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
scan((total, n) => total + n, 0)
).subscribe(console.log);
// Output: 1 3 6 10
reduce() - The Final Answer
Collects everything and emits only the final result once the source completes, so it never emits on an endless stream such as interval().
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';
of(1, 2, 3, 4).pipe(
reduce((sum, n) => sum + n, 0)
).subscribe(console.log);
// Output: 10
🚦 3. Filtering Operators - The Bouncers
These let only certain values through.
filter() - The Gatekeeper
Only passes values that meet your rule.
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
filter(n => n % 2 === 0)
).subscribe(console.log);
// Output: 2 4
take() - The Counter
Takes only the first N values, then stops.
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
interval(500).pipe(
take(3)
).subscribe(console.log);
// Output: 0 1 2 (then completes)
takeUntil() - The Until-Stopper
Keeps taking until another observable fires.
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
interval(500).pipe(
takeUntil(timer(2000))
).subscribe(console.log);
// Output: 0 1 2 (stops at 2 seconds)
first() & last() - The Edge Catchers
Grab just the first or last value.
import { of } from 'rxjs';
import { first, last } from 'rxjs/operators';
of('a', 'b', 'c').pipe(first())
.subscribe(console.log); // a
of('a', 'b', 'c').pipe(last())
.subscribe(console.log); // c
distinctUntilChanged() - The Duplicate Detector
Ignores a value if it's the same as the previous one.
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3, 1).pipe(
distinctUntilChanged()
).subscribe(console.log);
// Output: 1 2 3 1
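debounceTime() - The Wait-and-See
Waits for silence before passing the last value.
import { fromEvent } from 'rxjs';
import { debounceTime } from 'rxjs/operators';
// `input` is assumed to be an <input> element already in scope
fromEvent(input, 'keyup').pipe(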
debounceTime(300)
).subscribe(/* fires 300ms after typing stops */);
🎢 4. Higher-Order Mapping - The Stream Managers
Sometimes one value creates a whole new stream. These operators handle streams-within-streams!
The Big Picture
Outer Stream:   --A------B------C-->
                  |      |      |
Inner Streams:    ▼a1-a2 ▼b1-b2 ▼c1-c2
Higher-order operators decide how to handle overlapping inner streams.
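To make "streams within streams" concrete, the sketch below first uses map(), which leaves the inner streams unopened, and then mergeMap(), which subscribes to each inner stream and forwards its values. The helper `detailsFor` is made up for illustration.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical helper: each outer value produces its own small inner stream
function detailsFor(letter: string): Observable<string> {
  return of(`${letter}1`, `${letter}2`);
}

// map() alone emits the inner Observables themselves (a stream of streams)
const nested: unknown[] = [];
of('a', 'b').pipe(
  map(letter => detailsFor(letter))
).subscribe(inner => nested.push(inner));

// mergeMap() subscribes to each inner stream and emits its values instead
const flat: string[] = [];
of('a', 'b').pipe(
  mergeMap(letter => detailsFor(letter))
).subscribe(value => flat.push(value));

console.log(nested.every(item => item instanceof Observable)); // true
console.log(flat); // ['a1', 'a2', 'b1', 'b2']
```

The inner streams here are synchronous, so they never overlap. The operators start to differ once inner streams take time to finish.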
🗺️ 5. Mapping Operators Compared
Here's the secret sauce: choosing the RIGHT one!
mergeMap - The Juggler 🤹
Runs all inner streams at the same time.
clicks.pipe(
mergeMap(() => http.get('/data'))
).subscribe(console.log);
// 3 clicks = 3 parallel requests
Use when: Order doesn't matter, run everything!
switchMap - The Channel Changer 📺
Cancels the previous stream when a new one starts.
searchInput.pipe(
debounceTime(300),
switchMap(term => http.get(`/search?q=${term}`))
).subscribe(console.log);
// Only latest search matters!
Use when: Only the latest matters (search, autocomplete).
concatMap - The Polite Queue 🚶‍♂️
Waits for each stream to finish before starting the next.
clicks.pipe(
concatMap(() => timer(1000))
).subscribe(console.log);
// Each click waits its turn
Use when: Order matters, one at a time.
exhaustMap - The Busy Bouncer 🛑
Ignores new requests while one is in progress.
submitBtn.pipe(
exhaustMap(() => http.post('/save'))
).subscribe(console.log);
// No double-submits!
Use when: Prevent duplicate actions.
📊 Quick Comparison Table
| Operator | Parallel? | Cancels? | Queues? | Best For |
|---|---|---|---|---|
| mergeMap | ✅ | ❌ | ❌ | Parallel work |
| switchMap | ❌ | ✅ | ❌ | Latest only |
| concatMap | ❌ | ❌ | ✅ | Sequential |
| exhaustMap | ❌ | ❌ | ❌ | No overlaps |
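The four behaviors can be checked with a small experiment. The sketch below pushes the same hand-driven script of events through each operator. The `run` helper is made up for the demo, and the inner streams are ReplaySubjects purely as a convenience, so that a late subscriber (concatMap's queue) still sees values that were pushed earlier.

```typescript
import { Observable, OperatorFunction, ReplaySubject, Subject } from 'rxjs';
import { concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Hypothetical helper: replays one fixed script of events through the given operator
function run(flatten: (project: Project) => OperatorFunction<string, string>): string[] {
  const outer = new Subject<string>();
  const innerA = new ReplaySubject<string>();
  const innerB = new ReplaySubject<string>();
  const seen: string[] = [];

  outer
    .pipe(flatten(key => (key === 'A' ? innerA : innerB)))
    .subscribe(value => seen.push(value));

  outer.next('A');     // inner stream A starts
  innerA.next('a1');
  outer.next('B');     // B arrives while A is still running
  innerB.next('b1');
  innerA.next('a2');
  innerA.complete();
  innerB.next('b2');
  innerB.complete();
  return seen;
}

const merged = run(project => mergeMap(project));
const switched = run(project => switchMap(project));
const queued = run(project => concatMap(project));
const exhausted = run(project => exhaustMap(project));

console.log(merged);    // ['a1', 'b1', 'a2', 'b2']  A and B run side by side
console.log(switched);  // ['a1', 'b1', 'b2']        A is cancelled when B arrives
console.log(queued);    // ['a1', 'a2', 'b1', 'b2']  B waits until A completes
console.log(exhausted); // ['a1', 'a2']              B is ignored while A is busy
```

Driving everything by hand with Subjects keeps the run synchronous and repeatable. Real inner streams such as HTTP requests follow the same rules, only spread out over time.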
🤝 6. Combination Operators - The Team Builders
Merge multiple streams into one!
combineLatest() - The Latest Combo
Emits whenever ANY source emits, combining the latest value from each. It stays silent until every source has emitted at least once.
import { combineLatest, of } from 'rxjs';
combineLatest([
of('Hello'),
of('World')
]).subscribe(console.log);
// Output: ['Hello', 'World']
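Because of() emits immediately, the example above hides one detail: combineLatest produces nothing until every source has emitted at least once. A small sketch with two hand-driven Subjects (the names `temperature$` and `unit$` are invented for the demo) makes the timing visible.

```typescript
import { combineLatest, Subject } from 'rxjs';

const temperature$ = new Subject<number>();
const unit$ = new Subject<string>();
const readings: string[] = [];

combineLatest([temperature$, unit$]).subscribe(([temp, unit]) => {
  readings.push(`${temp}${unit}`);
});

temperature$.next(20); // nothing yet: unit$ has not emitted
unit$.next('C');       // first combined value: '20C'
temperature$.next(21); // '21C' (the latest unit is reused)
unit$.next('F');       // '21F' (the latest temperature is reused)

console.log(readings); // ['20C', '21C', '21F']
```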
merge() - The Race Track
All streams race together, first come first served.
import { merge, interval } from 'rxjs';
import { map, take } from 'rxjs/operators';
merge(
interval(500).pipe(map(() => 'A'), take(2)),
interval(300).pipe(map(() => 'B'), take(2))
).subscribe(console.log);
// Output: B A B A
forkJoin() - The Group Photo
Waits for ALL streams to complete, then emits the last value from each as one array. If any source never completes, forkJoin never emits.
import { forkJoin } from 'rxjs';
forkJoin([
http.get('/user'),
http.get('/posts')
]).subscribe(([user, posts]) => {
console.log('All loaded!', user, posts);
});
zip() - The Pair Matcher
Pairs values by their index, waits for a match.
import { zip, of } from 'rxjs';
zip(
  of('🍔', '🍕', '🌮'),
  of('Cola', 'Fanta', 'Sprite')
).subscribe(console.log);
// Output: ['🍔', 'Cola'] ['🍕', 'Fanta'] ['🌮', 'Sprite']
withLatestFrom() - The Snapshot
Main stream grabs the latest from another.
clicks.pipe(
withLatestFrom(currentUser$)
).subscribe(([click, user]) => {
console.log('Clicked by', user);
});
🚨 7. Error Handling Operators - The Safety Net
Errors happen, and an unhandled error ends the stream. These operators let you recover or try again instead!
catchError() - The Fixer
Catches an error and switches to a backup stream (use of() to supply a single backup value).
http.get('/data').pipe(
catchError(err => {
console.log('Oops!', err);
return of({ fallback: true });
})
).subscribe(console.log);
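One detail is easier to see in code: catchError() replaces the failed stream with the backup, so whatever stream failed is finished at that point. A common way to keep an outer stream going is to catch inside the inner stream. The sketch below assumes RxJS 7 (for the factory form of throwError) and a made-up `load` function that fails for id 2.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, concatMap } from 'rxjs/operators';

// Hypothetical loader: pretend the request for id 2 always fails
function load(id: number): Observable<string> {
  return id === 2
    ? throwError(() => new Error('boom'))
    : of(`item ${id}`);
}

// Catching at the end of the outer pipe: the outer stream stops at the failure
const outerCatch: string[] = [];
of(1, 2, 3).pipe(
  concatMap(id => load(id)),
  catchError(() => of('fallback'))
).subscribe(value => outerCatch.push(value));

// Catching inside the inner stream: only the failed item is replaced
const innerCatch: string[] = [];
of(1, 2, 3).pipe(
  concatMap(id => load(id).pipe(
    catchError(() => of(`fallback ${id}`))
  ))
).subscribe(value => innerCatch.push(value));

console.log(outerCatch); // ['item 1', 'fallback']
console.log(innerCatch); // ['item 1', 'fallback 2', 'item 3']
```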
retry() - The Try-Again
Retries the source N times before giving up.
http.get('/flaky-api').pipe(
retry(3)
).subscribe(console.log);
// Tries up to 4 times total
retryWhen() - The Smart Retry
Custom retry logic with delays. Note: retryWhen() is deprecated in RxJS 7; retry() with a config object covers the same cases.
http.get('/api').pipe(
retryWhen(errors =>
errors.pipe(delay(1000), take(3))
)
).subscribe(console.log);
// Retries 1 second apart; once take(3) completes, the stream completes quietly instead of erroring
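On recent RxJS 7 releases (an assumption about your installed version; the `delay` option does not exist in RxJS 6), the same "retry 3 times, 1 second apart" idea can be sketched with retry()'s config object. The `flaky` source and the `retryWithPause` wrapper below are made-up names for the demo.

```typescript
import { defer, Observable, of, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';

// Hypothetical flaky source: fails on the first `failures` subscriptions, then succeeds
function flaky(failures: number): Observable<string> {
  let attempts = 0;
  return defer(() => {
    attempts += 1;
    return attempts <= failures
      ? throwError(() => new Error(`attempt ${attempts} failed`))
      : of(`ok after ${attempts} attempts`);
  });
}

// Retry up to `count` times, pausing `pauseMs` milliseconds before each retry
function retryWithPause<T>(source: Observable<T>, count: number, pauseMs: number): Observable<T> {
  return source.pipe(retry({ count, delay: pauseMs }));
}

retryWithPause(flaky(2), 3, 1000).subscribe({
  next: value => console.log(value),            // logs "ok after 3 attempts" after two 1-second pauses
  error: err => console.error('Gave up:', err)  // reached only if every retry fails
});
```

Unlike the take(3) pattern, this version passes the last error on to the subscriber when the retries run out.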
finalize() - The Cleanup Crew
Always runs when the stream ends (complete, error, OR unsubscribe).
http.get('/data').pipe(
finalize(() => hideSpinner())
).subscribe({
next: data => showData(data),
error: err => showError(err)
});
🎯 8. takeUntilDestroyed - The Angular Guardian
Angular 16+ brought a superhero for cleanup!
The Problem
Components subscribe to observables. When the component dies, subscriptions must die too, or you get memory leaks!
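Old Way (Manual Cleanup)
import { OnDestroy, OnInit } from '@angular/core';
import { Subject, interval } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
export class OldComponent implements OnInit, OnDestroy {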
private destroy$ = new Subject<void>();
ngOnInit() {
interval(1000).pipe(
takeUntil(this.destroy$)
).subscribe(console.log);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
New Way (takeUntilDestroyed) ✨
import { Component } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { interval } from 'rxjs';
@Component({...})
export class NewComponent {
constructor() {
interval(1000).pipe(
takeUntilDestroyed()
).subscribe(console.log);
// Auto-cleanup when component dies!
}
}
Rules for takeUntilDestroyed()
- Call it in an injection context: the constructor or a field initializer (not ngOnInit!)
- Or pass DestroyRef explicitly:
import { DestroyRef, inject } from '@angular/core';
export class MyComponent {
private destroyRef = inject(DestroyRef);
ngOnInit() {
interval(1000).pipe(
takeUntilDestroyed(this.destroyRef)
).subscribe(console.log);
}
}
🧩 Putting It All Together
graph TD
  A["Data Source"] --> B["Creation Operator"]
  B --> C["Transform with map/scan"]
  C --> D["Filter unwanted values"]
  D --> E{Higher-Order?}
  E -->|Yes| F["switchMap/mergeMap/etc"]
  E -->|No| G["Combine with others"]
  F --> G
  G --> H["Handle Errors"]
  H --> I["takeUntilDestroyed"]
  I --> J["Subscribe!"]
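As a rough end-to-end sketch of that flow, the pipeline below creates a stream, transforms and filters it, flattens an inner stream, and recovers from an error. `lookupPrice` is a made-up stand-in for a real request, and take() stands in for takeUntilDestroyed() so the example runs outside Angular (both are assumptions for the demo).

```typescript
import { from, Observable, of, throwError } from 'rxjs';
import { catchError, concatMap, filter, map, take } from 'rxjs/operators';

// Hypothetical lookup: pretend the price service has no entry for KIWI
function lookupPrice(fruit: string): Observable<string> {
  return fruit === 'KIWI'
    ? throwError(() => new Error('no price'))
    : of(`${fruit}: $1`);
}

const lines: string[] = [];

from(['apple', '', 'kiwi', 'pear', 'plum'])        // creation
  .pipe(
    map(name => name.trim().toUpperCase()),        // transform
    filter(name => name.length > 0),               // filter unwanted values
    concatMap(name =>                              // higher-order mapping
      lookupPrice(name).pipe(
        catchError(() => of(`${name}: unknown`))   // handle errors per item
      )
    ),
    take(3)                                        // cleanup stand-in: stop after 3 values
  )
  .subscribe(line => lines.push(line));

console.log(lines); // ['APPLE: $1', 'KIWI: unknown', 'PEAR: $1']
```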
🎉 You Made It!
You now understand the factory workers of RxJS:
| Worker Type | Job | Example Operators |
|---|---|---|
| 🏭 Creation | Start streams | of, from, interval |
| 🔄 Transform | Change values | map, scan, pluck |
| 🚦 Filter | Block values | filter, take, debounceTime |
| 🎢 Higher-Order | Handle streams | switchMap, mergeMap |
| 🤝 Combine | Merge streams | combineLatest, forkJoin |
| 🚨 Errors | Stay safe | catchError, retry |
| 🎯 Cleanup | No leaks | takeUntilDestroyed |
Remember: Operators are just functions that take a stream in and return a new stream out. Chain them like LEGO blocks to build exactly what you need!
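That "stream in, stream out" idea can be taken literally. In this sketch, `evenTimesTen` is a made-up operator written as a plain function from one Observable to another, and it slots into pipe() like any built-in.

```typescript
import { Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical custom operator: a plain function from one stream to another
function evenTimesTen(source: Observable<number>): Observable<number> {
  return source.pipe(
    filter(n => n % 2 === 0),
    map(n => n * 10)
  );
}

const output: number[] = [];
of(1, 2, 3, 4).pipe(evenTimesTen).subscribe(n => output.push(n));

console.log(output); // [20, 40]
```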
"RxJS is like having a superpower for managing async data. Once you understand operators, you can handle anything!"
