Advanced Streams

Loading concept...

🌊 Advanced Streams in Node.js

The Water Park Adventure

Imagine you’re at the world’s greatest water park. Water flows through amazing slides, tunnels, and pools. Some slides go one way. Some let you float both directions. Some transform the water into bubbles or colors!

Streams in Node.js work exactly like this water park. Data flows like water through pipes, gets transformed, splits, merges, and arrives at exciting destinations.

Let’s explore every ride in this park!


🔄 Duplex Streams: The Two-Way Tunnel

What Is It?

A Duplex stream is like a water tunnel where water flows both ways at the same time. You can push water in AND pull water out.

Real Life Example: Think of a phone call. You can talk AND listen at the same time. That’s duplex!

When Do We Use It?

  • Network sockets (talking to servers)
  • WebSockets (real-time chat)
  • Any time you need to read AND write

Simple Example

const { Duplex } = require('stream');

const tunnel = new Duplex({
  read(size) {
    // Water coming OUT
    this.push('Hello from tunnel!');
    this.push(null); // No more water
  },
  write(chunk, encoding, callback) {
    // Water going IN
    console.log('Received:', chunk.toString());
    callback();
  }
});

// Use the tunnel both ways
tunnel.on('data', (data) => {
  console.log('Got:', data.toString());
});

tunnel.write('Hi tunnel!');

How It Flows

graph TD A[Your Code] -->|write| B[Duplex Stream] B -->|read| A B -->|Both directions!| C[Other Side] C -->|Both directions!| B

Key Point: Duplex = Read + Write happening together, but they’re independent. The data you write doesn’t automatically come out the read side.


🎨 Transform Streams: The Magic Color Changer

What Is It?

A Transform stream is a special Duplex stream. But here’s the magic: what goes in gets changed before coming out!

Real Life Example: Imagine a slide that turns regular water into rainbow water. You pour in clear water, rainbow water comes out!

When Do We Use It?

  • Converting data formats (JSON to CSV)
  • Compressing files (zip)
  • Encrypting messages
  • Changing text (uppercase, lowercase)

Simple Example

const { Transform } = require('stream');

const shoutMaker = new Transform({
  transform(chunk, encoding, callback) {
    // Take the water, make it LOUD!
    const loud = chunk.toString().toUpperCase();
    callback(null, loud + '!');
  }
});

// Let's try it
process.stdin
  .pipe(shoutMaker)
  .pipe(process.stdout);

// Type "hello" → See "HELLO!"

How It Flows

graph TD A[Input: hello] -->|enters| B[Transform Stream] B -->|magic happens| C[Output: HELLO!]

Key Point: Transform = Duplex where write and read are connected. Data goes in, gets changed, comes out different!


🛠️ Implementing Custom Streams

Building Your Own Water Slide!

You can create your own stream types. It’s like designing your own water slide!

Custom Readable Stream

const { Readable } = require('stream');

class NumberStream extends Readable {
  constructor() {
    super();
    this.current = 1;
  }

  _read() {
    if (this.current <= 5) {
      this.push(String(this.current));
      this.current++;
    } else {
      this.push(null); // Done!
    }
  }
}

const numbers = new NumberStream();
numbers.on('data', (n) => {
  console.log('Number:', n.toString());
});
// Prints: 1, 2, 3, 4, 5

Custom Writable Stream

const { Writable } = require('stream');

class Logger extends Writable {
  _write(chunk, encoding, callback) {
    console.log('📝', chunk.toString());
    callback();
  }
}

const logger = new Logger();
logger.write('Hello!');
logger.write('World!');
// Prints: 📝 Hello! 📝 World!

Custom Transform Stream

const { Transform } = require('stream');

class Doubler extends Transform {
  _transform(chunk, encoding, callback) {
    const num = parseInt(chunk.toString());
    callback(null, String(num * 2) + '\n');
  }
}

const doubler = new Doubler();
doubler.write('5');  // Outputs: 10
doubler.write('10'); // Outputs: 20

🔗 Stream Piping and Composition

Connecting the Slides!

Piping is like connecting water slides together. Water flows from one slide to the next, automatically!

Basic Piping

const fs = require('fs');
const zlib = require('zlib');

// Read file → Compress → Write file
fs.createReadStream('big-file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('big-file.txt.gz'));

Chaining Multiple Transforms

const { Transform } = require('stream');

const addExclaim = new Transform({
  transform(chunk, encoding, cb) {
    cb(null, chunk.toString() + '!');
  }
});

const uppercase = new Transform({
  transform(chunk, encoding, cb) {
    cb(null, chunk.toString().toUpperCase());
  }
});

// Chain them: hello → HELLO → HELLO!
process.stdin
  .pipe(uppercase)
  .pipe(addExclaim)
  .pipe(process.stdout);

The Flow

graph TD A[Source Stream] -->|pipe| B[Transform 1] B -->|pipe| C[Transform 2] C -->|pipe| D[Destination]

Key Point: Each .pipe() connects one stream to the next. Data flows automatically through all of them!


🛡️ stream.pipeline Utility

The Safe Water Park Manager

Remember piping? It’s great, but has a problem. If something breaks, water leaks everywhere! Errors don’t get handled well.

stream.pipeline() is like a safety manager. It:

  • Connects all streams properly
  • Cleans up if something fails
  • Tells you when everything is done

The Old Way (Risky!)

// If gzip fails, file stays open! 😱
fs.createReadStream('file.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('file.gz'));

The Safe Way (Use This!)

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('file.txt'),
  zlib.createGzip(),
  fs.createWriteStream('file.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed!', err);
    } else {
      console.log('Pipeline done!');
    }
  }
);

With Promises (Even Better!)

const { pipeline } = require('stream/promises');

async function compress() {
  try {
    await pipeline(
      fs.createReadStream('file.txt'),
      zlib.createGzip(),
      fs.createWriteStream('file.gz')
    );
    console.log('Done!');
  } catch (err) {
    console.error('Failed:', err);
  }
}

Key Point: Always use pipeline() instead of chained .pipe(). It’s safer and cleaner!


⚖️ Backpressure Handling

Don’t Overflow the Pool!

Imagine pouring water into a pool faster than it can drain. The pool overflows! That’s bad.

Backpressure is when a stream says “Slow down! I can’t handle this much data!”

The Problem

const readable = getHugeDataStream();
const writable = getSlowWriter();

// If writable is slow, data piles up in memory!
readable.pipe(writable);

How Backpressure Works

graph TD A[Fast Reader] -->|sends data| B[Buffer] B -->|"Buffer full!"| A A -->|pauses| A B -->|drains| C[Slow Writer] C -->|"Ready!"| B B -->|resume| A

Manual Backpressure

const fs = require('fs');

const readable = fs.createReadStream('huge.txt');
const writable = fs.createWriteStream('copy.txt');

readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Buffer is full! Pause reading.
    readable.pause();
  }
});

writable.on('drain', () => {
  // Buffer emptied! Resume reading.
  readable.resume();
});

The Easy Way

Good news! .pipe() handles backpressure automatically. That’s why we use it!

// Backpressure handled for you!
readable.pipe(writable);

Key Point: Backpressure prevents memory overflow. Use pipe() or pipeline() to handle it automatically!


🎁 Readable.from Utility

Instant Water From Anything!

Readable.from() is magical. It turns arrays, strings, or any iterable into a readable stream instantly!

From an Array

const { Readable } = require('stream');

const stream = Readable.from(['a', 'b', 'c']);

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});
// Prints: a, b, c

From a String

const { Readable } = require('stream');

const stream = Readable.from('Hello World');

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});
// Prints: Hello World

From a Generator

const { Readable } = require('stream');

function* numbers() {
  yield '1';
  yield '2';
  yield '3';
}

const stream = Readable.from(numbers());
stream.on('data', (n) => console.log(n));
// Prints: 1, 2, 3

From an Async Generator

const { Readable } = require('stream');

async function* fetchData() {
  yield 'Loading...';
  yield 'Processing...';
  yield 'Done!';
}

const stream = Readable.from(fetchData());

Key Point: Readable.from() converts any iterable to a stream. Super handy!


🔁 Async Iterators with Streams

The Modern Way to Read Water!

Streams are async iterables. This means you can use for await...of to read them!

The Old Way

const fs = require('fs');

const stream = fs.createReadStream('file.txt');

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});

stream.on('end', () => {
  console.log('Done!');
});

stream.on('error', (err) => {
  console.error(err);
});

The Modern Way (Much Cleaner!)

const fs = require('fs');

async function readFile() {
  const stream = fs.createReadStream('file.txt');

  for await (const chunk of stream) {
    console.log(chunk.toString());
  }

  console.log('Done!');
}

readFile().catch(console.error);

Processing Line by Line

const fs = require('fs');
const readline = require('readline');

async function processLines() {
  const stream = fs.createReadStream('log.txt');
  const lines = readline.createInterface({
    input: stream
  });

  for await (const line of lines) {
    console.log('Line:', line);
  }
}

Combining with Transform

const { Readable, Transform } = require('stream');

const numbers = Readable.from([1, 2, 3, 4, 5]);

const doubler = new Transform({
  objectMode: true,
  transform(n, enc, cb) {
    cb(null, n * 2);
  }
});

async function main() {
  const doubled = numbers.pipe(doubler);

  for await (const n of doubled) {
    console.log(n); // 2, 4, 6, 8, 10
  }
}

main();

Flow Diagram

graph TD A[Stream] -->|for await| B[Your Code] B -->|chunk 1| C[Process] B -->|chunk 2| C B -->|chunk n| C C -->|All done!| D[Continue]

Key Point: for await...of is the cleanest way to consume streams in modern Node.js!


🎯 Quick Reference

Stream Type Direction Data Connection
Readable Out only Source of data
Writable In only Destination
Duplex Both ways Independent
Transform Both ways Connected (in→out)

🏆 Summary: Your Water Park Map

  1. Duplex = Two-way tunnel (read + write independent)
  2. Transform = Magic changer (input transforms to output)
  3. Custom Streams = Build your own slides!
  4. Piping = Connect streams together
  5. pipeline() = Safe way to connect (use this!)
  6. Backpressure = Don’t overflow! (pipe handles it)
  7. Readable.from() = Instant stream from arrays
  8. Async Iterators = Modern for await consumption

You’re now a Stream Master! Go build amazing data flows! 🌊✨

Loading story...

No Story Available

This concept doesn't have a story yet.

Story Preview

Story - Premium Content

Please sign in to view this concept and start learning.

Upgrade to Premium to unlock full access to all content.

Interactive Preview

Interactive - Premium Content

Please sign in to view this concept and start learning.

Upgrade to Premium to unlock full access to all content.

No Interactive Content

This concept doesn't have interactive content yet.

Cheatsheet Preview

Cheatsheet - Premium Content

Please sign in to view this concept and start learning.

Upgrade to Premium to unlock full access to all content.

No Cheatsheet Available

This concept doesn't have a cheatsheet yet.

Quiz Preview

Quiz - Premium Content

Please sign in to view this concept and start learning.

Upgrade to Premium to unlock full access to all content.

No Quiz Available

This concept doesn't have a quiz yet.