_(readable).pipe(writable) does not destroy readable when writable closes #691
Comments
I haven't tried this myself, but does converting the Highland stream to a
Node Readable via `toNodeStream()` and then using `pipeline` work?
`toNodeStream()` is meant to be the escape hatch for any Node stream
interop that isn't directly supported.
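For what it's worth, here is a minimal sketch of that suggestion (my own, not from the thread): do the Highland work first, call `toNodeStream()` to get back a plain Node `Readable`, and hand that to Node's `pipeline`. It assumes a Highland version that ships `toNodeStream()` and reuses the hypothetical `createNodeReadableStream()` and `res` from the example below; as the next reply notes, this didn't end up fixing the destroy propagation here.

```js
const _ = require('highland');
const { pipeline } = require('stream');

// Hypothetical source, as in the issue's example; `res` is the ServerResponse.
const readable = createNodeReadableStream();

// Do the Highland transformation, then drop back down to a Node Readable...
const nodeReadable = _(readable)
  .map((chunk) => chunk.toString().toUpperCase())
  .toNodeStream();

// ...so that Node's pipeline handles error/close propagation and cleanup
// between `nodeReadable` and `res`.
pipeline(nodeReadable, res, (err) => {
  if (err) console.error(err);
});
```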
On Fri, Jan 10, 2020, 12:08 PM Richard Scarrott ***@***.***> wrote:

I know native Node streams do not exhibit this behaviour either, but they provide
pipeline
<https://nodejs.org/api/stream.html#stream_stream_pipeline_streams_callback>
(AKA pump <https://github.com/mafintosh/pump>) to ensure proper cleanup.
I wondered what the equivalent pattern is for highland.
My particular use case is piping a readable stream to a
Node.ServerResponse
<https://nodejs.org/api/http.html#http_class_http_serverresponse>, e.g.
```js
const express = require('express');
const _ = require('highland');
const { pipeline } = require('stream');

const app = express();

app.get('/highland', (req, res) => {
  const readable = createNodeReadableStream();
  _(readable).pipe(res);
  res.destroy();
  setInterval(() => {
    console.log(readable.destroyed); // Always false, even when `res` is closed 👎
  }, 1000);
});

app.get('/node-stream', (req, res) => {
  const readable = createNodeReadableStream();
  pipeline(readable, res, (ex) => {
    ex && console.error(ex);
  });
  res.destroy();
  setInterval(() => {
    console.log(readable.destroyed); // Becomes true when `res` closes 👍
  }, 1000);
});
```
If `res` is destroyed, either explicitly as above or for example by the browser cancelling the request, `readable` is not destroyed and therefore any cleanup code is not run -- it will continue to write until its buffer reaches the `highWaterMark`.
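For context on what `pipeline` is adding there, here is a rough sketch (not from the thread) of the manual wiring it automates in the plain-Node case, using the same `readable` and `res` as above:

```js
// Roughly the cleanup that stream.pipeline wires up for you: if the
// destination errors or closes before the source has finished, the
// source gets destroyed instead of buffering up to the highWaterMark.
readable.pipe(res);

res.on('error', (err) => {
  readable.destroy(err);
});

res.on('close', () => {
  if (!readable.destroyed) {
    readable.destroy();
  }
});
```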
Unfortunately not. However, keeping the readable as a plain Node stream and only using Highland for the transform (via `_.pipeline`) does work, e.g.

```js
const _ = require('highland');
const { pipeline } = require('stream');

const readable = createNodeReadableStream();

const transform = _.pipeline(
  _.filter((chunk) => !chunk.includes('foo')),
  _.map((chunk) => `::${chunk}::`)
);

pipeline(readable, transform, res, (err) => {
  if (err) {
    throw err;
  }
  console.log('DONE');
});
```

As an aside, I noticed the …
Although I imagine you'd lose a lot of expressiveness by avoiding creating readable streams in highland, so it's not an ideal solution. e.g.

```js
const fs = require('fs');

const readFile = _.wrapCallback(fs.readFile);
const fileNames = ['foo.js', 'bar.js'];
const readable = _(fileNames).map(readFile).sequence();

// The `readFile` streams are not destroyed when `res` is closed 😔
pipeline(readable, res, (ex) => { console.log('DONE', ex) });
```

I guess you could manually track them, e.g.

```js
let readFileStreams = [];

const readable = _(fileNames)
  .map(readFile)
  .tap((s) => readFileStreams.push(s))
  .sequence()
  .pipe(res);

res.on('close', () => {
  readFileStreams.forEach((s) => s.destroy());
});
```

But that's not too pretty...
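A slightly tidier variant of that manual tracking (again just a sketch, with the same hypothetical `fileNames`, `readFile`, and `res`): Node's `stream.finished` fires on error as well as on close, so the wrapped streams also get cleaned up when the response errors out.

```js
const { finished } = require('stream');

const readFileStreams = [];
const readable = _(fileNames)
  .map(readFile)
  .tap((s) => readFileStreams.push(s))
  .sequence();

readable.pipe(res);

// finished() invokes the callback once `res` has errored, closed, or ended,
// so the source streams are destroyed on every teardown path.
finished(res, () => {
  readFileStreams.forEach((s) => s.destroy());
});
```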
Now that I think about it, making this work correctly is actually pretty difficult. Highland streams are lazy-by-default, so the code doesn't do a good job of propagating destroys back to the wrapped source. For now, I would say that you should do what you suggest in #691 (comment) and use Node's `pipeline` with a `_.pipeline` transform, e.g.

```js
const _ = require('highland');
const { pipeline } = require('stream');

function transformReadable(stream) {
  // In this function, you can pretend like you have a regular Highland stream that's
  // correctly wrapping the Readable.
  return stream
    .filter((chunk) => !chunk.includes('foo'))
    .map((chunk) => `::${chunk}::`);
}

const readable = createNodeReadableStream();

pipeline(readable, _.pipeline(transformReadable), res, (err) => {
  if (err) {
    throw err;
  }
  console.log('DONE');
});
```

The above won't work if you need to …

Functionally, they're the same. However, semantically, you should think of …
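To make the lazy-by-default point above concrete, here is a small sketch of my own: a Highland pipeline does no work until a consumer is attached, which, as noted above, is part of why there is no easy point at which to propagate a downstream destroy back to a wrapped `Readable`.

```js
const _ = require('highland');

const source = _([1, 2, 3]).map((x) => {
  console.log('processing', x); // nothing is logged yet -- the stream is lazy
  return x * 2;
});

// Work only starts once something consumes the stream.
source.each((x) => console.log('got', x));
```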
I had a (naive) go at implementing it just so I could understand the difficulties a bit better, if you want to take a look -- #692