function createDeferred() { const d = {}; d.promise = new Promise((resolve, reject) => { d.resolve = resolve; d.reject = reject; }); return d; } const SYMBOL_FINISHED = Symbol(); const SYMBOL_NEW_VALUE = Symbol(); /** * makePushPullAsyncIterableIterator * * The iterable will publish values until return or throw is called. * Afterwards it is in the completed state and cannot be used for publishing any further values. * It will handle back-pressure and keep pushed values until they are consumed by a source. */ export function makePushPullAsyncIterableIterator() { let isRunning = true; const values = []; let newValueD = createDeferred(); const finishedD = createDeferred(); const asyncIterableIterator = (async function* PushPullAsyncIterableIterator() { while (true) { if (values.length > 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion yield values.shift(); } else { const result = await Promise.race([ newValueD.promise, finishedD.promise ]); if (result === SYMBOL_FINISHED) { break; } if (result !== SYMBOL_NEW_VALUE) { throw result; } } } })(); function pushValue(value) { if (isRunning === false) { // TODO: Should this throw? return; } values.push(value); newValueD.resolve(SYMBOL_NEW_VALUE); newValueD = createDeferred(); } // We monkey patch the original generator for clean-up // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const originalReturn = asyncIterableIterator.return.bind(asyncIterableIterator); asyncIterableIterator.return = ( // eslint-disable-next-line @typescript-eslint/no-explicit-any ...args) => { isRunning = false; finishedD.resolve(SYMBOL_FINISHED); return originalReturn(...args); }; // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const originalThrow = asyncIterableIterator.throw.bind(asyncIterableIterator); asyncIterableIterator.throw = (err) => { isRunning = false; finishedD.resolve(err); return originalThrow(err); }; return { pushValue, asyncIterableIterator }; } //# sourceMappingURL=makePushPullAsyncIterableIterator.js.map