function createDeferred() { const d = {}; d.promise = new Promise((resolve, reject) => { d.resolve = resolve; d.reject = reject; }); return d; } const SYMBOL_FINISHED = Symbol(); const SYMBOL_NEW_VALUE = Symbol(); /** * makePushPullAsyncIterableIterator * * The iterable will publish values until return or throw is called. * Afterwards it is in the completed state and cannot be used for publishing any further values. * It will handle back-pressure and keep pushed values until they are consumed by a source. */ function makePushPullAsyncIterableIterator() { let isRunning = true; const values = []; let newValueD = createDeferred(); const finishedD = createDeferred(); const asyncIterableIterator = (async function* PushPullAsyncIterableIterator() { while (true) { if (values.length > 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion yield values.shift(); } else { const result = await Promise.race([ newValueD.promise, finishedD.promise ]); if (result === SYMBOL_FINISHED) { break; } if (result !== SYMBOL_NEW_VALUE) { throw result; } } } })(); function pushValue(value) { if (isRunning === false) { // TODO: Should this throw? return; } values.push(value); newValueD.resolve(SYMBOL_NEW_VALUE); newValueD = createDeferred(); } // We monkey patch the original generator for clean-up // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const originalReturn = asyncIterableIterator.return.bind(asyncIterableIterator); asyncIterableIterator.return = ( // eslint-disable-next-line @typescript-eslint/no-explicit-any ...args) => { isRunning = false; finishedD.resolve(SYMBOL_FINISHED); return originalReturn(...args); }; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const originalThrow = asyncIterableIterator.throw.bind(asyncIterableIterator); asyncIterableIterator.throw = (err) => { isRunning = false; finishedD.resolve(err); return originalThrow(err); }; return { pushValue, asyncIterableIterator }; } const makeAsyncIterableIteratorFromSink = (make) => { const { pushValue, asyncIterableIterator } = makePushPullAsyncIterableIterator(); const dispose = make({ next: (value) => { pushValue(value); }, complete: () => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion asyncIterableIterator.return(); }, error: (err) => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion asyncIterableIterator.throw(err); } }); // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const originalReturn = asyncIterableIterator.return; let returnValue = undefined; asyncIterableIterator.return = () => { if (returnValue === undefined) { dispose(); returnValue = originalReturn(); } return returnValue; }; return asyncIterableIterator; }; function applyAsyncIterableIteratorToSink(asyncIterableIterator, sink) { const run = async () => { try { for await (const value of asyncIterableIterator) { sink.next(value); } sink.complete(); } catch (err) { sink.error(err); } }; run(); return () => { var _a; (_a = asyncIterableIterator.return) === null || _a === void 0 ? void 0 : _a.call(asyncIterableIterator); }; } function isAsyncIterable(input) { return (typeof input === "object" && input !== null && // The AsyncGenerator check is for Safari on iOS which currently does not have // Symbol.asyncIterator implemented // That means every custom AsyncIterable must be built using a AsyncGeneratorFunction (async function * () {}) // eslint-disable-next-line @typescript-eslint/no-explicit-any (input[Symbol.toStringTag] === "AsyncGenerator" || (Symbol.asyncIterator && Symbol.asyncIterator in input))); } export { applyAsyncIterableIteratorToSink, isAsyncIterable, makeAsyncIterableIteratorFromSink, makePushPullAsyncIterableIterator };