if (typeof then === 'function') { let d; FunctionPrototypeCall( then, body, (val) => { if (val != null) { d.push(val); } d.push(null); }, (err) => { destroyer(d, err); } ); return d = new Duplexify({ objectMode: true, writable: false, read() {} }); } throw new ERR_INVALID_ARG_TYPE( name, ['Blob', 'ReadableStream', 'WritableStream', 'Stream', 'Iterable', 'AsyncIterable', 'Function', '{ readable, writable } pair', 'Promise'], body); }; function fromAsyncGen(fn) { let { promise, resolve } = createDeferredPromise(); const ac = new AbortController(); const signal = ac.signal; const value = fn(async function*() { while (true) { const _promise = promise; promise = null; const { chunk, done, cb } = await _promise; process.nextTick(cb); if (done) return; if (signal.aborted) throw new AbortError(); ({ promise, resolve } = createDeferredPromise()); yield chunk; } }(), { signal }); return { value, write(chunk, encoding, cb) { const _resolve = resolve; resolve = null; _resolve({ chunk, done: false, cb }); }, final(cb) { const _resolve = resolve; resolve = null; _resolve({ done: true, cb }); }, destroy(err, cb) { ac.abort(); cb(err); } }; } function _duplexify(pair) { const r = pair.readable && typeof pair.readable.read !== 'function' ? Readable.wrap(pair.readable) : pair.readable; const w = pair.writable; let readable = !!isReadable(r); let writable = !!isWritable(w); let ondrain; let onfinish; let onreadable; let onclose; let d; function onfinished(err) { const cb = onclose; onclose = null; if (cb) { cb(err); } else if (err) { d.destroy(err); } else if (!readable && !writable) { d.destroy(); } } // TODO(ronag): Avoid double buffering. // Implement Writable/Readable/Duplex traits. // See, https://github.com/nodejs/node/pull/33515. d = new Duplexify({ // TODO (ronag): highWaterMark? readableObjectMode: !!r?.readableObjectMode, writableObjectMode: !!w?.writableObjectMode, readable, writable, }); if (writable) { eos(w, (err) => { writable = false; if (err) { destroyer(r, err); } onfinished(err); }); d._write = function(chunk, encoding, callback) { if (w.write(chunk, encoding)) { callback(); } else { ondrain = callback; } }; d._final = function(callback) { w.end(); onfinish = callback; }; w.on('drain', function() { if (ondrain) { const cb = ondrain; ondrain = null; cb(); } }); w.on('finish', function() { if (onfinish) { const cb = onfinish; onfinish = null; cb(); } }); } if (readable) { eos(r, (err) => { readable = false; if (err) { destroyer(r, err); } onfinished(err); }); r.on('readable', function() { if (onreadable) { const cb = onreadable; onreadable = null; cb(); } }); r.on('end', function() { d.push(null); }); d._read = function() { while (true) { const buf = r.read(); if (buf === null) { onreadable = d._read; return; } if (!d.push(buf)) { return; } } }; } d._destroy = function(err, callback) { if (!err && onclose !== null) { err = new AbortError(); } onreadable = null; ondrain = null; onfinish = null; if (onclose === null) { callback(err); } else { onclose = callback; destroyer(w, err); destroyer(r, err); } }; return d; }