
The Art of I/O (3rd Session of 4)

A magical introduction to input and output signals


Course Recap

In our last two sessions, we've touched on

  • functions (accept input, return output, maybe do effects)
  • closures (functions that 'close over' variables)
  • callbacks (functions that are 'called back' later)
  • modules (exported code we can require in other code)
  • events (when things happen)
  • streams (chunks of data over time)

In this session, we'll dive deep into streams, the heart of I/O magic.

Stream Origins

"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."

Doug McIlroy, October 11, 1964

Stream Magic

Streams allow us to

  • operate on data chunk by chunk
  • compose around a common abstraction

Chunk By Chunk

With streams, we can operate on data chunk by chunk, without buffering everything into memory.

Let's say we want to fill a large pool with buckets of water. We could fill many buckets one time, but then we'd need heaps upon heaps of buckets for a large pool; or we could fill one bucket many times.
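In node, the difference looks something like this (a rough sketch; `mobydick.txt` is just an example file):

var fs = require('fs')

// buffering: the whole file has to fit in memory before we can use any of it
fs.readFile('mobydick.txt', function (err, contents) {
  if (err) throw err
  console.log('read ' + contents.length + ' bytes in one go')
})

// streaming: we get the data one chunk at a time, bucket by bucket
fs.createReadStream('mobydick.txt')
  .on('data', function (chunk) {
    console.log('read a chunk of ' + chunk.length + ' bytes')
  })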

Modular Composition

With streams, we can pipe abstractions together:

curl http://www.gutenberg.org/cache/epub/2701/pg2701.txt > mobydick.txt
<mobydick.txt sed -r 's/\s+/\n/g' | grep -i whale | wc -l

fs.createReadStream('mobydick.txt')
    .pipe(replace(/\s+/g, '\n'))
    .pipe(filter(/whale/i))
    .pipe(linecount(function (count) {
        console.log(count)
    }))
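
(Here `replace`, `filter`, and `linecount` stand in for transform streams doing the same jobs as `sed`, `grep`, and `wc -l`; we'll see how to build transforms like these with through2 later in this session.)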

Streams Overview

  • Readable
    • produces data: you can pipe FROM it
    • readable.pipe(A)
  • Writable
    • consumes data: you can pipe TO it
    • A.pipe(writable)
  • Transform
    • consumes data, producing transformed data
    • A.pipe(transform).pipe(B)

Readable fs -> Writable stdout

// greetz.js
    
var fs = require('fs')

fs.createReadStream('greetz.txt')
  .pipe(process.stdout)

echo beep boop > greetz.txt
node greetz.js

Readable stdin -> Writable stdout

What if we want to read from stdin instead of a file?

Just pipe from `process.stdin` instead of `fs.createReadStream()`.

// stdin.js

process.stdin
  .pipe(process.stdout)

echo "beep boop" | node stdin.js

Transform through

Now let's transform the data before we print it out!

You can chain `.pipe()` calls together just like the | operator in bash:

// stdin-through.js

process.stdin
  .pipe(...)
  .pipe(process.stdout)

through2

through2 is a module that provides a simple way to set up a transform stream.

npm install through2

through(write, end)

With through there are 2 parameters: write and end. Both are optional.

  • function write (buf, enc, next) {}

  • function end (done) {} (call done() when you've finished flushing)

Call next() when you're ready for the next chunk. If you don't call next(), your stream will hang!

Call this.push(VALUE) inside the callback to put VALUE into the stream's output.

Use a VALUE of null to end the stream.
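
For example, here's a sketch of a pass-through stream that uses both parameters: write passes each chunk along and counts its bytes, and end pushes one final summary chunk before the stream closes (the byte counting is just for illustration):

// counter.js

var through = require('through2')

var count = 0

process.stdin
  .pipe(through(write, end))
  .pipe(process.stdout)

function write (buf, enc, next) {
  count += buf.length           // remember how much data has passed through
  this.push(buf)                // pass the chunk along unchanged
  next()                        // ready for the next chunk
}

function end (done) {
  this.push('total: ' + count + ' bytes\n')  // one last chunk of output
  done()                                     // finished, the stream can close
}

echo beep boop | node counter.js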

Transform through

npm install through2

// stdin-through.js
    
var through = require('through2')

process.stdin
  .pipe(through(toUpper))
  .pipe(process.stdout)

function toUpper (buf, enc, next) {
  var up = buf.toString().toUpperCase()
  this.push(up)
  next()
}

Writable Streams

We've seen `.pipe()`, which is a method of all readable streams.

Any stream you can write to has these methods:

  • .write(buf) - write some data
  • .end() - close the stream
  • .end(buf) - write some data and close the stream
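
For example, a minimal sketch that uses these methods on a file stream (the script and file names are just examples):

// write.js

var fs = require('fs')

var ws = fs.createWriteStream('out.txt')  // a writable stream
ws.write('beep ')                         // write some data
ws.end('boop\n')                          // write a final chunk and close the stream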

HTTP Streams

`req` is a readable stream; `res` is a writable stream.

var http = require('http')

function handler (req, res) {
  req.pipe(process.stdout)

  res.write("hello ")
  res.write("world!")
  res.end()
}

http.createServer(handler).listen(5000)
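
With the server running, you could poke at it from another terminal, for example:

curl -d 'beep boop' http://localhost:5000

The request body shows up on the server's stdout, and curl prints the "hello world!" response.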

Object streams

Normally you can only read and write buffers and strings with streams. However, if you initialize a stream in `objectMode`, you can use any kind of object (except for `null`):

// obj-stream.js
    
var through = require('through2')

var tr = through.obj(function (obj, enc, next) {
    this.push((obj.n * 1000) + '\n')
    next()
})
tr.pipe(process.stdout)

tr.write({ n: 5 })
tr.write({ n: 10 })
tr.write({ n: 3 })
tr.end()
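
Running node obj-stream.js should print 5000, 10000, and 3000, each on its own line.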

Streams In The Wild

Fun fact: the entire npm registry is available as JSON!

Let's use curl to grab this:

curl http://isaacs.iriscouch.com/registry/_all_docs?include_docs=true > npm.json

Then count Dominic Tarr's modules:

cat npm.json | grep 'Dominic Tarr' | wc -l

Reading JSON Without Streams

// npm-nostream.js

var fs = require('fs')

fs.readFile("npm.json", function (err, contents) {
  var registry = JSON.parse(contents)
  var pkgs = registry.rows.map(function (row) { return row.doc })
  var names = pkgs.map(function (pkg) { return pkg.name })
  names.forEach(function (name) { console.log(name) })
})

Reading JSON With Streams

npm install JSONStream through2

// npm.js

var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')

fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(getNames())
  .pipe(process.stdout)

function getNames () {
  return through.obj(function (pkg, enc, next) {
    if (pkg.name)
      this.push(pkg.name + "\n")
    next()
  })
}

Basic Stream Processing

// npm.js

var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')

fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(filterByAuthor("Dominic Tarr"))
  .pipe(getNames())
  .pipe(process.stdout)

function filterByAuthor (authorName) {
  return through.obj(function (pkg, enc, next) {
    if (!pkg || !pkg.author) return next()
    if (pkg.author.name === authorName) {
      this.push(pkg)
    }
    next()
  })
}

// function getNames ... 

Stream Resources

On The Magic Journey

NodeSchool

Duplex Stream Magic

  • Duplex
    • consumes data separately from producing data
    • A.pipe(duplex).pipe(A)
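
A TCP socket is a classic duplex stream: you write your data into it and read the other side's data out of it. A quick sketch with node's built-in net module (the port is just an example):

// telnet-lite.js

var net = require('net')

var socket = net.connect(5000)   // a socket is readable AND writable

process.stdin                    // what you type goes to the server...
  .pipe(socket)
  .pipe(process.stdout)          // ...and whatever the server sends comes back out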
