errc:
		return pingError
	case <-ctx.Done():
		return ctx.Err()
	case <-cc.readerDone:
		// connection closed
		return cc.readerErr
	}
}

// processPing handles an incoming PING frame.
//
// For a PING ack it wakes any waiter registered in cc.pings and, if RST_STREAM
// writes were being throttled behind a PING round-trip, unblocks them.
// For a non-ack PING it writes the required PING ack back to the peer.
func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
	if f.IsAck() {
		cc := rl.cc
		defer cc.maybeCallStateHook() // runs after cc.mu is released
		cc.mu.Lock()
		defer cc.mu.Unlock()
		// If ack, notify listener if any
		if c, ok := cc.pings[f.Data]; ok {
			close(c)
			delete(cc.pings, f.Data)
		}
		if cc.pendingResets > 0 {
			// See clientStream.cleanupWriteRequest.
			cc.pendingResets = 0
			cc.rstStreamPingsBlocked = true
			cc.cond.Broadcast()
		}
		return nil
	}
	cc := rl.cc
	// Not an ack: respond with a PING ack carrying the same payload.
	cc.wmu.Lock()
	defer cc.wmu.Unlock()
	if err := cc.fr.WritePing(true, f.Data); err != nil {
		return err
	}
	return cc.bw.Flush()
}

// processPing handles an incoming PUSH_PROMISE frame, which is always a
// protocol error for this client since push is disabled via SETTINGS.
func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
	// We told the peer we don't want them.
	// Spec says:
	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
	// setting of the peer endpoint is set to 0. An endpoint that
	// has set this setting and has received acknowledgement MUST
	// treat the receipt of a PUSH_PROMISE frame as a connection
	// error (Section 5.4.1) of type PROTOCOL_ERROR."
	return ConnectionError(ErrCodeProtocol)
}

// writeStreamReset sends a RST_STREAM frame.
// When ping is true, it also sends a PING frame with a random payload.
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
	// TODO: map err to more interesting error codes, once the
	// HTTP community comes up with some. But currently for
	// RST_STREAM there's no equivalent to GOAWAY frame's debug
	// data, and the error codes are all pretty vague ("cancel").
	cc.wmu.Lock()
	// Write errors are deliberately ignored here; a broken connection is
	// presumably detected elsewhere (e.g. by the read loop) — NOTE(review).
	cc.fr.WriteRSTStream(streamID, code)
	if ping {
		var payload [8]byte
		rand.Read(payload[:])
		cc.fr.WritePing(false, payload)
	}
	cc.bw.Flush()
	cc.wmu.Unlock()
}

var (
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	errRequestHeaderListSize  = httpcommon.ErrRequestHeaderListSize
)

// logf logs via the connection's Transport.
func (cc *ClientConn) logf(format string, args ...any) {
	cc.t.logf(format, args...)
}

// vlogf logs via the connection's Transport when VerboseLogs is set.
func (cc *ClientConn) vlogf(format string, args ...any) {
	cc.t.vlogf(format, args...)
}

func (t *Transport) vlogf(format string, args ...any) {
	if VerboseLogs {
		t.logf(format, args...)
	}
}

func (t *Transport) logf(format string, args ...any) {
	log.Printf(format, args...)
}

// noBody is an io.ReadCloser whose reads immediately report io.EOF.
var noBody io.ReadCloser = noBodyReader{}

type noBodyReader struct{}

func (noBodyReader) Close() error             { return nil }
func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }

// missingBody is a body placeholder whose reads fail with io.ErrUnexpectedEOF.
type missingBody struct{}

func (missingBody) Close() error             { return nil }
func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }

// strSliceContains reports whether s appears in ss.
func strSliceContains(ss []string, s string) bool {
	for _, v := range ss {
		if v == s {
			return true
		}
	}
	return false
}

// erringRoundTripper is a RoundTripper that always fails with a fixed error.
type erringRoundTripper struct{ err error }

func (rt erringRoundTripper) RoundTripErr() error { return rt.err }

func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }

var errConcurrentReadOnResBody = errors.New("http2: concurrent read on response body")

// gzipReader wraps a response body so it can lazily
// get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct {
	_    incomparable
	body io.ReadCloser // underlying Response.Body
	mu   sync.Mutex    // guards zr and zerr
	zr   *gzip.Reader  // stores gzip reader from the pool between reads
	zerr error         // sticky gzip reader init error or sentinel value to detect concurrent read and read after close
}

// eofReader is an empty flate.Reader (it implements both Read and ReadByte),
// used to park pooled gzip.Readers without allocating a bufio.Reader.
type eofReader struct{}

func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error)  { return 0, io.EOF }

// gzipPool holds idle gzip.Readers for reuse across response bodies.
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}

// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
// On a Reset failure the reader is returned to the pool and the error reported.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
	zr := gzipPool.Get().(*gzip.Reader)
	if err := zr.Reset(r); err != nil {
		gzipPoolPut(zr)
		return nil, err
	}
	return zr, nil
}

// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
	// Reset will allocate bufio.Reader if we pass it anything
	// other than a flate.Reader, so ensure that it's getting one.
	var r flate.Reader = eofReader{}
	zr.Reset(r)
	gzipPool.Put(zr)
}

// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
//
// While a Read is in flight, zerr holds the errConcurrentReadOnResBody
// sentinel (and zr is nil), so a second concurrent Read — or a Read after
// close — fails fast instead of sharing the reader.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr != nil {
		// Sticky init error, concurrent-read sentinel, or fs.ErrClosed.
		return nil, gz.zerr
	}
	if gz.zr == nil {
		// First Read: lazily fetch a pooled reader for the body.
		gz.zr, gz.zerr = gzipPoolGet(gz.body)
		if gz.zerr != nil {
			return nil, gz.zerr
		}
	}
	ret := gz.zr
	gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
	return ret, nil
}

// release returns the gzip.Reader to the pool if Close was called during Read.
// Otherwise it stores the reader back for the next Read and clears the sentinel.
func (gz *gzipReader) release(zr *gzip.Reader) {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == errConcurrentReadOnResBody {
		gz.zr, gz.zerr = zr, nil
	} else { // fs.ErrClosed
		gzipPoolPut(zr)
	}
}

// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
// close returns an idle gzip.Reader to the pool and marks the gzipReader
// closed by setting zerr to fs.ErrClosed. If a Read is in flight
// (zr is nil and zerr holds the concurrent-read sentinel), release
// will see fs.ErrClosed and return the reader to the pool instead.
func (gz *gzipReader) close() {
	gz.mu.Lock()
	defer gz.mu.Unlock()
	if gz.zerr == nil && gz.zr != nil {
		gzipPoolPut(gz.zr)
		gz.zr = nil
	}
	gz.zerr = fs.ErrClosed
}

// Read decompresses from the underlying body, lazily acquiring a pooled
// gzip.Reader on first use and releasing it when the call completes.
func (gz *gzipReader) Read(p []byte) (n int, err error) {
	zr, err := gz.acquire()
	if err != nil {
		return 0, err
	}
	defer gz.release(zr)
	return zr.Read(p)
}

// Close releases the pooled gzip.Reader (immediately or after an in-flight
// Read finishes) and closes the underlying response body.
func (gz *gzipReader) Close() error {
	gz.close()
	return gz.body.Close()
}

// errorReader is an io.Reader whose reads always fail with a fixed error.
type errorReader struct{ err error }

func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }

// isConnectionCloseRequest reports whether req should use its own
// connection for a single request and then close the connection.
func isConnectionCloseRequest(req *http.Request) bool {
	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
}

// registerHTTPSProtocol calls Transport.RegisterProtocol but
// converting panics into errors.
func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
	defer func() {
		// RegisterProtocol panics on duplicate registration; surface
		// that as an error instead of crashing.
		if e := recover(); e != nil {
			err = fmt.Errorf("%v", e)
		}
	}()
	t.RegisterProtocol("https", rt)
	return nil
}

// noDialH2RoundTripper is a RoundTripper which only tries to complete the request
// if there's already a cached connection to the host.
// (The field is exported so it can be accessed via reflect from net/http; tested
// by TestNoDialH2RoundTripperType)
//
// A noDialH2RoundTripper is registered with http1.Transport.RegisterProtocol,
// and the http1.Transport can use type assertions to call non-RoundTrip methods on it.
// This lets us expose, for example, NewClientConn to net/http.
type noDialH2RoundTripper struct{ *Transport } func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { res, err := rt.Transport.RoundTrip(req) if isNoCachedConnError(err) { return nil, http.ErrSkipAltProtocol } return res, err } func (rt noDialH2RoundTripper) NewClientConn(conn net.Conn, internalStateHook func()) (http.RoundTripper, error) { tr := rt.Transport cc, err := tr.newClientConn(conn, tr.disableKeepAlives(), internalStateHook) if err != nil { return nil, err } // RoundTrip should block when the conn is at its concurrency limit, // not return an error. Setting strictMaxConcurrentStreams enables this. cc.strictMaxConcurrentStreams = true return netHTTPClientConn{cc}, nil } // netHTTPClientConn wraps ClientConn and implements the interface net/http expects from // the RoundTripper returned by NewClientConn. type netHTTPClientConn struct { cc *ClientConn } func (cc netHTTPClientConn) RoundTrip(req *http.Request) (*http.Response, error) { return cc.cc.RoundTrip(req) } func (cc netHTTPClientConn) Close() error { return cc.cc.Close() } func (cc netHTTPClientConn) Err() error { cc.cc.mu.Lock() defer cc.cc.mu.Unlock() if cc.cc.closed { return errors.New("connection closed") } return nil } func (cc netHTTPClientConn) Reserve() error { defer cc.cc.maybeCallStateHook() cc.cc.mu.Lock() defer cc.cc.mu.Unlock() if !cc.cc.canReserveLocked() { return errors.New("connection is unavailable") } cc.cc.streamsReserved++ return nil } func (cc netHTTPClientConn) Release() { defer cc.cc.maybeCallStateHook() cc.cc.mu.Lock() defer cc.cc.mu.Unlock() // We don't complain if streamsReserved is 0. // // This is consistent with RoundTrip: both Release and RoundTrip will // consume a reservation iff one exists. 
if cc.cc.streamsReserved > 0 { cc.cc.streamsReserved-- } } func (cc netHTTPClientConn) Available() int { cc.cc.mu.Lock() defer cc.cc.mu.Unlock() return cc.cc.availableLocked() } func (cc netHTTPClientConn) InFlight() int { cc.cc.mu.Lock() defer cc.cc.mu.Unlock() return cc.cc.currentRequestCountLocked() } func (cc *ClientConn) maybeCallStateHook() { if cc.internalStateHook != nil { cc.internalStateHook() } } func (t *Transport) idleConnTimeout() time.Duration { // to keep things backwards compatible, we use non-zero values of // IdleConnTimeout, followed by using the IdleConnTimeout on the underlying // http1 transport, followed by 0 if t.IdleConnTimeout != 0 { return t.IdleConnTimeout } if t.t1 != nil { return t.t1.IdleConnTimeout } return 0 } func traceGetConn(req *http.Request, hostPort string) { trace := httptrace.ContextClientTrace(req.Context()) if trace == nil || trace.GetConn == nil { return } trace.GetConn(hostPort) } func traceGotConn(req *http.Request, cc *ClientConn, reused bool) { trace := httptrace.ContextClientTrace(req.Context()) if trace == nil || trace.GotConn == nil { return } ci := httptrace.GotConnInfo{Conn: cc.tconn} ci.Reused = reused cc.mu.Lock() ci.WasIdle = len(cc.streams) == 0 && reused if ci.WasIdle && !cc.lastActive.IsZero() { ci.IdleTime = time.Since(cc.lastActive) } cc.mu.Unlock() trace.GotConn(ci) } func traceWroteHeaders(trace *httptrace.ClientTrace) { if trace != nil && trace.WroteHeaders != nil { trace.WroteHeaders() } } func traceGot100Continue(trace *httptrace.ClientTrace) { if trace != nil && trace.Got100Continue != nil { trace.Got100Continue() } } func traceWait100Continue(trace *httptrace.ClientTrace) { if trace != nil && trace.Wait100Continue != nil { trace.Wait100Continue() } } func traceWroteRequest(trace *httptrace.ClientTrace, err error) { if trace != nil && trace.WroteRequest != nil { trace.WroteRequest(httptrace.WroteRequestInfo{Err: err}) } } func traceFirstResponseByte(trace *httptrace.ClientTrace) { if trace != nil 
&& trace.GotFirstResponseByte != nil { trace.GotFirstResponseByte() } } func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error { if trace != nil { return trace.Got1xxResponse } return nil } // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS // connection. func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) { dialer := &tls.Dialer{ Config: cfg, } cn, err := dialer.DialContext(ctx, network, addr) if err != nil { return nil, err } tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed return tlsCn, nil }