8000 http2: add SETTINGS_ENABLE_CONNECT_PROTOCOL support by WeidiDeng · Pull Request #221 · golang/net · GitHub
[go: up one dir, main page]

Skip to content

http2: add SETTINGS_ENABLE_CONNECT_PROTOCOL support #221

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
http2: client side SETTINGS_ENABLE_CONNECT_PROTOCOL support
  • Loading branch information
WeidiDeng committed Sep 27, 2024
commit 4e0170fe1f5a5900d0df08ca5d33d29b02a6c4f0
4 changes: 4 additions & 0 deletions http2/http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ func (s Setting) Valid() error {
if s.Val < 16384 || s.Val > 1<<24-1 {
return ConnectionError(ErrCodeProtocol)
}
case SettingEnableConnectProtocol:
if s.Val != 1 && s.Val != 0 {
return ConnectionError(ErrCodeProtocol)
}
}
return nil
}
Expand Down
79 changes: 57 additions & 22 deletions http2/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,25 +335,26 @@ type ClientConn struct {
idleTimeout time.Duration // or 0 for never
idleTimer timer

mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
inflow inflow // peer's conn-level flow control
doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
seenSettings bool // true if we've seen a settings frame, false otherwise
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
br *bufio.Reader
lastActive time.Time
lastIdle time.Time // time last idle
mu sync.Mutex // guards following
cond *sync.Cond // hold mu; broadcast on flow/closed changes
flow outflow // our conn-level flow control quota (cs.outflow is per stream)
inflow inflow // peer's conn-level flow control
doNotReuse bool // whether conn is marked to not be reused for any future requests
closing bool
closed bool
seenSettings bool // true if we've seen a settings frame, false otherwise
seenSettingsChan chan struct{} // closed when seenSettings is true or frame reading fails
wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
goAwayDebug string // goAway frame's debug data, retained as a string
streams map[uint32]*clientStream // client-initiated
streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
nextStreamID uint32
pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
pings map[[8]byte]chan struct{} // in flight ping data to notification channel
br *bufio.Reader
lastActive time.Time
lastIdle time.Time // time last idle
// Settings from peer: (also guarded by wmu)
maxFrameSize uint32
maxConcurrentStreams uint32
Expand All @@ -363,6 +364,7 @@ type ClientConn struct {
initialStreamRecvWindowSize int32
readIdleTimeout time.Duration
pingTimeout time.Duration
extendedConnecAllowed bool

// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
// Write to reqHeaderMu to lock it, read from it to unlock.
Expand Down Expand Up @@ -752,6 +754,7 @@ func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, erro
peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*clientStream),
singleUse: singleUse,
seenSettingsChan: make(chan struct{}),
wantSettingsAck: true,
readIdleTimeout: conf.SendPingTimeout,
pingTimeout: conf.PingTimeout,
Expand Down Expand Up @@ -1376,6 +1379,8 @@ func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)
cs.cleanupWriteRequest(err)
}

var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")

// writeRequest sends a request.
//
// It returns nil after the request is written, the response read,
Expand Down 8000 Expand Up @@ -1405,7 +1410,20 @@ func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStre
return ctx.Err()
}

// wait for setting frames to be received, a server can change this value later,
// but we just wait for the first settings frame
var isExtendedConnect bool
if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
isExtendedConnect = true
<-cc.seenSettingsChan
}

cc.mu.Lock()
if isExtendedConnect && !cc.extendedConnecAllowed {
cc.mu.Unlock()
<-cc.reqHeaderMu
return errExtendedConnectNotSupported
}
if cc.idleTimer != nil {
cc.idleTimer.Stop()
}
Expand Down Expand Up @@ -1910,7 +1928,7 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)

func validateHeaders(hdrs http.Header) string {
for k, vv := range hdrs {
if !httpguts.ValidHeaderFieldName(k) {
if !httpguts.ValidHeaderFieldName(k) && k != ":protocol" {
return fmt.Sprintf("name %q", k)
}
for _, v := range vv {
Expand All @@ -1926,6 +1944,10 @@ func validateHeaders(hdrs http.Header) string {

var errNilRequestURL = errors.New("http2: Request.URI is nil")

func isNormalConnect(req *http.Request) bool {
return req.Method == "CONNECT" && req.Header.Get(":protocol") == ""
}

// requires cc.wmu be held.
func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
cc.hbuf.Reset()
Expand All @@ -1946,7 +1968,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
}

var path string
if req.Method != "CONNECT" {
if !isNormalConnect(req) {
path = req.URL.RequestURI()
if !validPseudoPath(path) {
orig := path
Expand Down Expand Up @@ -1983,7 +2005,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
m = http.MethodGet
}
f(":method", m)
if req.Method != "CONNECT" {
if !isNormalConnect(req) {
f(":path", path)
f(":scheme", req.URL.Scheme)
}
Expand Down Expand Up @@ -2370,6 +2392,9 @@ func (rl *clientConnReadLoop) run() error {
if VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
}
if !cc.seenSettings {
close(cc.seenSettingsChan)
}
return err
}
}
Expand Down Expand Up @@ -2917,6 +2942,15 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
case SettingHeaderTableSize:
cc.henc.SetMaxDynamicTableSize(s.Val)
cc.peerMaxHeaderTableSize = s.Val
case SettingEnableConnectProtocol:
if err := s.Valid(); err != nil {
return err
}
// RFC 8441 section, https://datatracker.ietf.org/doc/html/rfc8441#section-3
if s.Val == 0 && cc.extendedConnecAllowed {
return ConnectionError(ErrCodeProtocol)
}
cc.extendedConnecAllowed = s.Val == 1
default:
cc.vlogf("Unhandled Setting: %v", s)
}
Expand All @@ -2934,6 +2968,7 @@ func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
// connection can establish to our default.
cc.maxConcurrentStreams = defaultMaxConcurrentStreams
}
close(cc.seenSettingsChan)
cc.seenSettings = true
}

Expand Down
59 changes: 59 additions & 0 deletions http2/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5421,3 +5421,62 @@ func TestIssue67671(t *testing.T) {
res.Body.Close()
}
}

func TestExtendedConnectClientWithServerSupport(t *testing.T) {
disableExtendedConnectProtocol = false
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
t.Log(io.Copy(w, r.Body))
})
tr := &Transport{
TLSClientConfig: tlsConfigInsecure,
AllowHTTP: true,
}
defer tr.CloseIdleConnections()
pr, pw := io.Pipe()
pwDone := make(chan struct{})
req, _ := http.NewRequest("CONNECT", ts.URL, pr)
req.Header.Set(":protocol", "extended-connect")
go func() {
pw.Write([]byte("hello, extended connect"))
pw.Close()
close(pwDone)
}()

res, err := tr.RoundTrip(req)
if err != nil {
t.Fatal(err)
}
body, err := io.ReadAll(res.Body)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(body, []byte("hello, extended connect")) {
t.Fatal("unexpected body received")
}
}

func TestExtendedConnectClientWithoutServerSupport(t *testing.T) {
disableExtendedConnectProtocol = true
ts := newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
io.Copy(w, r.Body)
})
tr := &Transport{
TLSClientConfig: tlsConfigInsecure,
AllowHTTP: true,
}
defer tr.CloseIdleConnections()
pr, pw := io.Pipe()
pwDone := make(chan struct{})
req, _ := http.NewRequest("CONNECT", ts.URL, pr)
req.Header.Set(":protocol", "extended-connect")
go func() {
pw.Write([]byte("hello, extended connect"))
pw.Close()
close(pwDone)
}()

_, err := tr.RoundTrip(req)
if !errors.Is(err, errExtendedConnectNotSupported) {
t.Fatalf("expected error errExtendedConnectNotSupported, got: %v", err)
}
}
0