novanode logo

Streaming to You Live

The Node 0.10 Streams API

Created by Evan Oxfeld / @evanoxfeld

Overview

  • Explore I/O bound problems in Node
  • Explore Streams API

Streams Example - Unzip


fs.createReadStream('path/to/archive.zip')
  .pipe(unzip.Parse())
  .pipe(fstream.Writer('output/path'));
				

Code!

Behind the curtain

## util.pump(readable, writable) * Sets up event handlers for 'data', 'end' * Ignores other events e.g. 'error', 'close' * API limits customization, not chainable

Streams in 0.8


	readable.pipe(duplex).pipe(writable)
						
## Streams in 0.8 * Readable * Emit data events * Optionally implement pause() and resume() * Writable * Implement write() and end()
## Awesome! Let's demo!

Issues - Backpressure



Readable Writable

Issues - Backpressure



Readable Writable

Issues - Backpressure



Readable Writable

Issues - Backpressure

Flood

More Issues

  • Buffering and backpressure
  • No on.('pipe') method
  • pause() isn't a guarantee

Streams of Tomorrow, Finally Here

Streams in 0.10

  • Readable streams are now "suck" streams
  • Can use Readable to wrap old-style streams
  • Object mode
  • Same composable pipe API
  • Shared base classes for backpressure and buffering
## Streams 0.10 Base Classes #### stream.Readable * Implement readable._read(size) * Queue data from I/O source * push(chunk) or unshift(chunk) * Users call read([size]) or pipe(dest) * read() returns null if less data is buffered than size * If objectMode is true, read(n) returns one object
## Streams 0.10 Base Classes #### stream.Writable * Implement writable._write(chunk, encoding, cb) * Users call * write(chunk, [encoding], [cb]) * write() returns false if the buffer is full * end([chunk], [encoding], [cb])

Streams 0.10 Base Classes

stream.Duplex

  • Inherits from Readable and Writable
  • Implement _read() and _write()
  • Queue data with push() and unshift()
  • Users call Readable and Writable methods

Streams 0.10 Base Classes

stream.Transform

  • Inherits from Duplex
  • Implement transform._transform(chunk, encoding, cb)
  • Users call Readable and Writable methods
## Streams 0.10 Base Classes #### stream.PassThrough * Inherits from Transform * No method to implement * Users call Readable and Writable methods
# Example stream.Transform Code!

Unzip Demo

## Other Relevant API Changes #### process.nextTick() > process.nextTick happens at the end of the current tick, immediately after the current stack unwinds. If you are currently using recursive nextTick calls, use setImmediate instead. [More 0.8 to 0.10 API Changes](https://github.com/joyent/node/wiki/Api-changes-between-v0.8-and-v0.10)

Example 0.8 Compatibility!

readable-stream module

Conclusions

  • Stream API is great for solving I/O bound problems
  • Streams2 developed in the open with a parallel user-land module
@substack
Streams make programming in node simple, elegant, and composable.

References

Questions?