The Art of I/O (3rd Session of 4)
A magical introduction to input and output signals
Course Recap
In our last two sessions, we've touched on
- functions (accept input, return output, maybe do effects)
- closures (functions that 'close over' variables)
- callbacks (functions that are 'called back' later)
- modules (exported code we can require in other code)
- events (when things happen)
- streams (chunks of data over time)
In this session, we'll dive deep into streams, the heart of I/O magic.
Stream Origins
"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also."
Stream Magic
Streams allow us to
- operate on data chunk by chunk
- compose around a common abstraction
Chunk By Chunk
With streams, we can operate on data chunk by chunk, without buffering everything into memory.
Let's say we want to fill a large pool using buckets of water. We could fill many buckets all at once, but then we'd need heaps upon heaps of buckets for a large pool. Or we could fill one bucket many times.
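Here's the same idea in code: a minimal sketch (pool.txt is a hypothetical large file) contrasting buffering a whole file with reading it chunk by chunk:
// chunk-by-chunk.js
var fs = require('fs')

// buffered: the whole file must fit in memory at once
fs.readFile('pool.txt', function (err, contents) {
  if (err) throw err
  console.log('read', contents.length, 'bytes in one go')
})

// streaming: we only ever hold one chunk in memory at a time
fs.createReadStream('pool.txt')
  .on('data', function (chunk) {
    console.log('read a chunk of', chunk.length, 'bytes')
  })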
Modular Composition
With streams, we can pipe abstractions together:
curl http://www.gutenberg.org/cache/epub/2701/pg2701.txt > mobydick.txt
<mobydick.txt sed -r 's/\s+/\n/g' | grep -i whale | wc -l
fs.createReadStream('mobydick.txt')
  .pipe(replace(/\s+/g, '\n'))
  .pipe(filter(/whale/i))
  .pipe(linecount(function (count) {
    console.log(count)
  }))
Streams Overview
- Readable: produces data; you can pipe FROM it
  readable.pipe(A)
- Writable: consumes data; you can pipe TO it
  A.pipe(writable)
- Transform: consumes data, producing transformed data
  A.pipe(transform).pipe(B)
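A transform is both writable and readable at once, which is why it can sit in the middle of a pipeline.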
Readable fs -> Writable stdout
// greetz.js
var fs = require('fs')
fs.createReadStream('greetz.txt')
  .pipe(process.stdout)
echo beep boop > greetz.txt
node greetz.js
Readable stdin -> Writable stdout
What if we want to read from stdin instead of a file?
Just pipe from `process.stdin` instead of `fs.createReadStream()`.
// stdin.js
process.stdin
  .pipe(process.stdout)
echo "beep boop" | node stdin.js
Transform through
Now let's transform the data before we print it out!
You can chain `.pipe()` calls together just like the | operator in bash:
// stdin-through.js
process.stdin
  .pipe(...)
  .pipe(process.stdout)
through2
through2 is a module that provides a simple way to set up a transform stream.
npm install through2
through(write, end)
through takes two parameters, write and end. Both are optional.
function write (buf, enc, next) {}
function end (done) {}
Call next() when you're ready for the next chunk. If you don't call next(), your stream will hang!
Call this.push(VALUE) inside the callback to put VALUE into the stream's output.
Use a VALUE of null to end the stream.
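For example, here's a minimal sketch of a transform that uses both parameters: write counts each chunk without passing it along, and end pushes the total once the input is finished (the filename is just for illustration):
// count-chunks.js
var through = require('through2')
var count = 0
process.stdin
  .pipe(through(
    function write (buf, enc, next) {
      count++ // consume the chunk, remember we saw it
      next()
    },
    function end (done) {
      this.push(count + '\n') // emit the total as our only output
      done()
    }
  ))
  .pipe(process.stdout)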
Transform through
npm install through2
// stdin-through.js
var through = require('through2')
process.stdin
  .pipe(through(toUpper))
  .pipe(process.stdout)

function toUpper (buf, enc, next) {
  var up = buf.toString().toUpperCase()
  this.push(up)
  next()
}
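Try it; the input should come out uppercased as BEEP BOOP:
echo "beep boop" | node stdin-through.js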
Writable Streams
We've seen `.pipe()`, which is a method on all readable streams.
Any stream you can write to has these methods:
- .write(buf) - write some data
- .end() - close the stream
- .end(buf) - write some data and close the stream
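For example, the same methods on a file writable stream (out.txt is an arbitrary filename):
// write.js
var fs = require('fs')
var ws = fs.createWriteStream('out.txt')
ws.write('beep ') // write some data
ws.end('boop\n')  // write some data and close the stream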
HTTP Streams
req is a readable stream, res is a writable stream.
var http = require('http')
function handler (req, res) {
  req.pipe(process.stdout)
  res.write('hello ')
  res.write('world!')
  res.end()
}
http.createServer(handler).listen(5000)
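Assuming the code above is saved as server.js (the filename is just for illustration), run it and POST some data at it from another terminal:
node server.js
curl -d 'beep boop' http://localhost:5000
The request body comes out on the server's stdout, and curl gets back "hello world!".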
Object streams
Normally you can only read and write buffers and strings with streams. However, if you initialize a stream in `objectMode`, you can use any kind of object (except for `null`):
// obj-stream.js
var through = require('through2')
var tr = through.obj(function (obj, enc, next) {
  this.push((obj.n * 1000) + '\n')
  next()
})
tr.pipe(process.stdout)
tr.write({ n: 5 })
tr.write({ n: 10 })
tr.write({ n: 3 })
tr.end()
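Running node obj-stream.js prints 5000, 10000 and 3000, each on its own line.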
Streams In The Wild
Fun fact: the entire npm registry is available as JSON!
Let's use curl to grab this:
curl http://isaacs.iriscouch.com/registry/_all_docs?include_docs=true > npm.json
Then count Dominic Tarr's modules:
cat npm.json | grep 'Dominic Tarr' | wc -l
Reading JSON Without Streams
// npm-nostream.js
var fs = require('fs')
fs.readFile('npm.json', function (err, contents) {
  if (err) throw err
  var registry = JSON.parse(contents)
  var pkgs = registry.rows.map(function (row) { return row.doc })
  var names = pkgs.map(function (pkg) { return pkg.name })
  names.forEach(function (name) { console.log(name) })
})
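This works, but readFile buffers the entire registry into memory before JSON.parse can even start, so the whole dump has to fit in memory at once.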
Reading JSON With Streams
npm install JSONStream
// npm.js
var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')
fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(getNames())
  .pipe(process.stdout)
function getNames () {
  return through.obj(function (pkg, enc, next) {
    if (pkg.name)
      this.push(pkg.name + '\n')
    next()
  })
}
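Assuming you downloaded npm.json with the curl command above, node npm.js streams every package name to stdout without ever holding the whole registry in memory.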
Basic Stream Processing
// npm.js
var fs = require('fs')
var JSONStream = require('JSONStream')
var through = require('through2')
fs.createReadStream('npm.json')
  .pipe(JSONStream.parse(['rows', true, 'doc']))
  .pipe(filterByAuthor('Dominic Tarr'))
  .pipe(getNames())
  .pipe(process.stdout)
function filterByAuthor (authorName) {
  return through.obj(function (pkg, enc, next) {
    if (!pkg || !pkg.author) return next()
    if (pkg.author.name === authorName) {
      this.push(pkg)
    }
    next()
  })
}
// function getNames ...
Stream Resources
- substack's stream workshop notes and video
- substack's stream handbook
On The Magic Journey
NodeSchool
- javascripting: JavaScript workshop
- learnyounode: Async I/O workshop
- async-you: Advanced async workshop
- browserify-adventure: Browserify workshop
- stream-adventure: Stream workshop