The Art of I/O (3rd Session of 4)
A magical introduction to input and output signals
Course Recap
In our last two sessions, we've touched on
- functions (accept input, return output, maybe do effects)
- closures (functions that 'close over' variables)
- callbacks (functions that are 'called back' later)
- modules (exported code we can require in other code)
- events (when things happen)
- streams (chunks of data over time)
In this session, we'll dive deep into streams, the heart of I/O magic.
Stream Origins
"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."
Stream Magic
Streams allow us to
- operate on data chunk by chunk
- compose around a common abstraction
Chunk By Chunk
With streams, we can operate on data chunk by chunk, without buffering everything into memory.
Let's say we want to fill a large pool using buckets of water. We could fill many buckets all at once, but then we'd need heaps upon heaps of buckets for a large pool. Or we could fill one bucket many times.
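In Node terms, that's the difference between buffering and streaming. A minimal sketch (`pool.txt` is an illustrative filename):
var fs = require('fs')
// buffering: haul the whole "pool" into memory at once
fs.readFile('pool.txt', function (err, data) {
  // data is the entire file
})
// streaming: handle one "bucket" at a time
fs.createReadStream('pool.txt').on('data', function (chunk) {
  // chunk is just one piece of the file
})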
Modular Composition
With streams, we can pipe abstractions together:
curl http://www.gutenberg.org/cache/epub/2701/pg2701.txt > mobydick.txt
<mobydick.txt sed -r 's/\s+/\n/g' | grep -i whale | wc -l
// replace, filter and linecount are hypothetical transform streams
var fs = require('fs')
fs.createReadStream('mobydick.txt')
  .pipe(replace(/\s+/g, '\n'))
  .pipe(filter(/whale/i))
  .pipe(linecount(function (count) {
    console.log(count)
  }))
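A minimal sketch of what a filter() transform might look like, using the through2 module introduced later in this session (it assumes each incoming chunk holds one word, as the upstream replace() would emit):
var through = require('through2')
// pass through only the chunks that match regex
function filter (regex) {
  return through(function (chunk, enc, next) {
    if (regex.test(chunk.toString())) this.push(chunk)
    next()
  })
}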
Streams Overview
- Readable - produces data: you can pipe FROM it
  readable.pipe(A)
- Writable - consumes data: you can pipe TO it
  A.pipe(writable)
- Transform - consumes data, producing transformed data
  A.pipe(transform).pipe(B)
Readable fs -> Writable stdout
// greetz.js
var fs = require('fs')
fs.createReadStream('greetz.txt')
  .pipe(process.stdout)
echo beep boop > greetz.txt
node greetz.js
Readable stdin -> Writable stdout
What if we want to read from stdin instead of a file?
Just pipe from `process.stdin` instead of `fs.createReadStream()`.
// stdin.js
process.stdin
  .pipe(process.stdout)
echo "beep boop" | node stdin.js
Transform through
Now let's transform the data before we print it out!
You can chain `.pipe()` calls together just like the | operator in bash:
// stdin-through.js
process.stdin
  .pipe(...)
  .pipe(process.stdout)
through2
through2 is a module that provides a simple way to set up a transform stream.
npm install through2
through(write, end)
With through there are two parameters, `write` and `end`. Both are optional.
function write (buf, enc, next) {}
function end (done) {}
Call `next()` when you're ready for the next chunk. If you don't call `next()`, your stream will hang!
Call `this.push(VALUE)` inside `write` to put VALUE into the stream's output. Use a VALUE of `null` to end the stream.
Inside `end`, call `done()` when you're finished flushing any last data.
Transform through
npm install through2
// stdin-through.js
var through = require('through2')
process.stdin
  .pipe(through(toUpper))
  .pipe(process.stdout)
function toUpper (buf, enc, next) {
  var up = buf.toString().toUpperCase()
  this.push(up)
  next()
}
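To try it out:
echo "beep boop" | node stdin-through.js
# prints: BEEP BOOP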
Writable Streams
We've seen `.pipe()`, which is a method of all readable streams.
Any stream you can write to has these methods:
- `.write(buf)` - write some data
- `.end()` - close the stream
- `.end(buf)` - write some data and close the stream
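For example, a minimal sketch that writes two chunks to a file stream (`out.txt` is an illustrative filename):
// write.js
var fs = require('fs')
var ws = fs.createWriteStream('out.txt')
ws.write('beep ') // write some data
ws.end('boop\n') // write the last chunk and close the stream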
HTTP Streams
`req` is a readable stream, `res` is a writable stream.
var http = require('http')
function handler (req, res) {
  req.pipe(process.stdout)
  res.write("hello ")
  res.write("world!")
  res.end()
}
http.createServer(handler).listen(5000)
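With the server running (assuming the file is saved as server.js), we can exercise both streams at once:
node server.js &
curl -d 'beep boop' http://localhost:5000
# the server pipes the request body ("beep boop") to its stdout,
# and curl prints the response: hello world!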
Object streams
Normally you can only read and write buffers and strings with streams. However, if you initialize a stream in `objectMode`, you can use any kind of object (except for `null`):
// obj-stream.js
var through = require('through2')
var tr = through.obj(function (obj, enc, next) {
  this.push((obj.n * 1000) + '\n')
  next()
})
tr.pipe(process.stdout)
tr.write({ n: 5 })
tr.write({ n: 10 })
tr.write({ n: 3 })
tr.end()
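Running it prints each transformed object:
node obj-stream.js
# 5000
# 10000
# 3000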
Streams In The Wild
Fun fact: the entire npm registry is available as JSON!
Let's use curl to grab this:
curl http://isaacs.iriscouch.com/registry/_all_docs?include_docs=true > npm.json
Then count Dominic Tarr's modules:
cat npm.json | grep 'Dominic Tarr' | wc -l
Reading JSON Without Streams
// npm-nostream.js
var fs = require('fs')
fs.readFile('npm.json', function (err, contents) {
  // the entire file must be buffered into memory before we can parse it
  var registry = JSON.parse(contents)
  var pkgs = registry.rows.map(function (row) { return row.doc })
  var names = pkgs.map(function (pkg) { return pkg.name })
  names.forEach(function (name) { console.log(name) })
})
Reading JSON With Streams
npm install JSONStream
// npm.js
var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')
fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(getNames())
  .pipe(process.stdout)
function getNames () {
  return through.obj(function (pkg, enc, next) {
    if (pkg.name)
      this.push(pkg.name + "\n")
    next()
  })
}
Basic Stream Processing
// npm.js
var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')
fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(filterByAuthor("Dominic Tarr"))
  .pipe(getNames())
  .pipe(process.stdout)
function filterByAuthor (authorName) {
  return through.obj(function (pkg, enc, next) {
    if (!pkg || !pkg.author) return next()
    if (pkg.author.name === authorName) {
      this.push(pkg)
    }
    next()
  })
}
// function getNames ...
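To compare against the earlier grep approach, count the filtered names:
node npm.js | wc -l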
Stream Resources
- substack's stream workshop notes and video
- substack's stream handbook
On The Magic Journey
NodeSchool
- javascripting: JavaScript workshop
- learnyounode: Async I/O workshop
- async-you: Advanced async workshop
- browserify-adventure: Browserify workshop
- stream-adventure: Stream workshop