NodeJs Stream – Memoria insufficiente

Sto cercando di elaborare un stream di 300 milioni di righe di dati. Una volta che raggiungo circa 5 milioni di righe, ricevo un ERRORE FATALE: CALL_AND_RETRY_LAST L’allocazione non è riuscita – elabora la memoria . (Il numero varia a seconda della macchina ma è costante).

È ansible eseguire il codice qui sotto per vedere ciò che accade – Non posso dire se il problema nel codice di inerente ai flussi. Ho provato a sminuire il processo ma non ho potuto farlo.

C’è un limite di memoria? Ho rimosso tutti gli altri codici e ho “oscurato” l’esempio per accertarmi che non si trattasse di un problema di contropressione.

var Readable = require('stream').Readable; var Writable = require('stream').Writable; var util = require('util'); var tenMillion = 10000000; //var tenMillion = 5000000; //THIS WORKS var writeEvery = tenMillion / 10; /* * Create a really simple stream that will run 10 million times */ function Streamo(max) { Readable.call(this, { objectMode: true }); this._currentIndex = -1; this._maxIndex = max; } util.inherits(Streamo, Readable); Streamo.prototype._read = function () { this._currentIndex += 1; if (this._currentIndex % writeEvery == 0) { console.log(this._currentIndex + ' of ' + this._maxIndex) }; if (this._currentIndex = this._maxIndex) { console.log("BOOM") this.push(null); return; } this.push(true); }; /* * Create a really simple Writable Stream to Count */ function Counta() { Writable.call(this, { objectMode: true, highWaterMark: (200 * 1024) }); this._count = 0; } util.inherits(Counta, Writable); Counta.prototype._write = function (chunk, enc, cb) { this._count++; if (this._count % writeEvery == 0) { console.log('_______________________________' + this._count) }; cb(); }; Counta.prototype.Count = function () { return this._count; } /* * Exercise It */ var s = new Streamo(tenMillion); var c = new Counta(); s.pipe(c); c.on('finish', function () { console.log("BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM BOOM ") }); 

    Questo è un problema noto con l’implementazione corrente dei flussi.

    Nella documentazione e nel codice di streaming, ci sono più punti in cui si fa riferimento che _read() dovrebbe essere asincrono.

    Quindi, se non si esegue l’I / O (asincrono) di qualche tipo nell’implementazione _read() , potrebbe essere necessario (almeno occasionalmente) chiamare setImmediate() prima di push() ing, per mantenere lo stack di chiamate da diventando troppo grande Ad esempio, questo funziona senza crash:

     Streamo.prototype._read = function (n) { this._currentIndex += 1; if (this._currentIndex % writeEvery == 0) { console.log(this._currentIndex + ' of ' + this._maxIndex) }; if (this._currentIndex < 0 || this._currentIndex >= this._maxIndex) { console.log("BOOM") this.push(null); return; } var self = this; if (this._currentIndex % writeEvery == 0) { setImmediate(function() { self.push(true); }); } else this.push(true); };