Lab 06: Streams Advanced

Time: 60 minutes | Level: Architect | Docker: docker run -it --rm node:20-alpine sh

Streams are Node.js's most powerful abstraction for handling data over time. This lab covers Transform streams, backpressure mechanics, stream.pipeline, stream.compose, and the Web Streams API.


Step 1: Stream Fundamentals Review

Four stream types:

  • Readable: source of data (file read, HTTP request)

  • Writable: sink for data (file write, HTTP response)

  • Duplex: both readable and writable (TCP socket)

  • Transform: duplex with transformation logic (gzip, encryption)

Data flow with backpressure:

Producer → [Readable buffer] → Transform → [Writable buffer] → Consumer

                              backpressure signal: "drain" event
                              when writable buffer full, pause readable

Step 2: Implementing a Transform Stream

// file: transform-demo.js
const { Transform, Readable, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);

// Transform: uppercase converter
class UpperCaseTransform extends Transform {
  constructor(options = {}) {
    super({ ...options, decodeStrings: false });
    this.byteCount = 0;
  }

  _transform(chunk, encoding, callback) {
    const str = chunk.toString();
    this.byteCount += Buffer.byteLength(str); // count bytes, not characters
    this.push(str.toUpperCase());
    callback(); // signal ready for more data
  }

  _flush(callback) {
    // Called when source ends — emit final data
    this.push(`\n[Total bytes processed: ${this.byteCount}]`);
    callback();
  }
}

// Transform: word counter
class WordCountTransform extends Transform {
  constructor() {
    super({ readableObjectMode: true }); // output objects
    this.words = 0;
    this.buffer = '';
  }

  _transform(chunk, enc, cb) {
    this.buffer += chunk.toString();
    cb();
  }

  _flush(cb) {
    this.words = this.buffer.split(/\s+/).filter(Boolean).length;
    this.push({ wordCount: this.words, charCount: this.buffer.length });
    cb();
  }
}

const { Writable } = require('stream');

async function main() {
  let output = '';
  const upper = new UpperCaseTransform({ highWaterMark: 16 });
  // Collect output in a true Writable sink. Ending the pipeline with a
  // Transform whose readable side is never consumed can leave pipeline()
  // waiting forever for an 'end' event that never fires.
  const sink = new Writable({
    write(chunk, enc, cb) { output += chunk.toString(); cb(); }
  });

  const input = Readable.from(['hello ', 'world ', 'from ', 'streams']);
  await pipelineAsync(input, upper, sink);

  console.log('Transformed output:', output.trim());
}

main();

📸 Verified Output:

💡 Always call callback() in _transform to signal you're ready for the next chunk. Forgetting causes stream deadlock.


Step 3: Backpressure Deep Dive

💡 Never ignore the false return value from writable.write(). It means the internal buffer is full — you MUST wait for drain before writing more.


Step 4: stream.pipeline — The Right Way

💡 stream.pipeline automatically handles cleanup: if any stream in the chain errors or ends prematurely, ALL streams are destroyed. Manual .pipe() chaining does not do this, so a failed pipeline can leak file descriptors and memory.


Step 5: stream.compose — Stream Composition


Step 6: Web Streams API (Node 18+)

💡 Web Streams are cross-platform (works in browsers too). Node.js can interop: Readable.toWeb() converts a Node stream to a Web ReadableStream.


Step 7: Async Iteration over Streams


Step 8: Capstone — Streaming Data Pipeline

Build a complete streaming ETL pipeline:


Summary

Concept           API                              Key Detail
----------------  -------------------------------  --------------------------------------
Transform stream  class extends Transform          Implement _transform(chunk, enc, cb)
Flush             _flush(cb)                       Called when source ends; emit final data
Backpressure      write() returns false            Wait for 'drain' event
Pipeline          stream.pipeline()                Auto-cleanup on error/end
Compose           stream.compose()                 Combine streams into one
Web Streams       ReadableStream, TransformStream  WHATWG standard, cross-platform
Async iteration   for await...of stream            Ergonomic stream consumption
highWaterMark     { highWaterMark: N }             Buffer size threshold for backpressure
