The Definitive Guide to Object Streams in Node.js

Node.js streams come with great power: you have an asynchronous way of dealing with input and output, and you can transform data in independent steps. In this tutorial, I'll walk you through the theory and teach you how to use object stream transformables, just like Gulp does.


When I was researching for my book Front-End Tooling with Gulp, Bower and Yeoman, I decided to not just explain APIs and use cases, but also focus on the concepts underneath.

You know that especially in JavaScript, tools and frameworks come and go faster than you can register domains and GitHub groups for them. For Gulp.js, one of the most crucial concepts is streams!

Some 50 years of streams

With Gulp, you want to read input files and transform them into the desired output, loading lots of JavaScript files and combining them into one. The Gulp API provides some methods for reading, transforming, and writing files, all using streams under the hood.

Streams are a fairly old concept in computing, originating from the early Unix days in the late 1960s and early 1970s: data flows from a source to the process consuming it.

The source can be of multiple types: files, the computer’s memory, or input devices like a keyboard or a mouse.

Once a stream is opened, data flows in chunks from its origin to the process consuming it. Coming from a file, every character or byte would be read one at a time; coming from the keyboard, every keystroke would transmit data over the stream.

The biggest advantage compared to loading all the data at once is that, in theory, the input can be endless and without limits.

Coming from a keyboard, that makes total sense - why should anybody close the input stream you’re using to control your computer?

Input streams are also called readable streams, indicating that they’re meant to read data from a source. On the other hand, there are outbound streams or destinations; they can also be files or some place in memory, but also output devices like the command line, a printer, or your screen.

They’re also called writeable streams, meaning that they’re meant to store the data that comes over the stream. The figure below illustrates how streams work.

Different types of readable and writeable streams in Node.js

The data is a sequence of elements made available over time (like characters or bytes).

Readable streams can originate from different sources, such as input devices (keyboards), files, or data stored in memory. Writeable streams can also end in different places, such as files and memory, as well as the command line.

Not only is it possible to have an endless amount of input, but you also can combine different readable and writeable streams. Key input can be directly stored into a file, or you can print file input out to the command line or even a connected printer. The interface stays the same no matter what the sources or destinations are.

The easiest program in Node.js involving streams is piping the standard key input to the standard output, the console:

process.stdin.pipe(process.stdout);

We take our readable (process.stdin) and pipe it to a writeable (process.stdout). As said before, we can stream any content from any readable source to any writeable destination.

Take the request package, for example, which lets you make an HTTP request to a URL. Why not fetch some page on the web and print it out to process.stdout?

const request = require('request');

request('https://fettblog.eu').pipe(process.stdout);

The output of an HTML page might not be particularly useful on a console but think of it being piped to a file for a web scraper.

Transforming data

Streams aren’t just good for transferring data between different input sources and output destinations.

With the data exposed once a stream is opened, developers can transform the data that comes from the stream before it reaches its destination, such as by transforming all lowercase characters in a file to uppercase characters.

This is one of the greatest powers of streams. Once a stream is opened and you can read the data piece by piece, you can slot different programs in between. The figure below illustrates this process.

Streams are good not only for transferring data but also for modifying it.

To modify data, you add transformation blocks between the input and the output.

In this example, you get your input data from different sources and channel it through a toUpperCase transformation. This changes lowercase characters to their uppercase equivalent. Those blocks can be defined once and reused for different input origins and outputs.

In the following listing, we define a toUpperCase function that -- well -- transforms every letter to its uppercase equivalent. There are many ways to create this functionality, but I've always been a huge fan of Node.js streaming packages like through2. They provide a good wrapper that makes creating new transformables a breeze:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {      /* 1 */
  cb(null, Buffer.from(data.toString().toUpperCase())); /* 2 */
});

process.stdin.pipe(toUpperCase).pipe(process.stdout);  /* 3 */
  1. The through2 package takes a function as its first parameter. This function passes data (in a Buffer), some encoding information, and a callback we can call once we're done with our transformation.
  2. Usually, in Node.js streams, we pass Buffers with the data from the stream. Coming from process.stdin, this is most likely the current line before we press Return. Coming from a file, this can actually be anything. We transform the current Buffer to a string, create the uppercase version, and convert it back to a Buffer again. The callback takes two arguments. The first one is a possible error. The stream will crash and the program will stop execution if you are not listening to an error event on the stream. Pass null if everything is okay. The second parameter is the transformed data.
  3. We can use this transformable and pipe our input data from the readable to it. The transformed data is piped to our writeable.

This is totally in the vein of functional programming. We can use and reuse the same transformable for every other input or output, as long as it's coming from a readable stream. We don't care about the input source or the output. Also, we are not limited to one single transformable. We can chain as many transformables as we like:

const through2 = require('through2');

const toUpperCase = through2((data, enc, cb) => {
  cb(null, Buffer.from(data.toString().toUpperCase()));
});

const dashBetweenWords = through2((data, enc, cb) => {
  cb(null, Buffer.from(data.toString().split(' ').join('-')));
});

process.stdin
  .pipe(toUpperCase)
  .pipe(dashBetweenWords)
  .pipe(process.stdout);

If you are familiar with Gulp, the code above should ring a bell. Very similar, isn't it? However, Gulp streams differ in one specific way: we don't pass data in Buffers, we use plain, old JavaScript objects.

Object streams

In standard streams, the file is seen as just a possible input source for the real data that has to be processed. All information about the origin, like the path or filename, is lost once the stream has opened up.

In Gulp, you’re not just working with the contents of one or a few files; you need the filename and the origin in the file system as well.

Think of having 20 JavaScript files and wanting to minify them. You’d have to remember each filename separately and keep track of which data belongs to which file, so the connection can be restored once the output (the minified file of the same name) must be saved.

Luckily, Gulp takes care of that for you by creating both a new input source and a data type that can be used for your streams: virtual file objects.

Once a Gulp stream is opened, all the original, physical files are wrapped in such a virtual file object and handled in the virtual file system, or Vinyl, as the corresponding software is called in Gulp.

Vinyl objects, the file objects of your virtual file system, contain two types of information: the path where the file originated, which becomes the file’s name, as well as a stream exposing the file’s contents. Those virtual files are stored in your computer’s memory, known for being the fastest way to process data.

There, all the modifications are done that would usually be made on your hard disk. By keeping everything in memory and not having to perform expensive read and write operations in between processes, Gulp can make changes extraordinarily quickly.

Internally, Gulp is using object streams to emit file by file into the processing pipeline. Object streams behave just like normal streams, but instead of Buffers and strings, we pass through plain old JavaScript objects.

We can create our own readable object stream using the readable-stream package:

const through2 = require('through2');
const Readable = require('readable-stream').Readable;

const stream = Readable({objectMode: true});   /* 1 */
stream._read = () => {};                       /* 2 */

setInterval(() => {                            /* 3 */
  stream.push({
    x: Math.random()
  });
}, 100);

const getX = through2.obj((data, enc, cb) => { /* 4 */
  cb(null, `${data.x.toString()}\n`);
});

stream.pipe(getX).pipe(process.stdout);        /* 5 */
  1. The important thing when creating an object readable is to set the objectMode flag to true. In doing so, the stream is capable of passing JavaScript objects through the pipeline. It would expect Buffers or strings otherwise.
  2. Every readable stream needs a _read function, which gets called when the stream checks for data. This is the proper place to start other mechanisms and push new contents to the stream. Since we push data from outside, we don't need this function and can keep it void. However, readable streams need to implement it, otherwise we would get an error.
  3. Here we are filling the stream with demo data. Every 100 milliseconds, we push another object with a random number to our stream.
  4. Since we want to pipe the results of the object stream to process.stdout, and process.stdout just accepts strings, we have a small transformable where we extract the property from our passed through JavaScript object.
  5. We create a pipeline. Our readable object stream pipes all its data to the getX transformable, and finally to the writeable process.stdout.

A note on stream packages in Node.js

You might have noticed that we use different stream packages that are installable via npm. Isn't that odd? The streaming API is part of Node's core, after all.

The streaming core was constantly subject to change back in the old 0.x days of Node, which is why the community stepped in and created a solid and stable API around the basic packages. With semantic versioning, you can be sure that the streaming ecosystem moves nicely along with your application.

Enough demos. Let's do something real

Alright! Let's go for a small app that reads CSV data and stores it as JSON. We want to use object streams because at some point we might want to change the data depending on the use case. Since streams are awesome, we want to be able to push the result to different output formats.

First things first, we require a few packages (all installable via npm):

const through2 = require('through2');
const fs = require('fs');
const split = require('split2');
  1. We know through2 already. We use this one to create all our transformables.
  2. The fs package is obviously for reading and writing files. Cool thing: It allows you to create a readable! Exactly what we need.
  3. Since you never know how big the chunks are that fs.createReadStream pulls into memory, the split2 package makes sure that you can process data line by line. Note the "2" in the name of this transformable. It tells you that it's part of the semantically versioned wrapper ecosystem.

Parse CSV!

CSV is great for parsing because it follows a very easy-to-understand format: a comma means a new cell, a new line means a new row.

Easy.

In this example, the first line is always the heading for our data. So we want to treat the first line in a special way: It will provide the keys for our JSON objects.

const parseCSV = () => {
  let templateKeys = [];
  let parseHeadline = true;
  return through2.obj((data, enc, cb) => {       /* 1 */
    if (parseHeadline) {
      templateKeys = data.toString().split(',');
      parseHeadline = false;
      return cb(null, null);                     /* 2 */
    }

    const entries = data.toString().split(',');
    const obj = {};

    templateKeys.forEach((el, index) => {       /* 3 */
      obj[el] = entries[index];
    });

    return cb(null, obj);                       /* 4 */
  });
};
  1. We create a transformable for object streams. Notice the .obj method. Even if your input data is just strings, you need an object stream transformable if you want to emit objects further on.
  2. In this block, we parse the headline (comma separated). This is going to be our template for the keys. We remove this line from the stream, that's why we pass null both times.
  3. For all other lines, we create an object each through the help of the template keys we parsed earlier.
  4. We pass this object through to the next stage.

That's all it needs to create JavaScript objects out of a CSV file!

Changing and adapting data

Once we have everything available in objects, we can transform the data much more easily. Delete properties, add new ones; filter, map, and reduce. Anything you like. For this example, we want to keep it easy: pick the first 10 entries:

const pickFirst10 = () => {
  let cnt = 0;
  return through2.obj((data, enc, cb) => {
    if (cnt++ < 10) {
      return cb(null, data);
    }
    return cb(null, null);
  });
};

Again, like in the previous example: passing data as the second argument of the callback means that we keep the element in the stream. Passing null means that we throw the data away. This is crucial for filters!

Flushing to a JSON

You know what JSON stands for? JavaScript Object Notation. This is great, because we have JavaScript objects, and we can note them down in a string representation!

So, what we want to do with the objects in our stream is to collect all of them that are passing through and store them in a single string representation. JSON.stringify comes to mind.

One important thing you have to know when working with streams is that once the object (or Buffer data for that matter) passes through your transformable to the next stage, it's gone for this stage.

This also means that you can pass objects to just one writeable, not more. There is, however, a way of collecting data and doing something different with it. Once there's no more data coming through a stream, each transformable's flush function is called.

Think of a sink that's getting filled with fluids.

You are not able to pick every single drop of it and analyze it again. But you can flush the whole thing to the next stage. This is what we're doing with the next transformable toJSON:

const toJSON = () => {
  let objs = [];
  return through2.obj(function(data, enc, cb) {
    objs.push(data);                              /* 1 */
    cb(null, null);
  }, function(cb) {                               /* 2 */
    this.push(JSON.stringify(objs));
    cb();
  });
};
  1. We collect all data that's passing through in an array. We remove the objects from our stream.
  2. In the second callback method, the flush method, we are transforming the collected data to a JSON string. With this.push (note the classic function notation there), we push this new object to our stream into the next stage. In this example, the new "object" is merely a string. Something that's compatible with standard writeables!

Gulp, for example, uses this behavior when working with concatenation plugins: it reads all files in stage one and then flushes one single file to the next stage.

Combining everything

Functional programming comes to mind again: each transformable that we've written in the last couple of lines is completely separate from the others. And they are perfectly reusable for different scenarios, regardless of input data or output format.

The only constraints are in the format of CSV (the first line is the headline) and that pickFirst10 and toJSON need JavaScript objects as input. Let's combine them and put the first ten entries as JSON on our standard console output:

const stream = fs.createReadStream('sample.csv');

stream
  .pipe(split())
  .pipe(parseCSV())
  .pipe(pickFirst10())
  .pipe(toJSON())
  .pipe(process.stdout);

Perfect! But we can pipe the whole lot to different writeables as well. In Node.js, the core I/O is all compatible with streams. So let's use a quick HTTP server and pipe everything out onto the internet:

const http = require('http');

// All from above
const server = http.createServer((req, res) => {
  // Build the pipeline per request — a stream can only be consumed once
  fs.createReadStream('sample.csv')
    .pipe(split())
    .pipe(parseCSV())
    .pipe(pickFirst10())
    .pipe(toJSON())
    .pipe(res);
});

server.listen(8000);

This is the great power of Node.js streams. You have an asynchronous way of dealing with input and output, and you can transform data in independent steps. With object streams, you can leverage JavaScript objects that you know and love to transform your data.

This is the foundation of Gulp as a streaming build system, but also a great tool for your everyday development.

Further reading

If you are hooked on streams, I can recommend a few resources: