Node.js Streams: Todo lo que necesitas saber.

Henry, 02 Jul 2018

Los streams en Node.js tienen la reputación de ser difíciles para trabajar con ellos, y aún más complicados de entender. Pues te tengo buenas noticias, nunca mas será tu caso.

Durante años, los desarrolladores han creado muchos paquetes solo con el propósito de hacer el trabajo con streams más fácil. Pero en este artículo voy a enfocarme en la API nativa de Node.js.

¿Qué son exactamente los streams?

Los streams son colecciones de datos, así como los arreglos y las cadenas. La diferencia es que los streams no están disponibles todo a la vez, y no tienen porque caber en memoria. Esto hace a los streams muy poderosos cuando trabajamos con grandes cantidades de datos, o datos que vienen de una fuente externa de a un bloque a la vez.

Sin embargo, los streams no son solo para trabajar con grandes cantidades de datos. También nos dan el poder de hacer componible nuestro código. Justo como podemos componer poderosos comandos de linux haciendo pipe entre comandos más pequeños, podemos hacer exactamente lo mismo en Node con streams.

~/learn-node $ grep -R exports * | wc -l
// 6
const grep = ... // Un Stream para la salida de grep
const wc = ... // Un stream para la entrada de wc

grep.pipe(wc)

Muchos de los módulos internos en Node implementan streams.

La lista de arriba tiene algunos ejemplos de objetos nativos de Node.js, que son de lectura (readable) y de escritura (writable). Algunos de estos objetos son ambos, tanto de lectura como de escritura, como los TCP sockets, zlib y crypto streams.

Nota que los objetos también están estrechamente relacionados. Mientras las respuestas HTTP es un stream de lectura en el cliente, es un stream de escritura en el servidor. Esto es porque en el caso de HTTP, básicamente leemos de un objeto (http.IncomingMessage) y escribimos al otro (http.ServerResponse).

Nota también como los stdio streams (stdin, stdout, stderr) tienen tipos de streams inversos cuando viene de un proceso hijo. Esto permite una forma realmente fácil de hacer pipe, recibiendo y enviando datos a estos streams desde el stdio stream del proceso principal.

Un ejemplo práctico de streams

La teoría es grandiosa, pero no 100% convincente. Vamos a ver un ejemplo demostrando la diferencia que los streams pueden hacer en el código, cuando se tiene en cuenta el consumo de memoria.

Primero vamos a crear un archivo de gran tamaño:

const fs = require('fs')
const file = fs.createWriteStream('./big.file')

for (let i = 0; i <= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
}

file.end()

Mira lo que usé para ese gran archivo. ¡Un stream de escritura!

El módulo fs puede usarse para leer y escribir archivos por medio de streams. En el ejemplo de arriba, escribimos en big.file, a través de un stream de escritura, un millón de líneas con un ciclo for.

Correr el script anterior genera un archivo de aproximadamente ~400 MB.

A continuación un servidor web simple en Node, diseñado exclusivamente para servir big.file

const fs = require('fs')
const server = require('http').createServer()

server.on('request', (req, res) => {
  fs.readFile('./bigfile', (err, data) => {
    if (err) throw err
    res.end(data)
  })
})

server.listen(8000)

Cuando el servidor reciba una petición, servirá el archivo big.file usando el método asíncrono fs.readFile. Pero mira, no es como que estemos bloqueando el event loop o algo. Todo esta bien, ¿verdad? ¿verdad?

Bueno, vamos a ver que pasa cuando corremos el servidor, nos conectamos, y monitoreamos la memoria mientras lo hacemos.

Cuando arrancamos el servidor, este empieza con un consumo de memoria de 31 MB. Después me conecté al servidor. Observa lo que ocurrió con el consumo de memoria:

¡Wow! el consumo de memoria subió hasta 458 MB.

Básicamente pusimos todo el contenido de big.file en memoria antes de escribirlo en el objeto de respuesta. Esto es muy ineficiente.

El objeto de respuesta HTTP (res en el código de arriba) es también un stream de escritura. Esto significa que si tenemos un stream de lectura que represente el contenido de big.file, podemos simplemente hacer pipe entre ellos y lograr casi el mismo resultado sin consumir ~450 MB de memoria.

El módulo fs de Node puede darnos un stream de lectura de cualquier archivo utilizando el método createReadStream. Podemos hacer pipe de esto al objeto de respuesta:

const fs = require('fs')
const server = require('http').createServer()

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file')
  src.pipe(res)
})

server.listen(8000)

Ahora cuando nos conectamos al servidor, la magia sucede (mira el consumo de memoria):

¿Qué está pasando?

Cuando un cliente pide por big.file, lo enviamos por el stream un bloque a la vez, lo que significa que no lo cargamos todo en memoria a la vez. El uso de memoria solo creció un poco y eso fue todo.

Puedes llevar este mismo ejemplo a sus limites. Recrea el big.file con cinco millónes de líneas en vez de un millón, lo que hará que el archivo llegue por encima de los 2 GB, lo que en realidad es más grande que el limite de carga por defecto de Node.

Si intentas servir el archivo usando fs.readFile, simplemente no puedes, por defecto (puedes cambiar el limite). Pero con fs.createReadStream, no hay problema alguno haciendo stream de 2 GB de datos a la petición, y lo mejor de todo, el uso de memoria del proceso será más o menos el mismo.

¿Listo para aprender streams ahora?

Streams (las bases)

Hay cuatro tipos fundamentales de streams en Node.js: de lectura (Readable), de escritura (Writable), Duplex, y streams de transformación (Transform Streams).

  • Un stream de lectura es una abstracción de una fuente cuya información puede ser consumida o leída. Un ejemplo de esto es el método fs.createReadStream.
  • Un stream de escritura es una abstracción de un destino cuya información puede ser asignada o escrita. Un ejemplo de esto es el método fs.createWriteStream.
  • Un stream duplex es tanto de lectura como de escritura. Un ejemplo de esto es un socket TCP.
  • Un stream de transformación es básicamente un stream duplex que puede ser usado para modificar o transformar los datos mientras es escrita y leída. Un ejemplo de esto es el stream zlib.createGzip para comprimir los datos usando gzip. Puedes pensar en los streams de transformación como una función donde la entrada es la parte del stream de escritura y la salida es la parte del stream de lectura. También puedes de los streams de transformación como “through streams”

Todos los streams son instancias de EventEmitter. Estos emiten eventos que pueden ser usados para leer o escribir datos. Sin embargo, podemos consumir datos de un stream de una manera más simple usando el método pipe.

El método pipe

Aquí la línea mágica que necesitas recordar:

readableSrc.pipe(writableDest)

En esta simple línea, estamos haciendo pipe la salida de un stream de lectura, la fuente de los datos, como la entrada de un stream de escritura, el destino. La fuente debe ser un stream de lectura y el destino debe ser uno de escritura. Por supuesto, pueden ser ambos duplex/de transformación también. De hecho, si estamos haciendo pipe en un stream duplex, podemos encadenar pipes como en linux:

readableSrc
  .pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)

El método pipe retorna el stream destino, lo que nos permite hacer el encadenamiento de arriba. Para los streams a (de lectura), b y c (duplex), y d (de escritura), podemos:

a.pipe(b).pipe(c).pipe(d)

// que es equivalente a:
a.pipe(b)
b.pipe(c)
c.pipe(d)

# Lo que en linux es equivalente a:
$ a | b | c | d

El método pipe es la forma más fácil de consumir streams. Generalmente es recomendado usar el método pipe o consumir los streams con eventos, evitando mesclar ambos. Usualmente cuando estas usas el método pipe no hace falta usar eventos, pero si necesitas consumir los streams de una forma personalizada, usar eventos será el camino a tomar.

Eventos de stream

Además de leer un stream de lectura y escribirlo en un destino de escritura, el método pipe automáticamente se encarga de algunas cosas en el camino. Por ejemplo, maneja errores, finales-de-archivos, y los casos cuando un stream es más lento o rápido que el otro.

Sin embargo, los streams también pueden ser consumidos con eventos directamente. Aquí el equivalente simplificado en eventos de lo que hace principalmente el método pipe para leer y escribir datos:

// readable.pipe(writable)

readable.on('data', (chunk) => {
  writable.write(chunk)
})

readable.on('end', () => {
  writable.end()
})

A continuación una lista de eventos y funciones importantes, que pueden ser usados en streams de lectura y escritura:

Los eventos y funciones están de algún modo relacionados porque suelen usarse juntos.

Los eventos más importantes en los streams de lectura son:

  • El evento data, que se emite cada ves que el stream pasa un bloque de datos al consumidor.
  • El evento end, que se emite cuando no hay más datos para ser consumidos desde el stream.

Los eventos más importantes en los streams de escritura son:

  • El evento drain, que es una señal que avisa que el stream de escritura puede recibir más datos.
  • El evento finish, que se emite cuando todos los datos se han descargado al sistema subyacente.

Eventos y funciones pueden ser combinados para hacer uso personalizado y optimizado de streams. Para consumir un stream de lectura, podemos usar los métodos pipe/unpipe, o los métodos read/unshift/resume. Para consumir un stream de escritura, podemos hacerlo el destino de un pipe/unpipe, o solo escribir en el con el método write y llamar el método end cuando se haya completado.

Modos de pausa y flujo de streams de lectura

Los streams de lectura tienen dos modos principales que afectan la forma en que podemos consumirlos:

  • Pueden estar en modo pausa
  • O en el modo de flujo

Esos modos, a veces, se les conoce como modos pull y push

Todos los streams de lectura empiezan en el modo de pausa por defecto, pero pueden ser fácilmente cambiados al modo de flujo y volver a pausa cundo sea necesario. A veces el cambio sucede automáticamente.

Cuando un stream de lectura está en modo pausa, podemos usar el método read() para leer desde el stream bajo demanda, sin embargo, para un stream de escritura en modo de flujo, los datos están continuamente fluyendo y debemos estar escuchando eventos para consumirlos.

En el modo de flujo, los datos pueden perderse si no hay consumidores disponibles para manejarlos. Esta es la razón por la que, cuando tenemos un stream de lectura en modo de flujo, necesitamos un manejador del evento data. De hecho, solo añadiendo el manejador del evento data cambia el stream de pausa al modo de flujo. Y removiendo el manejador del evento data cambia el stream de devuelta al modo pausa. Algo de esto es hecho para mantener compatibilidad con versiones anteriores de la interfaz de streams de Node.

Para cambiar manualmente entre estos dos modos de los streams, puedes usar los métodos resume() y pause()

Cuando se consume un stream de lectura usando el método pipe, no tenemos que preocuparnos por los modos pues pipe se encarga automáticamente.

Implementando Streams

Cuando hablamos de streams en Node.js, hay dos tareas principales diferentes:

  • La tarea de implementar el stream.
  • La tarea de consumirlo.

Bastante hemos hablado solo sobre consumir streams. ¡Vamos a implementar alguno!

Los implementadores de streams son usualmente lo que requieren (require) el módulo stream.

Implementando un Stream de Escritura

Para implementar un stream de escritura, necesitamos usar el constructor Writable del módulo stream.

const { Writable } = require('stream')

Podemos implementar un stream de muchas maneras. Podemos, por ejemplo, extender el constructor Writable si queremos.

class miWritableStream extends Writable {
}

Sin embargo, yo prefiero una aproximación más simple al constructor. Solamente creamos un objeto desde el constructor Writable y le pasamos algunas opciones. La única opción requerida es una función write que expone el bloque de datos a ser escritos.

const {Writable} = require('stream')

const outStream = new Writable({
  write (chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  }
})

process.stdin.pipe(outStream)

Este método write recibe tres argumentos.

  • El chunk (bloque) es normalmente un búfer a menos que configuremos el stream diferente.
  • El argumento encoding es necesario en este caso, pero usualmente podemos ignorarlo.
  • El callback es una función que necesitamos llamar después de procesar el bloque de datos (chunk). Es lo que indica si la escritura fue exitosa o no. Para indicar un fallo, llama el callback con un objeto de error.

En outStream, simplemente hicimos console.log del bloque como una cadena y llamamos el callback después sin un error para indicar éxito. Esto es un muy simple y probablemente no muy útil echo stream. Imprimirá devuelta cualquier cosa que reciba.

Para consumir este stream, Podemos simplemente usarlo con process.stdin en nuestro outStream.

Cuando corremos el código de arriba, cualquier cosa que escribamos en process.stdin será repetido devuelta usando la línea console.log de outStream.

Este stream no es muy útil de implementar, pues de hecho ya está implementado por defecto. Este es muy similar a process.stdout. Podemos solo hacer pipe de stdin en stdout y obtendremos la misma característica de repetición (echo) en una sola línea:

process.stdin.pipe(process.stdout)

Implementando un Stream de Lectura

Para implementar un stream de lectura, requerimos la interfaz Readable, construimos un objeto desde esta e implementamos el método read() en los parámetros de configuración de stream:

const { Readable } = require('stream')

const inStream = new Readabel({
  read () {}
})

inStream.push('ABCDEFGHIJKLM')
inStream.push('NOPQRSTUVWXYZ')

inStream.push(null) // no más datos

inStream.pipe(process.stdout)

Cuando hacemos push de un objeto null, significa que queremos indicar que el stream no tiene más datos.

Para consumir este simple stream de lectura, podemos simplemente hacer pipe en el stream de escritura process.stdout.

Cuando corremos el código de arriba, Estaremos leyendo todos los datos desde inStream y repitiéndola en el standard out. Muy simple, pero tampoco muy eficiente.

Estamos básicamente enviando (pushing) todos los datos en el stream antes de hacer pipe al process.stdout. Es mucho mejor cargar (push) los datos bajo demanda, cuando un consumidor pida por ellos. Podemos hacerlo implementando el método read() en el objeto de configuración:

const inStream = new Readable({
  read (size) {
    // hay una demanda de datos... alguien quiere leerlos
  }
})

Cuando el método read es llamado en un stream de lectura, la implementación puede cargar (push) datos parciales a la cola. Por ejemplo, podemos cargar (push) una letra por vez, empezando por el carácter código 65 (que representa A), e incrementar en cada carga:

const inStream = new Readable({
  read (size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currenChartCode > 90) {
      this.push(null)
    }
  }
})

inStream.currentCharCode = 65

inStream.pipe(process.stdout)

Mientras el consumidor esté leyendo un stream de lectura, el método read continuará disparándose, y cargaremos (push) más letras. Necesitamos detener este ciclo en alguna parte, y esa es la razón de la condición if que carga null cuando el currentCharCode es mayor que 90 (que representa Z).

Este código es equivalente al más simple, con el que empezamos, pero ahora estamos enviando (pushing) los datos bajo demanda cuando el consumidor pide por ellos. Siempre deberías hacerlo.

Implementando Streams Duplex/de Transformación

Con streams Duplex, podemos implementar streams tanto de lectura como de escritura con el mismo objeto. Es como si heredáramos de ambas interfaces.

Aquí un ejemplo de un stream duplex que combina los dos ejemplos anteriores de lectura y escritura:

const { Duplex } = require('stream')

const inoutStream = new Duplex({
  write (chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  },
  read (size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
})

inoutStream.currentCharCode = 65

process.stdin.pipe(inoutStream).pipe(process.stdout)

Combinando los métodos, podemos usar este stream duplex para leer las letras de la A a la Z y también podemos usarlo por característica de repetición (echo). Hacemos pipe del stream de lectura stdin en este stream duplex para usar la característica echo y hacemos pipe del mismo stream duplex en el stream de escritura stdout para ver las letras de la A a la Z.

Es importante entender que los lados de lectura y escrituras de un stream duplex operan completamente independientes uno del otro. Esto es apenas un agrupamiento de características en un objeto.

Un stream de transformación es más interesante que un stream duplex porque su salida es computada con su entrada.

Para un stream de transformación, no tenemos que implementar los métodos read o write, solo necesitamos implementar el método transform, que combina ambos. Este tiene la firma del método write y podemos hacer push de los datos también.

Aquí hay un ejemplo simple de stream de transformación que repite (echo) de vuelta cualquier cosa que escribas y después lo transforme todo a mayúsculas.

const { Transform } = require('stream')

const upperCaseTr = new Transform({
  transform (chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase())
    callback()
  }
})

process.stdin.pipe(upperCaseTr).pipe(process.stdout)

En este stream de transformación, que está siendo consumido exactamente como el anterior ejemplo de stream duplex, convertimos el chunk en su versión en mayúsculas, y después hacemos push de esa versión a la parte de lectura.

Streams en modo objeto

Por defecto, los streams esperan valores de búfers/cadenas. Hay una bandera objectMode que podemos establecer para permitir que el stream acepte cualquier objeto Javascript.

Aquí un ejemplo simple demostrándolo. La siguiente combinación de streams de transformación, como característica, mapea una cadena de valores separados por coma en un objeto Javascirpt. Entonces "a, b, c, d" se vuelve {a: b, c: d}.

const { Transform } = require('stream')

const commásplitter = new Transform({
  readableObjectMode: true,
  transform (chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','))
    callback()
  }
})

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform (chunk, encoding, callback) {
    const obj = {}
    for (let i = 0; i < chunk.length; i += 2) {
      obj[chunk[i]] = chunk[i + 1]
    }
    this.push(obj)
    callback()
  }
})

const objectToString = new Transform({
  writableObjectMode: true,
  transform (chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n')
    callback()
  }
})

process.stdin
  .pipe(commásplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

Pasamos la cadena de entrada (por ejemplo "a,b,c,d") a través de commasplitter que envía (push) un arreglo como datos de lectura (["a", "b", "c", "d"]). Añadir la bandera readableObjectMode en ese stream es necesario porque estamos enviando (pushing) un objeto, no un string.

Después tomamos el arreglo y le hacemos pipe en el stream arrayToObject. Necesitamos la bandera writableObjectMode para hacer que el stream acepte un objeto. También enviará un objeto (el arreglo de entrada mapeado en un objeto) y por eso es que también necesitamos la bandera readableObjectMode aquí. El último stream (objectToString) acepta un objeto pero envía (push) una cadena a la salida, y por eso es que solo necesitamos la bandera writableObjectMode aquí. La parte de lectura es una cadena normal (el objeto convertido en cadena).

Streams de transformación integrados en Node

Node tiene algunos streams de transformación integrados muy útiles. Los streams de zlib y crypto.

A continuación un ejemplo que usa el stream zlib.createGzip() combinado con los streams de lectura/escritura de fs para crear un script de compresión de archivos:

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))

Puedes usar este script para comprimir (gzip) cualquier archivo que pases como argumento. Estamos haciendo un stream de lectura para el archivo y haciendo pipe al stream de transformación integrado zlib, y luego en un stream de escritura para el nuevo archivo comprimido (gziped). Simple.

Lo genial de usar pipes es que, de hecho, podemos combinarlos con eventos si es necesario. Digamos, por ejemplo, que quiero que el usuario vea un indicador de progreso mientras el scrpit se este trabajando y un mensaje “Hecho” cuando haya finalizado. Ya que el método pipe retorna el stream destino, podemos encadenar el registro de manejadores de eventos también:

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', () => console.log('Hecho'))

Entonces con el método pipe, podemos consumir fácilmente streams, pero podemos ir todavía más lejos, personalizando nuestra interacción con esos streams, usando eventos cuando sea necesario.

Lo grandioso del método pipe es que podemos usarlos para componer nuestro programa pieza por pieza, de una forma mucho más legible. Por ejemplo, en vez de escuchar el evento data como arriba, podemos simplemente crear un stream de transformación para reportar el progreso, y reemplazar el .on() con otro .pipe():

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

const { Transform } = require('stream')

const reportProgress = new Transform({
  transform (chunk, encoding, callback) {
    process.stdout.write('.')
    callback(null, chunk)
  }
})

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file+'.gz'))
  .on('finish', () => console.log('Hecho'))

Este stream reportProgress es un simple stream de paso, pero también reporta el progreso al standard out. Nota como usé el segundo argumento en la función callback() para enviar (push) los datos dentro del método transform(). Esto es equivalente a enviar (push) los datos primero.

Las aplicaciones de combinar streams son infinitas. Por ejemplo, si necesitamos encriptar antes o después de comprimirlo (gzip), todo lo que necesitamos es hacer pipe a otro stream de transformación en el orden exacto que necesitemos. Podemos usar el módulo crypto de node para eso:

const crypto = require('crypto')
// ...

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(crypto.createCipher('aes192', 'a_secret'))
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file+'.gz'))
  .on('finish', () => console.log('Hecho'))

El script de arriba comprime y luego encripta el archivo entregado, y solo aquellos que tienen la clave secreta pueden usar el archivo resultante. No podemos descomprimir (unzip) este archivo con las utilidades normales de descompresión (unzip) porque está encriptado.

De hecho para se capaz de descomprimir (unzip) el archivo generado con el script de arriba, necesitamos los streams opuestos de crypto y zlib en orden inverso, lo cual es simple:

//...

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Hecho'));

Asumiendo que el archivo entregado es la versión comprimida, el código de arriba creará un stream de lectura desde el archivo, hará un pipe al stream createDecipher() de crypto (usando la misma clave secreta), hará pipe la salida de este en el stream createGunzip() de zlib, y después escribirá de vuelta un archivo sin la parte de la extensión.

Eso es todo lo que tengo para este tema. ¡Gracias por leer y hasta la próxima!

Este post es una traducción adaptada de Node.js Streams: everything you need to know