fers); // Retain chunks if (err === 0) req._chunks = chunks; afterWriteDispatched(req, err, cb); return req; } function writeGeneric(self, data, encoding, cb) { const req = createWriteWrap(self[kHandle], cb); const err = handleWriteReq(req, data, encoding); afterWriteDispatched(req, err, cb); return req; } function afterWriteDispatched(req, err, cb) { req.bytes = streamBaseState[kBytesWritten]; req.async = !!streamBaseState[kLastWriteWasAsync]; if (err !== 0) return cb(new ErrnoException(err, 'write', req.error)); if (!req.async && typeof req.callback === 'function') { req.callback(); } } function onStreamRead(arrayBuffer) { const nread = streamBaseState[kReadBytesOrError]; const handle = this; const stream = this[owner_symbol]; stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { let ret; let result; const userBuf = stream[kBuffer]; if (userBuf) { result = (stream[kBufferCb](nread, userBuf) !== false); const bufGen = stream[kBufferGen]; if (bufGen !== null) { const nextBuf = bufGen(); if (isUint8Array(nextBuf)) stream[kBuffer] = ret = nextBuf; } } else { const offset = streamBaseState[kArrayBufferOffset]; const buf = new FastBuffer(arrayBuffer, offset, nread); result = stream.push(buf); } if (!result) { handle.reading = false; if (!stream.destroyed) { const err = handle.readStop(); if (err) stream.destroy(new ErrnoException(err, 'read')); } } return ret; } if (nread === 0) { return; } // After seeing EOF, most streams will be closed permanently, // and will not deliver any more read events after this point. // (equivalently, it should have called readStop on itself already). // Some streams may be reset and explicitly started again with a call // to readStart, such as TTY. if (nread !== UV_EOF) { // CallJSOnreadMethod expects the return value to be a buffer. // Ref: https://github.com/nodejs/node/pull/34375 stream.destroy(new ErrnoException(nread, 'read')); return; } // Defer this until we actually emit end if (stream._readableState.endEmitted) { if (stream[kMaybeDestroy]) stream[kMaybeDestroy](); } else { if (stream[kMaybeDestroy]) stream.on('end', stream[kMaybeDestroy]); // Push a null to signal the end of data. // Do it before `maybeDestroy` for correct order of events: // `end` -> `close` stream.push(null); stream.read(0); } } function setStreamTimeout(msecs, callback) { if (this.destroyed) return this; this.timeout = msecs; // Type checking identical to timers.enroll() msecs = getTimerDuration(msecs, 'msecs'); // Attempt to clear an existing timer in both cases - // even if it will be rescheduled we don't want to leak an existing timer. clearTimeout(this[kTimeout]); if (msecs === 0) { if (callback !== undefined) { validateFunction(callback, 'callback'); this.removeListener('timeout', callback); } } else { this[kTimeout] = setUnrefTimeout(this._onTimeout.bind(this), msecs); if (this[kSession]) this[kSession][kUpdateTimer](); if (this[kBoundSession]) this[kBoundSession][kUpdateTimer](); if (callback !== undefined) { validateFunction(callback, 'callback'); this.once('timeout', callback); } } return this; } module.exports = { writevGeneric, writeGeneric, onStreamRead, kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer, kHandle, kSession, setStreamTimeout, kBuffer, kBufferCb, kBufferGen, };