This final installment explores advanced Node.js patterns and techniques to elevate your applications to production-grade quality.
1. Advanced Asynchronous Patterns
Promise Pooling
const { PromisePool } = require('@supercharge/promise-pool');
async function processBatch(items) {
const { results, errors } = await PromisePool
.withConcurrency(5)
.for(items)
.process(async (item) => {
return await processItem(item);
});
console.log(`Processed ${results.length} items`);
console.log(`${errors.length} failed`);
}
// Alternative with vanilla Node.js
const workerPool = require('workerpool');
const pool = workerPool.pool();
await Promise.all(
items.map(item =>
pool.exec('processItem', [item])
)
);
Cancelable Async Flows
const { EventEmitter } = require('events');
class CancelableOperation extends EventEmitter {
constructor() {
super();
this.canceled = false;
}
async execute() {
try {
await this.step1();
if (this.canceled) throw new Error('Canceled');
await this.step2();
if (this.canceled) throw new Error('Canceled');
this.emit('complete');
} catch (err) {
this.emit('error', err);
}
}
cancel() {
this.canceled = true;
}
}
// Usage
const op = new CancelableOperation();
op.execute();
op.on('complete', () => console.log('Done'));
setTimeout(() => op.cancel(), 1000);
2. Stream Processing Mastery
High-Performance Pipeline
const { pipeline } = require('stream');
const { createGzip } = require('zlib');
const { createReadStream, createWriteStream } = require('fs');
// Error-handled pipeline
pipeline(
createReadStream('input.csv'),
transformToJSON(),
createGzip(),
createWriteStream('output.json.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
// Custom transform stream
function transformToJSON() {
const { Transform } = require('stream');
return new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
try {
const data = parseCSV(chunk);
this.push(JSON.stringify(data));
callback();
} catch (err) {
callback(err);
}
}
});
}
3. Advanced Event Emitter Patterns
Typed Events
const { EventEmitter } = require('events');
const emitter = new EventEmitter();
// Type-safe emitter
function emit(
event: T,
...args: T extends keyof Events ? Events[T] : any
) {
return emitter.emit(event, ...args);
}
// Usage
emit('userCreated', { id: 1, name: 'John' }); // OK
emit('userCreated', 'invalid'); // Type error
Async Event Handling
const { once } = require('events');
// Wait for event with timeout
async function waitForEvent() {
try {
const [data] = await once(emitter, 'data', {
signal: AbortSignal.timeout(5000)
});
console.log('Received:', data);
} catch (err) {
if (err.name === 'AbortError') {
console.log('Timeout waiting for event');
} else {
throw err;
}
}
}
// Multiple events
const events = ['connect', 'error'];
const [event, args] = await Promise.race(
events.map(e => once(emitter, e))
);
4. Performance Optimization
V8 Optimization Tips
- Hidden Classes: Maintain consistent property order
- Inline Caching: Use monomorphic functions
- Memory Management: Reuse objects when possible
- Optimization Killers: Avoid
try-catch
in hot paths
Benchmarking
const benchmark = require('benchmark');
const suite = new benchmark.Suite();
suite
.add('RegExp#test', () => /o/.test('Hello World!'))
.add('String#indexOf', () => 'Hello World!'.indexOf('o') > -1)
.on('cycle', event => console.log(String(event.target)))
.on('complete', function() {
console.log('Fastest is ' + this.filter('fastest').map('name'));
})
.run({ 'async': true });
5. Advanced Module Patterns
Dynamic Module Loading
// ESM dynamic import
async function loadModule(moduleName) {
try {
const module = await import(`./modules/${moduleName}.js`);
return module;
} catch (err) {
console.error('Failed to load module:', err);
return null;
}
}
// CommonJS require with cache busting
function requireUncached(module) {
delete require.cache[require.resolve(module)];
return require(module);
}
Module Factory
function createDatabase(config) {
let connection;
function connect() {
connection = createConnection(config);
}
function query(sql) {
return connection.execute(sql);
}
return {
connect,
query
};
}
// Usage
const db = createDatabase({ host: 'localhost' });
db.connect();
6. Cluster Mode Optimization
Advanced Clustering
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
// Handle messages from worker
worker.on('message', (msg) => {
if (msg.type === 'metrics') {
trackWorkerMetrics(worker.id, msg.data);
}
});
}
cluster.on('exit', (worker) => {
console.log(`Worker ${worker.id} died`);
cluster.fork(); // Replace dead worker
});
} else {
// Worker process
process.on('message', (msg) => {
if (msg === 'shutdown') {
gracefulShutdown();
}
});
// Send metrics to master
setInterval(() => {
process.send({
type: 'metrics',
data: getCurrentMetrics()
});
}, 5000);
}
7. Emerging Node.js Features
ES Modules
// package.json
{
"type": "module",
"exports": {
".": "./src/main.js",
"./utils": "./src/utils.js"
}
}
// main.js
import { readFile } from 'node:fs/promises';
import { format } from './utils.js';
const data = await readFile('data.json');
console.log(format(data));
Web Streams API
import { ReadableStream } from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('Hello');
controller.enqueue('World');
controller.close();
}
});
const transformed = readable.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
})
);
for await (const chunk of transformed) {
console.log(chunk); // HELLO, WORLD
}
Final Thoughts
Congratulations on completing this comprehensive Node.js series! You’ve journeyed from fundamental concepts to advanced production-ready patterns. The Node.js ecosystem continues to evolve, so keep exploring new features and best practices.