Node.js Streams


What are Streams?

In Node.js, a stream is a sequence of data that may not be available all at once and does not have to fit in memory.

Think of them as conveyor belts that move data from one place to another, allowing you to work with each piece as it arrives rather than waiting for the whole dataset.

Streams are one of Node.js's most powerful features and are used extensively in:

  • File system operations (reading/writing files)
  • HTTP requests and responses
  • Data compression and decompression
  • Database operations
  • Real-time data processing

Getting Started with Streams

Streams are one of the fundamental concepts in Node.js for handling data efficiently.

They allow you to process data in chunks as it becomes available, rather than loading everything into memory at once.

Basic Stream Example

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
writableStream.on('finish', () => {
  console.log('File copy completed!');
});

readableStream.on('error', (err) => {
  console.error('Error reading file:', err);
});

writableStream.on('error', (err) => {
  console.error('Error writing file:', err);
});

Why Use Streams?

There are several advantages to using streams:

  • Memory Efficiency: Process large files without loading them entirely into memory
  • Time Efficiency: Start processing data as soon as you have it, instead of waiting for all the data
  • Composability: Build powerful data pipelines by connecting streams
  • Better User Experience: Deliver data to users as it becomes available (e.g., video streaming)

Imagine reading a 1GB file on a server with 512MB of RAM:

  • Without streams: Loading the entire file at once (for example with fs.readFile()) would likely exhaust the available memory and crash the process
  • With streams: You process the file in small chunks (e.g., 64KB at a time)
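
The difference can be sketched with a small helper that walks through a file chunk by chunk. The name countBytes is hypothetical, and the file name in the usage comment is an assumption; the point is only that memory use stays near the chunk size regardless of file size.

```javascript
const fs = require('fs');

// Hypothetical helper: counts the bytes in a file one chunk at a time
async function countBytes(filePath) {
  let bytes = 0;
  let chunks = 0;
  // Readable streams are async iterable, so each chunk can be consumed in turn
  for await (const chunk of fs.createReadStream(filePath)) {
    bytes += chunk.length; // chunk is a Buffer because no encoding was set
    chunks += 1;
  }
  return { bytes, chunks };
}

// Usage (assumes a file named 'largefile.txt' exists):
// countBytes('largefile.txt').then(console.log);
```

Only one chunk is held at a time, so the same function works for a file far larger than the available RAM.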

Core Stream Types

Node.js provides four fundamental types of streams, each serving a specific purpose in data handling:

| Stream Type | Description | Common Examples |
|---|---|---|
| Readable | Streams from which data can be read (data source) | fs.createReadStream(), HTTP responses, process.stdin |
| Writable | Streams to which data can be written (data destination) | fs.createWriteStream(), HTTP requests, process.stdout |
| Duplex | Streams that are both Readable and Writable | TCP sockets, Zlib streams |
| Transform | Duplex streams that can modify or transform data as it's written and read | Zlib streams, crypto streams |

Note: All streams in Node.js are instances of EventEmitter, which means they emit events that can be listened to and handled.
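
A quick way to see this is to check a stream against EventEmitter directly. The sketch below uses the built-in PassThrough stream as a stand-in for any stream; the variable names are illustrative only.

```javascript
const { EventEmitter } = require('events');
const { PassThrough } = require('stream');

// PassThrough is a built-in Transform stream that forwards data unchanged
const stream = new PassThrough();

console.log(stream instanceof EventEmitter); // true

// Because it is an EventEmitter, the usual on()/once() methods are available
const received = [];
stream.on('data', (chunk) => received.push(chunk.toString()));
stream.on('end', () => console.log('Received:', received.join('')));

stream.write('Hello, ');
stream.end('streams!');
```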



Readable Streams

Readable streams let you read data from a source. Examples include:

  • Reading from a file
  • HTTP responses on the client
  • HTTP requests on the server
  • process.stdin

Creating a Readable Stream

const fs = require('fs');

// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Events for readable streams
readableStream.on('data', (chunk) => {
  // With an encoding set, each chunk is a string, so length counts characters, not bytes
  console.log(`Received ${chunk.length} characters of data.`);
  console.log(chunk);
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});

readableStream.on('error', (err) => {
  console.error('Error reading from stream:', err);
});

Reading Modes

Readable streams operate in one of two modes:

  • Flowing Mode: Data is read from the source and provided to your application as quickly as possible using events
  • Paused Mode: You must explicitly call stream.read() to get chunks of data from the stream

const fs = require('fs');

// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB chunks
});

// Manually consume the stream using read()
readableStream.on('readable', () => {
  let chunk;
  while (null !== (chunk = readableStream.read())) {
    // With an encoding set, each chunk is a string, so length counts characters
    console.log(`Read ${chunk.length} characters of data.`);
    console.log(chunk);
  }
});

readableStream.on('end', () => {
  console.log('No more data to read.');
});
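
The current mode can be inspected through the readableFlowing property. A minimal sketch, using an in-memory stream from Readable.from() instead of a file (an assumption made to keep it self-contained):

```javascript
const { Readable } = require('stream');

const readable = Readable.from(['a', 'b', 'c']);
const states = [];

// No consumer attached yet: the mode is undecided
states.push(readable.readableFlowing);

// Attaching a 'data' listener switches the stream to flowing mode
readable.on('data', () => {});
states.push(readable.readableFlowing);

// pause() switches to paused mode; resume() would restart the flow
readable.pause();
states.push(readable.readableFlowing);

console.log(states); // [ null, true, false ]
```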

Writable Streams

Writable streams let you write data to a destination. Examples include:

  • Writing to a file
  • HTTP requests on the client
  • HTTP responses on the server
  • process.stdout

Creating a Writable Stream

const fs = require('fs');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');

// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');

// End the stream
writableStream.end();

// Events for writable streams
writableStream.on('finish', () => {
  console.log('All data has been written to the file.');
});

writableStream.on('error', (err) => {
  console.error('Error writing to stream:', err);
});

Handling Backpressure

When writing to a stream, if the data is being written faster than it can be processed, backpressure occurs.

The write() method returns false once the stream's internal buffer has reached its highWaterMark. When that happens, stop writing and wait for the 'drain' event before continuing.

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

function writeData() {
  let i = 100;
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time, close the stream
        writableStream.write('Last chunk!\n');
        writableStream.end();
      } else {
        // Continue writing data
        const data = `Data chunk ${i}\n`;
        // Write and check if we should continue
        ok = writableStream.write(data);
      }
    }
    while (i > 0 && ok);

    if (i > 0) {
      // We need to wait for the drain event before writing more
      writableStream.once('drain', write);
    }
  }
  write();
}

writeData();
writableStream.on('finish', () => {
  console.log('All data written successfully.');
});
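
The same idea can be written with async/await by waiting for 'drain' through events.once(). This is a sketch under assumptions: writeAll is a hypothetical helper, and the slow in-memory destination with a 16-byte buffer exists only to make backpressure visible.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Hypothetical helper: writes every chunk, pausing whenever write() returns false
async function writeAll(writable, chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      // Buffer is full: wait for 'drain' before writing the next chunk
      await once(writable, 'drain');
      waits += 1;
    }
  }
  writable.end();
  await once(writable, 'finish');
  return waits; // how many times the writer had to wait
}

// A deliberately slow in-memory destination with a tiny buffer (assumed values)
const written = [];
const slowSink = new Writable({
  highWaterMark: 16, // bytes
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    setImmediate(callback); // simulate slow I/O
  }
});

const chunks = Array.from({ length: 10 }, (_, i) => `Data chunk ${i}\n`);
const done = writeAll(slowSink, chunks);
done.then((waits) => {
  console.log(`Wrote ${written.length} chunks, waited for drain ${waits} times`);
});
```

Note that events.once() rejects if the stream emits 'error' while waiting, so a failing destination surfaces as a rejected promise.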

Pipe

The pipe() method connects a readable stream to a writable stream, automatically managing the flow of data and handling backpressure.

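It's the simplest way to consume streams, but it does not forward errors from one stream to the next, so each stream needs its own error handler.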

const fs = require('fs');

// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');

// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);

// Handle completion and errors
readableStream.on('error', (err) => {
  console.error('Read error:', err);
});

writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

writableStream.on('finish', () => {
  console.log('File copy completed!');
});

Chaining Pipes

You can chain multiple streams together using pipe().

This is especially useful when working with transform streams.

const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
  .pipe(zlib.createGzip()) // Compress the data
  .pipe(fs.createWriteStream('destination.txt.gz'))
  .on('finish', () => {
    console.log('File compressed successfully!');
  });

Note: The pipe() method returns the destination stream, which enables chaining.
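
This return value can be checked directly. A minimal sketch with two in-memory PassThrough streams (chosen here only so that no files are needed):

```javascript
const { PassThrough } = require('stream');

const source = new PassThrough();
const destination = new PassThrough();

// pipe() hands back its argument, so the next .pipe() is called on the destination
const returned = source.pipe(destination);
console.log(returned === destination); // true
```

One consequence is that a listener added at the end of a chain, such as .on('finish', ...) or .on('error', ...), is attached to the last stream only.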


Duplex and Transform Streams

Duplex Streams

Duplex streams are both readable and writable, like a two-way pipe.

A TCP socket is a good example of a duplex stream.

const net = require('net');

// Create a TCP server
const server = net.createServer((socket) => {
  // 'socket' is a duplex stream

  // Handle incoming data (readable side)
  socket.on('data', (data) => {
    console.log('Received:', data.toString());

    // Echo back (writable side)
    socket.write(`Echo: ${data}`);
  });

  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

server.listen(8080, () => {
  console.log('Server listening on port 8080');
});

// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
  console.log('Connected to server');
  client.write('Hello from client!');
});

client.on('data', (data) => {
  console.log('Server says:', data.toString());
  client.end(); // Close the connection
});
*/

Transform Streams

Transform streams are duplex streams that can modify data as it passes through.

They're ideal for processing data in pipelines.

const { Transform } = require('stream');
const fs = require('fs');

// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Transform the chunk to uppercase
    const upperChunk = chunk.toString().toUpperCase();
    // Push the transformed data
    this.push(upperChunk);
    // Signal that we're done with this chunk
    callback();
  }
}

// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();

// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');

// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');

// Pipe the data through our transform stream
readableStream
  .pipe(uppercaseTransform)
  .pipe(writableStream)
  .on('finish', () => {
    console.log('Transformation completed!');
  });
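
Chunk boundaries rarely line up with logical units such as lines, so a transform often has to carry state between chunks and emit the remainder in flush(). The following is a sketch, not a production parser: createLineSplitter is a hypothetical helper, and it assumes the input does not split multi-byte characters across chunks.

```javascript
const { Transform } = require('stream');

// Hypothetical transform: re-chunks incoming text so each output chunk is one line
function createLineSplitter() {
  let leftover = ''; // partial line carried over from the previous chunk
  return new Transform({
    readableObjectMode: true, // emit one string per line instead of raw bytes
    transform(chunk, encoding, callback) {
      const lines = (leftover + chunk.toString()).split('\n');
      leftover = lines.pop(); // the last piece may be an incomplete line
      for (const line of lines) this.push(line);
      callback();
    },
    flush(callback) {
      // Called once after the last chunk: emit whatever is still buffered
      if (leftover) this.push(leftover);
      callback();
    }
  });
}

const splitter = createLineSplitter();
const lines = [];
splitter.on('data', (line) => lines.push(line));
splitter.on('end', () => console.log(lines));
// [ 'first line', 'second line', 'third line' ]

// Chunk boundaries deliberately fall in the middle of lines
splitter.write('first li');
splitter.write('ne\nsecond line\nthi');
splitter.end('rd line');
```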

Stream Events

All streams are instances of EventEmitter and emit several events:

Readable Stream Events

  • data: Emitted each time the stream hands a chunk of data to the consumer (attaching a listener normally switches the stream to flowing mode)
  • end: Emitted when there's no more data to be consumed
  • error: Emitted if an error occurs while reading
  • close: Emitted when the stream's underlying resource has been closed
  • readable: Emitted when data is available to be read with stream.read()

Writable Stream Events

  • drain: Emitted when the stream is ready to accept more data after a write() call has returned false
  • finish: Emitted after end() has been called and all data has been flushed to the underlying system
  • error: Emitted if an error occurs while writing
  • close: Emitted when the stream's underlying resource has been closed
  • pipe: Emitted on the writable stream when a readable stream's pipe() method is called with it as the destination
  • unpipe: Emitted on the writable stream when a readable stream's unpipe() method removes it as a destination
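
The pipe and unpipe events are easy to misread, since the methods are called on the readable side while the events fire on the writable side. A small sketch with in-memory PassThrough streams (names are illustrative):

```javascript
const { PassThrough } = require('stream');

const source = new PassThrough();
const destination = new PassThrough();
const events = [];

// Both events fire on the destination and receive the source stream as an argument
destination.on('pipe', (src) => events.push(['pipe', src === source]));
destination.on('unpipe', (src) => events.push(['unpipe', src === source]));

source.pipe(destination);
source.unpipe(destination);

console.log(events); // [ [ 'pipe', true ], [ 'unpipe', true ] ]
```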

The stream.pipeline() Method

The pipeline() function (available since Node.js v10.0.0) is a more robust way to pipe streams together, especially for error handling.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Create a pipeline that handles errors properly
pipeline(
  fs.createReadStream('source.txt'),
  zlib.createGzip(),
  fs.createWriteStream('destination.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded!');
    }
  }
);

Note: pipeline() will properly clean up all the streams if an error occurs in any of them, preventing potential memory leaks.
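
Newer Node.js versions (v15 and later) also ship a promise-based pipeline() in the stream/promises module, which fits async/await code. The sketch below compresses a string in memory; gzipText is a hypothetical helper, and the in-memory source and destination are assumptions made so the example needs no files.

```javascript
const { pipeline } = require('stream/promises'); // Node.js v15+
const { Readable, Writable } = require('stream');
const zlib = require('zlib');

// Hypothetical helper: gzips a string in memory and resolves with a Buffer
async function gzipText(text) {
  const compressed = [];
  await pipeline(
    Readable.from([text]),
    zlib.createGzip(),
    new Writable({
      write(chunk, encoding, callback) {
        compressed.push(chunk);
        callback();
      }
    })
  );
  return Buffer.concat(compressed);
}

gzipText('Hello, pipeline!')
  .then((buf) => console.log(`Compressed to ${buf.length} bytes`))
  .catch((err) => console.error('Pipeline failed:', err));
```

An error in any stage rejects the awaited promise, so a single try/catch or .catch() covers the whole chain.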


Object Mode Streams

By default, streams work with strings and Buffer objects.

However, streams can be set to 'object mode' to work with JavaScript objects.

const { Readable, Writable, Transform } = require('stream');

// Create a readable stream in object mode
const objectReadable = new Readable({
  objectMode: true,
  read() {} // Implementation required but can be no-op
});

// Create a transform stream in object mode
const objectTransform = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Add a property to the object
    chunk.transformed = true;
    chunk.timestamp = new Date();
    this.push(chunk);
    callback();
  }
});

// Create a writable stream in object mode
const objectWritable = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    console.log('Received object:', chunk);
    callback();
  }
});

// Connect the streams
objectReadable
  .pipe(objectTransform)
  .pipe(objectWritable);

// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data
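
For data that already lives in an array or another iterable, Readable.from() builds an object mode stream without a custom read() implementation, and for await...of consumes it. A minimal sketch; collectNames is a hypothetical helper and the user objects are made-up sample data.

```javascript
const { Readable } = require('stream');

// Readable.from() creates an object mode stream from any iterable
const users = Readable.from([
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' }
]);

// Hypothetical helper: collects the names from a stream of user objects
async function collectNames(stream) {
  const names = [];
  for await (const user of stream) {
    names.push(user.name);
  }
  return names;
}

const namesPromise = collectNames(users);
namesPromise.then((names) => console.log(names)); // [ 'Alice', 'Bob' ]
```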

Advanced Stream Patterns

1. Error Handling with pipeline()

The pipeline() method is the recommended way to handle errors in stream chains:

Example

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('input.txt'),
  zlib.createGzip(),
  fs.createWriteStream('output.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

2. Object Mode Streams

Streams can work with JavaScript objects instead of just strings and buffers:

Example

const { Readable } = require('stream');

// Create a readable stream in object mode
const objectStream = new Readable({
  objectMode: true,
  read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
  console.log('Received:', obj);
});

Practical Examples

HTTP Streaming

Streams are used extensively in HTTP requests and responses.

const http = require('http');
const fs = require('fs');

// Create an HTTP server
const server = http.createServer((req, res) => {
  // Handle different routes
  if (req.url === '/') {
    // Send a simple response
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end('<h1>Stream Demo</h1><p>Try <a href="/file">streaming a file</a> or <a href="/video">streaming a video</a>.</p>');
  }
  else if (req.url === '/file') {
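    // Stream a large text file
    const fileStream = fs.createReadStream('largefile.txt');

    // Send the headers only once the file has been opened successfully
    fileStream.on('open', () => {
      res.writeHead(200, { 'Content-Type': 'text/plain' });
      // Pipe the file to the response (handles backpressure automatically)
      fileStream.pipe(res);
    });

    // Handle errors
    fileStream.on('error', (err) => {
      console.error('File stream error:', err);
      // The status code can only be changed if the headers have not been sent yet
      if (!res.headersSent) {
        res.writeHead(500, { 'Content-Type': 'text/plain' });
      }
      res.end('Server Error');
    });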
  }
  else if (req.url === '/video') {
    // Stream a video file with proper headers
    const videoPath = 'video.mp4';
    const stat = fs.statSync(videoPath);
    const fileSize = stat.size;
    const range = req.headers.range;

    if (range) {
      // Handle range requests for video seeking
      const parts = range.replace(/bytes=/, "").split("-");
      const start = parseInt(parts[0], 10);
      const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
      const chunksize = (end - start) + 1;

      const videoStream = fs.createReadStream(videoPath, { start, end });
      res.writeHead(206, {
        'Content-Range': `bytes ${start}-${end}/${fileSize}`,
        'Accept-Ranges': 'bytes',
        'Content-Length': chunksize,
        'Content-Type': 'video/mp4'
      });

      videoStream.pipe(res);
    } else {
      // No range header, send entire video
      res.writeHead(200, {
        'Content-Length': fileSize,
        'Content-Type': 'video/mp4'
      });

      fs.createReadStream(videoPath).pipe(res);
    }
  }
  else {
    // 404 Not Found
    res.writeHead(404, { 'Content-Type': 'text/plain' });
    res.end('Not Found');
  }
});

// Start the server
server.listen(8080, () => {
  console.log('Server running at http://localhost:8080/');
});
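
The /video route above trusts the Range header as sent. A stricter parser could look like the sketch below; parseRange is a hypothetical helper, not part of Node.js, and it handles only a single byte range. When it returns null, a server would typically answer with status 416 (Range Not Satisfiable).

```javascript
// Hypothetical helper: parses a single-range header such as "bytes=0-499".
// Returns { start, end } or null when the range is malformed or unsatisfiable.
function parseRange(header, fileSize) {
  const match = /^bytes=(\d*)-(\d*)$/.exec(header || '');
  if (!match || (match[1] === '' && match[2] === '')) return null;

  let start;
  let end;
  if (match[1] === '') {
    // Suffix form "bytes=-N": the last N bytes of the file
    const suffix = parseInt(match[2], 10);
    if (suffix === 0) return null;
    start = Math.max(fileSize - suffix, 0);
    end = fileSize - 1;
  } else {
    start = parseInt(match[1], 10);
    // Clamp the end position to the last byte of the file
    end = match[2] === '' ? fileSize - 1 : Math.min(parseInt(match[2], 10), fileSize - 1);
  }

  if (start > end || start >= fileSize) return null;
  return { start, end };
}

console.log(parseRange('bytes=0-499', 1000)); // { start: 0, end: 499 }
console.log(parseRange('bytes=500-', 1000));  // { start: 500, end: 999 }
console.log(parseRange('bytes=-200', 1000));  // { start: 800, end: 999 }
console.log(parseRange('bytes=2000-', 1000)); // null
```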

Processing Large CSV Files

const fs = require('fs');
const { Transform, Writable } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser

// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
  objectMode: true,
  transform(row, encoding, callback) {
    // Only pass through rows that meet our criteria
    if (parseInt(row.age, 10) > 18) {
      // Modify the row
      row.isAdult = 'Yes';
      // Push the transformed row
      this.push(row);
    }
    callback();
  }
});

// Create a writable stream that collects the results in an array
const results = [];
const writeToArray = new Writable({
  objectMode: true,
  write(row, encoding, callback) {
    results.push(row);
    callback();
  }
});

// Create the processing pipeline
fs.createReadStream('people.csv')
  .pipe(csv())
  .pipe(filterTransform)
  .pipe(writeToArray)
  .on('finish', () => {
    console.log(`Processed ${results.length} records:`);
    console.log(results);
  })
  .on('error', (err) => {
    // Note: this only catches errors emitted by the last stream in the chain
    console.error('Error processing CSV:', err);
  });

Best Practices

  • Error Handling: Always handle error events on streams to prevent application crashes.
  • Use pipeline(): Prefer stream.pipeline() over .pipe() for better error handling and cleanup.
  • Handle Backpressure: Respect the return value of write() to avoid memory issues.
  • End Streams: Always call end() on writable streams when you're done.
  • Avoid Synchronous Operations: Don't block the event loop with synchronous operations inside stream handlers.
  • Buffer Size: Be mindful of highWaterMark (buffer size) settings.
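
The effect of highWaterMark on write() can be observed with a small in-memory destination. This is a sketch under assumptions: the 16-byte threshold and the setImmediate delay are arbitrary values picked to make the behavior visible.

```javascript
const { Writable } = require('stream');

const sink = new Writable({
  highWaterMark: 16, // buffer threshold in bytes (assumed value for illustration)
  write(chunk, encoding, callback) {
    setImmediate(callback); // simulate a destination slower than the producer
  }
});

console.log(sink.writableHighWaterMark); // 16

const first = sink.write(Buffer.alloc(8));  // 8 bytes pending, below the threshold
const second = sink.write(Buffer.alloc(8)); // 16 bytes pending, threshold reached
console.log(first, second); // true false

sink.once('drain', () => {
  console.log('Buffer emptied, safe to write again');
  sink.end();
});
```

A larger highWaterMark means fewer pauses but more memory held per stream; a smaller one does the opposite.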

Warning: Mishandling streams can lead to memory leaks and performance issues.

Always handle errors and end streams properly.


Summary

Streams are a fundamental concept in Node.js that allow for efficient data handling. They:

  • Process data piece by piece without loading everything into memory
  • Provide better memory efficiency for large datasets
  • Allow processing to start before all data is available
  • Enable powerful data processing pipelines
  • Are used extensively in core Node.js APIs

Copyright 1999-2025 by Refsnes Data. All Rights Reserved. W3Schools is Powered by W3.CSS.