Lab 03: Streams Advanced

Time: 30 minutes | Level: Advanced | Docker: docker run -it --rm node:20-alpine sh

Overview

Deep dive into advanced stream patterns: object mode streams, Transform stream implementation, stream.compose, WHATWG Web Streams API, error handling, and highWaterMark tuning.


Step 1: Object Mode Streams

const { Readable, Writable, Transform } = require('node:stream');

// Object mode: pass JS objects instead of Buffers
const objectReadable = Readable.from([
  { id: 1, name: 'Alice', score: 95 },
  { id: 2, name: 'Bob', score: 72 },
  { id: 3, name: 'Charlie', score: 88 }
]); // Readable.from creates object mode automatically for non-strings

// Object mode Transform
class ScoreNormalizer extends Transform {
  constructor() { super({ objectMode: true }); }

  _transform(record, enc, cb) {
    cb(null, {
      ...record,
      grade: record.score >= 90 ? 'A' : record.score >= 80 ? 'B' : 'C',
      normalized: (record.score / 100).toFixed(2)
    });
  }
}

// Object mode Writable
const results = [];
const collector = new Writable({
  objectMode: true,
  write(obj, enc, cb) { results.push(obj); cb(); }
});

objectReadable
  .pipe(new ScoreNormalizer())
  .pipe(collector);

collector.on('finish', () => {
  console.log('Processed:', JSON.stringify(results, null, 2));
});

Step 2: Custom Transform Streams


Step 3: stream.compose


Step 4: WHATWG Web Streams


Step 5: Error Handling in Streams


Step 6: highWaterMark Tuning


Step 7: Stream Performance Patterns


Step 8: Capstone — Object Mode Pipeline

Run verification:

📸 Verified Output:


Summary

Feature
API
Use Case

Object mode

{ objectMode: true }

Stream JS objects

Custom Transform

_transform(chunk, enc, cb)

Data transformation

_flush

Override in Transform

Process remaining buffer

stream.compose

compose(t1, t2, t3)

Combine streams

WHATWG streams

ReadableStream, TransformStream

Browser-compatible API

stream.pipeline

pipeline(r, t, w, cb)

Error-safe piping

highWaterMark

Constructor option

Buffer size tuning

Last updated