Lab 06: Streams Advanced
Step 1: Stream Fundamentals Review
Producer → [Readable buffer] → Transform → [Writable buffer] → Consumer
↑
backpressure signal: "drain" event
when writable buffer full, pause readable

Step 2: Implementing a Transform Stream
// file: transform-demo.js
const { Transform, Readable, pipeline } = require('stream');
const { promisify } = require('util');
const pipelineAsync = promisify(pipeline);
// Transform: uppercase converter
class UpperCaseTransform extends Transform {
  /**
   * Uppercases every chunk passing through and tracks the total number
   * of UTF-8 bytes processed, which is reported in _flush when the
   * source ends.
   *
   * @param {object} [options] - Standard Transform options (e.g. highWaterMark).
   *   decodeStrings is forced to false so string chunks reach _transform
   *   as strings rather than being converted to Buffers first.
   */
  constructor(options = {}) {
    super({ ...options, decodeStrings: false });
    this.byteCount = 0; // running total of UTF-8 bytes seen
  }
  _transform(chunk, encoding, callback) {
    const str = chunk.toString();
    // FIX: count UTF-8 bytes, not UTF-16 code units. str.length undercounts
    // multi-byte characters (e.g. 'é' is 1 code unit but 2 bytes), which made
    // the "[Total bytes processed]" trailer wrong for non-ASCII input.
    this.byteCount += Buffer.byteLength(str, 'utf8');
    this.push(str.toUpperCase());
    callback(); // signal ready for more data
  }
  _flush(callback) {
    // Called when source ends — emit final data
    this.push(`\n[Total bytes processed: ${this.byteCount}]`);
    callback();
  }
}
// Transform: word counter
class WordCountTransform extends Transform {
  /**
   * Buffers all incoming text and, once the source ends, emits a single
   * summary object of the form { wordCount, charCount }.
   */
  constructor() {
    super({ readableObjectMode: true }); // output objects
    this.words = 0;
    this.buffer = '';
  }
  _transform(chunk, enc, cb) {
    // Defer all counting to _flush; just accumulate the text here.
    this.buffer = this.buffer + chunk.toString();
    cb();
  }
  _flush(cb) {
    // Whitespace-delimited tokens; empty strings from split are excluded.
    const tokens = this.buffer.split(/\s+/).filter((t) => t !== '');
    this.words = tokens.length;
    this.push({ wordCount: this.words, charCount: this.buffer.length });
    cb();
  }
}
/**
 * Demo driver: pipes a small in-memory source through the uppercase
 * transform into a collecting sink, then prints the combined result.
 */
async function main() {
  const pieces = [];
  const upper = new UpperCaseTransform({ highWaterMark: 16 });
  // Sink keeps every chunk so the full result can be printed at the end.
  const sink = new Transform({
    transform(chunk, enc, cb) {
      pieces.push(chunk.toString());
      cb();
    },
  });
  const input = Readable.from(['hello ', 'world ', 'from ', 'streams']);
  await pipelineAsync(input, upper, sink);
  const output = pieces.join('');
  console.log('Transformed output:', output.trim());
}
main();

Step 3: Backpressure Deep Dive
Step 4: stream.pipeline — The Right Way
stream.pipeline — The Right Way

Step 5: stream.compose — Stream Composition
stream.compose — Stream Composition

Step 6: Web Streams API (Node 18+)
Step 7: Async Iteration over Streams
Step 8: Capstone — Streaming Data Pipeline
Summary
Concept
API
Key Detail
Last updated
