diff --git a/balancer/leastrequest/leastrequest.go b/balancer/leastrequest/leastrequest.go index dd46dfa8faa4..f758f954e9f1 100644 --- a/balancer/leastrequest/leastrequest.go +++ b/balancer/leastrequest/leastrequest.go @@ -97,11 +97,8 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba } type leastRequestBalancer struct { - // Embeds balancer.Balancer because needs to intercept UpdateClientConnState - // to learn about choiceCount. - balancer.Balancer - // Embeds balancer.ClientConn because needs to intercept UpdateState calls - // from the child balancer. + // Embeds balancer.ClientConn because we need to intercept UpdateState + // calls from the child balancer. balancer.ClientConn child balancer.Balancer logger *internalgrpclog.PrefixLogger @@ -118,6 +115,21 @@ func (lrb *leastRequestBalancer) Close() { lrb.endpointRPCCounts = nil } +func (lrb *leastRequestBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + lrb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state) +} + +func (lrb *leastRequestBalancer) ResolverError(err error) { + // Will cause inline picker update from endpoint sharding. + lrb.child.ResolverError(err) +} + +func (lrb *leastRequestBalancer) ExitIdle() { + if ei, ok := lrb.child.(balancer.ExitIdler); ok { // Should always be ok, as child is endpoint sharding. + ei.ExitIdle() + } +} + func (lrb *leastRequestBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { lrCfg, ok := ccs.BalancerConfig.(*LBConfig) if !ok { diff --git a/balancer/leastrequest/balancer_test.go b/balancer/leastrequest/leastrequest_test.go similarity index 91% rename from balancer/leastrequest/balancer_test.go rename to balancer/leastrequest/leastrequest_test.go index e0043db391c8..d8073470f79b 100644 --- a/balancer/leastrequest/balancer_test.go +++ b/balancer/leastrequest/leastrequest_test.go @@ -27,12 +27,13 @@ import ( "time" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" testgrpc "google.golang.org/grpc/interop/grpc_testing" testpb "google.golang.org/grpc/interop/grpc_testing" "google.golang.org/grpc/peer" @@ -42,7 +43,8 @@ import ( ) const ( - defaultTestTimeout = 5 * time.Second + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond ) type s struct { @@ -706,3 +708,63 @@ func (s) TestLeastRequestEndpoints_MultipleAddresses(t *testing.T) { t.Fatalf("error in expected round robin: %v", err) } } + +// Test tests that the least request balancer properly surfaces resolver +// errors. +func (s) TestLeastRequestEndpoints_ResolverError(t *testing.T) { + const sc = `{"loadBalancingConfig": [{"least_request_experimental": {}}]}` + mr := manual.NewBuilderWithScheme("lr-e2e") + defer mr.Close() + + cc, err := grpc.NewClient( + mr.Scheme()+":///", + grpc.WithResolvers(mr), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(sc), + ) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + + // We need to pass an endpoint with a valid address to the resolver before + // reporting an error - otherwise endpointsharding does not report the + // error through. + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + // Act like a server that closes the connection without sending a server + // preface. + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("Unexpected error when accepting a connection: %v", err) + } + conn.Close() + }() + mr.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}}, + }) + cc.Connect() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure) + + // Report an error through the resolver + resolverErr := fmt.Errorf("simulated resolver error") + mr.CC().ReportError(resolverErr) + + // Ensure the client returns the expected resolver error. + testServiceClient := testgrpc.NewTestServiceClient(cc) + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + _, err = testServiceClient.EmptyCall(ctx, &testpb.Empty{}) + if strings.Contains(err.Error(), resolverErr.Error()) { + break + } + } + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for RPCs to fail with error containing %s. Last error: %v", resolverErr, err) + } +} diff --git a/internal/resolver/delegatingresolver/delegatingresolver.go b/internal/resolver/delegatingresolver/delegatingresolver.go index 7b93f692be01..20b8fb098ac3 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver.go +++ b/internal/resolver/delegatingresolver/delegatingresolver.go @@ -28,6 +28,8 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/proxyattributes" + "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -40,7 +42,7 @@ var ( // delegatingResolver manages both target URI and proxy address resolution by // delegating these tasks to separate child resolvers. Essentially, it acts as -// a intermediary between the gRPC ClientConn and the child resolvers. +// an intermediary between the gRPC ClientConn and the child resolvers. // // It implements the [resolver.Resolver] interface. type delegatingResolver struct { @@ -48,6 +50,9 @@ type delegatingResolver struct { cc resolver.ClientConn // gRPC ClientConn proxyURL *url.URL // proxy URL, derived from proxy environment and target + // We do not hold both mu and childMu in the same goroutine. Avoid holding + // both locks when calling into the child, as the child resolver may + // synchronously callback into the channel. mu sync.Mutex // protects all the fields below targetResolverState *resolver.State // state of the target resolver proxyAddrs []resolver.Address // resolved proxy addresses; empty if no proxy is configured @@ -66,8 +71,8 @@ func (nopResolver) ResolveNow(resolver.ResolveNowOptions) {} func (nopResolver) Close() {} -// proxyURLForTarget determines the proxy URL for the given address based on -// the environment. It can return the following: +// proxyURLForTarget determines the proxy URL for the given address based on the +// environment. It can return the following: // - nil URL, nil error: No proxy is configured or the address is excluded // using the `NO_PROXY` environment variable or if req.URL.Host is // "localhost" (with or without // a port number) @@ -86,7 +91,8 @@ func proxyURLForTarget(address string) (*url.URL, error) { // resolvers: // - one to resolve the proxy address specified using the supported // environment variables. This uses the registered resolver for the "dns" -// scheme. +// scheme. It is lazily built when a target resolver update contains at least +// one TCP address. // - one to resolve the target URI using the resolver specified by the scheme // in the target URI or specified by the user using the WithResolvers dial // option. As a special case, if the target URI's scheme is "dns" and a @@ -95,8 +101,10 @@ func proxyURLForTarget(address string) (*url.URL, error) { // resolution is enabled using the dial option. func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions, targetResolverBuilder resolver.Builder, targetResolutionEnabled bool) (resolver.Resolver, error) { r := &delegatingResolver{ - target: target, - cc: cc, + target: target, + cc: cc, + proxyResolver: nopResolver{}, + targetResolver: nopResolver{}, } var err error @@ -123,37 +131,26 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti // resolution should be handled by the proxy, not the client. Therefore, we // bypass the target resolver and store the unresolved target address. if target.URL.Scheme == "dns" && !targetResolutionEnabled { - state := resolver.State{ + r.targetResolverState = &resolver.State{ Addresses: []resolver.Address{{Addr: target.Endpoint()}}, Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}}, } - r.targetResolverState = &state - } else { - wcc := &wrappingClientConn{ - stateListener: r.updateTargetResolverState, - parent: r, - } - if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil { - return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err) - } - } - - if r.proxyResolver, err = r.proxyURIResolver(opts); err != nil { - return nil, fmt.Errorf("delegating_resolver: failed to build resolver for proxy URL %q: %v", r.proxyURL, err) + r.updateTargetResolverState(*r.targetResolverState) + return r, nil } - - if r.targetResolver == nil { - r.targetResolver = nopResolver{} + wcc := &wrappingClientConn{ + stateListener: r.updateTargetResolverState, + parent: r, } - if r.proxyResolver == nil { - r.proxyResolver = nopResolver{} + if r.targetResolver, err = targetResolverBuilder.Build(target, wcc, opts); err != nil { + return nil, fmt.Errorf("delegating_resolver: unable to build the resolver for target %s: %v", target, err) } return r, nil } -// proxyURIResolver creates a resolver for resolving proxy URIs using the -// "dns" scheme. It adjusts the proxyURL to conform to the "dns:///" format and -// builds a resolver with a wrappingClientConn to capture resolved addresses. +// proxyURIResolver creates a resolver for resolving proxy URIs using the "dns" +// scheme. It adjusts the proxyURL to conform to the "dns:///" format and builds +// a resolver with a wrappingClientConn to capture resolved addresses. func (r *delegatingResolver) proxyURIResolver(opts resolver.BuildOptions) (resolver.Resolver, error) { proxyBuilder := resolver.Get("dns") if proxyBuilder == nil { @@ -189,18 +186,58 @@ func (r *delegatingResolver) Close() { r.proxyResolver = nil } -// updateClientConnStateLocked creates a list of combined addresses by -// pairing each proxy address with every target address. For each pair, it -// generates a new [resolver.Address] using the proxy address, and adding the -// target address as the attribute along with user info. It returns nil if -// either resolver has not sent update even once and returns the error from -// ClientConn update once both resolvers have sent update atleast once. +func needsProxyResolver(state *resolver.State) bool { + for _, addr := range state.Addresses { + if !skipProxy(addr) { + return true + } + } + for _, endpoint := range state.Endpoints { + for _, addr := range endpoint.Addresses { + if !skipProxy(addr) { + return true + } + } + } + return false +} + +func skipProxy(address resolver.Address) bool { + // Avoid proxy when network is not tcp. + networkType, ok := networktype.Get(address) + if !ok { + networkType, _ = transport.ParseDialTarget(address.Addr) + } + if networkType != "tcp" { + return true + } + + req := &http.Request{URL: &url.URL{ + Scheme: "https", + Host: address.Addr, + }} + // Avoid proxy when address included in `NO_PROXY` environment variable or + // fails to get the proxy address. + url, err := HTTPSProxyFromEnvironment(req) + if err != nil || url == nil { + return true + } + return false +} + +// updateClientConnStateLocked constructs a combined list of addresses by +// pairing each proxy address with every target address of type TCP. For each +// pair, it creates a new [resolver.Address] using the proxy address and +// attaches the corresponding target address and user info as attributes. Target +// addresses that are not of type TCP are appended to the list as-is. The +// function returns nil if either resolver has not yet provided an update, and +// returns the result of ClientConn.UpdateState once both resolvers have +// provided at least one update. func (r *delegatingResolver) updateClientConnStateLocked() error { if r.targetResolverState == nil || r.proxyAddrs == nil { return nil } - curState := *r.targetResolverState // If multiple resolved proxy addresses are present, we send only the // unresolved proxy host and let net.Dial handle the proxy host name // resolution when creating the transport. Sending all resolved addresses @@ -218,24 +255,29 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { } var addresses []resolver.Address for _, targetAddr := range (*r.targetResolverState).Addresses { + if skipProxy(targetAddr) { + addresses = append(addresses, targetAddr) + continue + } addresses = append(addresses, proxyattributes.Set(proxyAddr, proxyattributes.Options{ User: r.proxyURL.User, ConnectAddr: targetAddr.Addr, })) } - // Create a list of combined endpoints by pairing all proxy endpoints - // with every target endpoint. Each time, it constructs a new - // [resolver.Endpoint] using the all addresses from all the proxy endpoint - // and the target addresses from one endpoint. The target address and user - // information from the proxy URL are added as attributes to the proxy - // address.The resulting list of addresses is then grouped into endpoints, - // covering all combinations of proxy and target endpoints. + // For each target endpoint, construct a new [resolver.Endpoint] that + // includes all addresses from all proxy endpoints and the addresses from + // that target endpoint, preserving the number of target endpoints. var endpoints []resolver.Endpoint for _, endpt := range (*r.targetResolverState).Endpoints { var addrs []resolver.Address - for _, proxyAddr := range r.proxyAddrs { - for _, targetAddr := range endpt.Addresses { + for _, targetAddr := range endpt.Addresses { + // Avoid proxy when network is not tcp. + if skipProxy(targetAddr) { + addrs = append(addrs, targetAddr) + continue + } + for _, proxyAddr := range r.proxyAddrs { addrs = append(addrs, proxyattributes.Set(proxyAddr, proxyattributes.Options{ User: r.proxyURL.User, ConnectAddr: targetAddr.Addr, @@ -246,8 +288,9 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { } // Use the targetResolverState for its service config and attributes // contents. The state update is only sent after both the target and proxy - // resolvers have sent their updates, and curState has been updated with - // the combined addresses. + // resolvers have sent their updates, and curState has been updated with the + // combined addresses. + curState := *r.targetResolverState curState.Addresses = addresses curState.Endpoints = endpoints return r.cc.UpdateState(curState) @@ -257,7 +300,8 @@ func (r *delegatingResolver) updateClientConnStateLocked() error { // addresses and endpoints, marking the resolver as ready, and triggering a // state update if both proxy and target resolvers are ready. If the ClientConn // returns a non-nil error, it calls `ResolveNow()` on the target resolver. It -// is a StateListener function of wrappingClientConn passed to the proxy resolver. +// is a StateListener function of wrappingClientConn passed to the proxy +// resolver. func (r *delegatingResolver) updateProxyResolverState(state resolver.State) error { r.mu.Lock() defer r.mu.Unlock() @@ -265,8 +309,8 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro logger.Infof("Addresses received from proxy resolver: %s", state.Addresses) } if len(state.Endpoints) > 0 { - // We expect exactly one address per endpoint because the proxy - // resolver uses "dns" resolution. + // We expect exactly one address per endpoint because the proxy resolver + // uses "dns" resolution. r.proxyAddrs = make([]resolver.Address, 0, len(state.Endpoints)) for _, endpoint := range state.Endpoints { r.proxyAddrs = append(r.proxyAddrs, endpoint.Addresses...) @@ -294,11 +338,14 @@ func (r *delegatingResolver) updateProxyResolverState(state resolver.State) erro return err } -// updateTargetResolverState updates the target resolver state by storing target -// addresses, endpoints, and service config, marking the resolver as ready, and -// triggering a state update if both resolvers are ready. If the ClientConn -// returns a non-nil error, it calls `ResolveNow()` on the proxy resolver. It -// is a StateListener function of wrappingClientConn passed to the target resolver. +// updateTargetResolverState is the StateListener function provided to the +// target resolver via wrappingClientConn. It updates the resolver state and +// marks the target resolver as ready. If the update includes at least one TCP +// address and the proxy resolver has not yet been constructed, it initializes +// the proxy resolver. A combined state update is triggered once both resolvers +// are ready. If all addresses are non-TCP, it proceeds without waiting for the +// proxy resolver. If ClientConn.UpdateState returns a non-nil error, +// ResolveNow() is called on the proxy resolver. func (r *delegatingResolver) updateTargetResolverState(state resolver.State) error { r.mu.Lock() defer r.mu.Unlock() @@ -307,6 +354,32 @@ func (r *delegatingResolver) updateTargetResolverState(state resolver.State) err logger.Infof("Addresses received from target resolver: %v", state.Addresses) } r.targetResolverState = &state + // If all addresses returned by the target resolver have a non-TCP network + // type, or are listed in the `NO_PROXY` environment variable, do not wait + // for proxy update. + if !needsProxyResolver(r.targetResolverState) { + return r.cc.UpdateState(*r.targetResolverState) + } + + // The proxy resolver may be rebuilt multiple times, specifically each time + // the target resolver sends an update, even if the target resolver is built + // successfully but building the proxy resolver fails. + if len(r.proxyAddrs) == 0 { + go func() { + r.childMu.Lock() + defer r.childMu.Unlock() + if _, ok := r.proxyResolver.(nopResolver); !ok { + return + } + proxyResolver, err := r.proxyURIResolver(resolver.BuildOptions{}) + if err != nil { + r.cc.ReportError(fmt.Errorf("delegating_resolver: unable to build the proxy resolver: %v", err)) + return + } + r.proxyResolver = proxyResolver + }() + } + err := r.updateClientConnStateLocked() if err != nil { go func() { @@ -335,7 +408,8 @@ func (wcc *wrappingClientConn) UpdateState(state resolver.State) error { return wcc.stateListener(state) } -// ReportError intercepts errors from the child resolvers and passes them to ClientConn. +// ReportError intercepts errors from the child resolvers and passes them to +// ClientConn. func (wcc *wrappingClientConn) ReportError(err error) { wcc.parent.cc.ReportError(err) } @@ -346,8 +420,8 @@ func (wcc *wrappingClientConn) NewAddress(addrs []resolver.Address) { wcc.UpdateState(resolver.State{Addresses: addrs}) } -// ParseServiceConfig parses the provided service config and returns an -// object that provides the parsed config. +// ParseServiceConfig parses the provided service config and returns an object +// that provides the parsed config. func (wcc *wrappingClientConn) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult { return wcc.parent.cc.ParseServiceConfig(serviceConfigJSON) } diff --git a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go index 148b44816614..c14d420940d9 100644 --- a/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go +++ b/internal/resolver/delegatingresolver/delegatingresolver_ext_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/internal/proxyattributes" "google.golang.org/grpc/internal/resolver/delegatingresolver" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" @@ -130,15 +131,29 @@ func (s) TestDelegatingResolverNoProxyEnvVarsSet(t *testing.T) { // overwriting the previously registered DNS resolver. This allows the test to // mock the DNS resolution for the proxy resolver. It also registers the // original DNS resolver after the test is done. -func setupDNS(t *testing.T) *manual.Resolver { +func setupDNS(t *testing.T) (*manual.Resolver, chan struct{}) { t.Helper() mr := manual.NewBuilderWithScheme("dns") dnsResolverBuilder := resolver.Get("dns") resolver.Register(mr) + resolverBuilt := make(chan struct{}) + mr.BuildCallback = func(resolver.Target, resolver.ClientConn, resolver.BuildOptions) { + close(resolverBuilt) + } + t.Cleanup(func() { resolver.Register(dnsResolverBuilder) }) - return mr + return mr, resolverBuilt +} + +func mustBuildResolver(ctx context.Context, t *testing.T, buildCh chan struct{}) { + t.Helper() + select { + case <-buildCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for resolver to be built.") + } } // proxyAddressWithTargetAttribute creates a resolver.Address for the proxy, @@ -149,6 +164,19 @@ func proxyAddressWithTargetAttribute(proxyAddr string, targetAddr string) resolv return addr } +func overrideTestHTTPSProxy(t *testing.T, proxyAddr string) { + t.Helper() + hpfe := func(req *http.Request) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: proxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + t.Cleanup(func() { delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe }) +} + // Tests the scenario where proxy is configured and the target URI contains the // "dns" scheme and target resolution is enabled. The test verifies that the // addresses returned by the delegating resolver combines the addresses @@ -161,34 +189,24 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) envProxyAddr = "proxytest.com" resolvedProxyTestAddr1 = "11.11.11.11:7687" ) - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == targetTestAddr { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) - return nil, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) + // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("dns") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, true); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, + }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -198,11 +216,13 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, - }, + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -217,8 +237,8 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithTargetResolution(t *testing.T) var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timeed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -237,32 +257,23 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithNoTargetResolution(t *testing. envProxyAddr = "proxytest.com" resolvedProxyTestAddr1 = "11.11.11.11:7687" ) - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == targetTestAddr { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) - return nil, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) targetResolver := manual.NewBuilderWithScheme("dns") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) proxyResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ {Addr: resolvedProxyTestAddr1}, @@ -277,8 +288,8 @@ func (s) TestDelegatingResolverwithDNSAndProxyWithNoTargetResolution(t *testing. var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -298,35 +309,24 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { envProxyAddr = "proxytest.com" resolvedProxyTestAddr1 = "11.11.11.11:7687" ) - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == targetTestAddr { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) - return nil, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, + }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -336,11 +336,13 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ - Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, - }, + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -354,8 +356,8 @@ func (s) TestDelegatingResolverwithCustomResolverAndProxy(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -380,46 +382,19 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { resolvedProxyTestAddr1 = "11.11.11.11:7687" resolvedProxyTestAddr2 = "22.22.22.22:7687" ) - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == targetTestAddr { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) - return nil, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ - Endpoints: []resolver.Endpoint{ - {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}}, - {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr2}}}, - }, - ServiceConfig: &serviceconfig.ParseResult{}, - }) - - select { - case <-stateCh: - t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") - case <-time.After(defaultTestShortTimeout): - } targetResolver.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{ { @@ -435,22 +410,39 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { }, ServiceConfig: &serviceconfig.ParseResult{}, }) + select { + case <-stateCh: + t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") + case <-time.After(defaultTestShortTimeout): + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{ + {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr1}}}, + {Addresses: []resolver.Address{{Addr: resolvedProxyTestAddr2}}}, + }, + ServiceConfig: &serviceconfig.ParseResult{}, + }) wantState := resolver.State{ Endpoints: []resolver.Endpoint{ { Addresses: []resolver.Address{ proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr1), - proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr1), + proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr2), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr2), }, }, { Addresses: []resolver.Address{ proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr3), - proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr3), + proxyAddressWithTargetAttribute(resolvedProxyTestAddr1, resolvedTargetTestAddr4), proxyAddressWithTargetAttribute(resolvedProxyTestAddr2, resolvedTargetTestAddr4), }, }, @@ -460,8 +452,8 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Contex timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -474,7 +466,7 @@ func (s) TestDelegatingResolverForEndpointsWithProxy(t *testing.T) { // The test verifies that the delegating resolver combines unresolved proxy // host and target addresses correctly, returning addresses with the proxy host // populated and the target address included as an attribute. -func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { +func (s) TestDelegatingResolverForMultipleProxyAddress(t *testing.T) { const ( targetTestAddr = "test.com" resolvedTargetTestAddr1 = "1.1.1.1:8080" @@ -483,37 +475,22 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { resolvedProxyTestAddr1 = "11.11.11.11:7687" resolvedProxyTestAddr2 = "22.22.22.22:7687" ) - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == targetTestAddr { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, targetTestAddr) - return nil, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") target := targetResolver.Scheme() + ":///" + targetTestAddr // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) - + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, stateCh, _ := createTestResolverClientConn(t) if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { t.Fatalf("Failed to create delegating resolver: %v", err) } - proxyResolver.UpdateState(resolver.State{ + targetResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ - {Addr: resolvedProxyTestAddr1}, - {Addr: resolvedProxyTestAddr2}, + {Addr: resolvedTargetTestAddr1}, + {Addr: resolvedTargetTestAddr2}, }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -524,10 +501,15 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { case <-time.After(defaultTestShortTimeout): } - targetResolver.UpdateState(resolver.State{ + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ Addresses: []resolver.Address{ - {Addr: resolvedTargetTestAddr1}, - {Addr: resolvedTargetTestAddr2}, + {Addr: resolvedProxyTestAddr1}, + {Addr: resolvedProxyTestAddr2}, }, ServiceConfig: &serviceconfig.ParseResult{}, }) @@ -542,8 +524,8 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { var gotState resolver.State select { case gotState = <-stateCh: - case <-time.After(defaultTestTimeout): - t.Fatal("Timeout when waiting for a state update from the delegating resolver") + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") } if diff := cmp.Diff(gotState, wantState); diff != "" { @@ -560,17 +542,7 @@ func (s) TestDelegatingResolverForMutipleProxyAddress(t *testing.T) { func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { const envProxyAddr = "proxytest.com" - hpfe := func(req *http.Request) (*url.URL, error) { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") @@ -586,9 +558,9 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) - unblockProxyResolverClose := make(chan struct{}) + unblockProxyResolverClose := make(chan struct{}, 1) proxyResolver.CloseCallback = func() { <-unblockProxyResolverClose t.Log("Proxy resolver is closed.") @@ -607,11 +579,13 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) + // Wait for the proxy resolver to be built before calling Close. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + mustBuildResolver(ctx, t, proxyResolverBuilt) // Closing the delegating resolver will block until the test writes to the // unblockProxyResolverClose channel. go dr.Close() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() select { case <-targetResolverCloseCalled: case <-ctx.Done(): @@ -621,9 +595,13 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { // Updating the channel will result in an error being returned. Since the // target resolver's Close method is already called, the delegating resolver // must not call "ResolveNow" on it. - go proxyResolver.UpdateState(resolver.State{ - Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, - }) + proxyUpdateCh := make(chan struct{}) + go func() { + proxyResolver.UpdateState(resolver.State{ + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, + }) + close(proxyUpdateCh) + }() unblockProxyResolverClose <- struct{}{} select { @@ -631,6 +609,15 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { t.Fatalf("targetResolver.ResolveNow() called unexpectedly.") case <-time.After(defaultTestShortTimeout): } + // Wait for the proxy update to complete before returning from the test and + // before the deferred reassignment of + // delegatingresolver.HTTPSProxyFromEnvironment. This ensures that we read + // from the function before it is reassigned, preventing a race condition. + select { + case <-proxyUpdateCh: + case <-ctx.Done(): + t.Fatalf("Context timed out waiting for proxyResolver.UpdateState() to be called.") + } } // Tests that calling cc.UpdateState in a blocking manner from a child resolver @@ -642,17 +629,7 @@ func (s) TestDelegatingResolverUpdateStateDuringClose(t *testing.T) { func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { const envProxyAddr = "proxytest.com" - hpfe := func(req *http.Request) (*url.URL, error) { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") @@ -667,7 +644,7 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) tcc, _, _ := createTestResolverClientConn(t) tcc.UpdateStateF = func(resolver.State) error { @@ -682,14 +659,18 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + // Updating the channel will result in an error being returned. The // delegating resolver should call call "ResolveNow" on the target resolver. proxyResolver.UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() select { case <-targetResolverCalled: case <-ctx.Done(): @@ -702,32 +683,23 @@ func (s) TestDelegatingResolverUpdateStateFromResolveNow(t *testing.T) { func (s) TestDelegatingResolverResolveNow(t *testing.T) { const envProxyAddr = "proxytest.com" - hpfe := func(req *http.Request) (*url.URL, error) { - return &url.URL{ - Scheme: "https", - Host: envProxyAddr, - }, nil - } - originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { - delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe - }() + overrideTestHTTPSProxy(t, envProxyAddr) // Manual resolver to control the target resolution. targetResolver := manual.NewBuilderWithScheme("test") - targetResolverCalled := make(chan struct{}) + targetResolverCalled := make(chan struct{}, 1) targetResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. targetResolver.CC().UpdateState(resolver.State{ Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: "1.1.1.1"}}}}, }) - close(targetResolverCalled) + targetResolverCalled <- struct{}{} } target := targetResolver.Scheme() + ":///" + "ignored" // Set up a manual DNS resolver to control the proxy address resolution. - proxyResolver := setupDNS(t) + proxyResolver, proxyResolverBuilt := setupDNS(t) + proxyResolverCalled := make(chan struct{}) proxyResolver.ResolveNowCallback = func(resolver.ResolveNowOptions) { // Updating the resolver state should not deadlock. @@ -743,12 +715,23 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { t.Fatalf("Failed to create delegating resolver: %v", err) } - // Call ResolveNow on the delegatingResolver and verify both children - // receive the ResolveNow call. + // ResolveNow of manual proxy resolver will not be called. Proxy resolver is + // only built when we get the first update from target resolver. Therefore + // in the first ResolveNow, proxy resolver will be a no-op resolver and only + // target resolver's ResolveNow will be called. dr.ResolveNow(resolver.ResolveNowOptions{}) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + select { + case <-targetResolverCalled: + case <-ctx.Done(): + t.Fatalf("context timed out waiting for targetResolver.ResolveNow() to be called.") + } + + mustBuildResolver(ctx, t, proxyResolverBuilt) + + dr.ResolveNow(resolver.ResolveNowOptions{}) + select { case <-targetResolverCalled: case <-ctx.Done(): @@ -760,3 +743,205 @@ func (s) TestDelegatingResolverResolveNow(t *testing.T) { t.Fatalf("context timed out waiting for proxyResolver.ResolveNow() to be called.") } } + +// Tests the scenario where a proxy is configured, and the resolver returns a +// network type other than tcp for all addresses. The test verifies that the +// delegating resolver avoids the proxy build and directly sends the update +// from target resolver to clientconn. +func (s) TestDelegatingResolverForNonTCPTarget(t *testing.T) { + const ( + targetTestAddr = "test.target" + resolvedTargetTestAddr1 = "1.1.1.1:8080" + envProxyAddr = "proxytest.com" + ) + overrideTestHTTPSProxy(t, envProxyAddr) + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + target := targetResolver.Scheme() + ":///" + targetTestAddr + // Set up a manual DNS resolver to control the proxy address resolution. + _, proxyResolverBuilt := setupDNS(t) + + tcc, stateCh, _ := createTestResolverClientConn(t) + if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + // Set network to anything other than tcp. + nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix") + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{nonTCPAddr}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + var gotState resolver.State + select { + case gotState = <-stateCh: + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for a state update from the delegating resolver") + } + + // Verify that the delegating resolver doesn't call proxy resolver's + // UpdateState since we have no tcp address + select { + case <-proxyResolverBuilt: + t.Fatal("Unexpected call to proxy resolver update state") + case <-time.After(defaultTestShortTimeout): + } + + wantState := resolver.State{ + Addresses: []resolver.Address{nonTCPAddr}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + } + + // Verify that the state clientconn receives is same as updated by target + // resolver, since we want to avoid proxy for any network type apart from + // tcp. + if diff := cmp.Diff(gotState, wantState); diff != "" { + t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%s", diff) + } +} + +// Tests the scenario where a proxy is configured, and the resolver returns tcp +// and non-tcp addresses. The test verifies that the delegating resolver doesn't +// add proxyatrribute to adresses with network type other than tcp, but adds +// the proxyattribute to addresses with network type tcp. +func (s) TestDelegatingResolverForMixNetworkType(t *testing.T) { + const ( + targetTestAddr = "test.target" + resolvedTargetTestAddr1 = "1.1.1.1:8080" + resolvedTargetTestAddr2 = "2.2.2.2:8080" + envProxyAddr = "proxytest.com" + ) + overrideTestHTTPSProxy(t, envProxyAddr) + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + target := targetResolver.Scheme() + ":///" + targetTestAddr + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver, proxyResolverBuilt := setupDNS(t) + + tcc, stateCh, _ := createTestResolverClientConn(t) + if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + // Set network to anything other than tcp. + nonTCPAddr := networktype.Set(resolver.Address{Addr: resolvedTargetTestAddr1}, "unix") + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, {Addr: resolvedTargetTestAddr2}}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + select { + case <-stateCh: + t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") + case <-time.After(defaultTestShortTimeout): + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: envProxyAddr}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + var gotState resolver.State + select { + case gotState = <-stateCh: + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") + } + wantState := resolver.State{ + Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{nonTCPAddr, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr2)}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + } + + if diff := cmp.Diff(gotState, wantState); diff != "" { + t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff) + } +} + +// Tests the scenario where a proxy is configured but some addresses are +// excluded (by using the NO_PROXY environment variable). The test verifies that +// the delegating resolver doesn't add proxyatrribute to adresses excluded, but +// adds the proxyattribute to all other addresses. +func (s) TestDelegatingResolverWithNoProxyEnvUsed(t *testing.T) { + const ( + targetTestAddr = "test.target" + noproxyresolvedTargetAddr = "1.1.1.1:8080" + resolvedTargetTestAddr = "2.2.2.2:8080" + envProxyAddr = "proxytest.com" + ) + hpfe := func(req *http.Request) (*url.URL, error) { + // return nil to mimick the scenario where the address is excluded using + // `NO_PROXY` env variable. + if req.URL.Host == noproxyresolvedTargetAddr { + return nil, nil + } + return &url.URL{ + Scheme: "https", + Host: envProxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + defer func() { + delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe + }() + + // Manual resolver to control the target resolution. + targetResolver := manual.NewBuilderWithScheme("test") + target := targetResolver.Scheme() + ":///" + targetTestAddr + // Set up a manual DNS resolver to control the proxy address resolution. + proxyResolver, proxyResolverBuilt := setupDNS(t) + + tcc, stateCh, _ := createTestResolverClientConn(t) + if _, err := delegatingresolver.New(resolver.Target{URL: *testutils.MustParseURL(target)}, tcc, resolver.BuildOptions{}, targetResolver, false); err != nil { + t.Fatalf("Failed to create delegating resolver: %v", err) + } + + targetResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: noproxyresolvedTargetAddr}, {Addr: resolvedTargetTestAddr}}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: noproxyresolvedTargetAddr}, {Addr: resolvedTargetTestAddr}}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + select { + case <-stateCh: + t.Fatalf("Delegating resolver invoked UpdateState before both the proxy and target resolvers had updated their states.") + case <-time.After(defaultTestShortTimeout): + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Wait for the proxy resolver to be built before calling UpdateState. + mustBuildResolver(ctx, t, proxyResolverBuilt) + proxyResolver.UpdateState(resolver.State{ + Addresses: []resolver.Address{{Addr: envProxyAddr}}, + ServiceConfig: &serviceconfig.ParseResult{}, + }) + + var gotState resolver.State + select { + case gotState = <-stateCh: + case <-ctx.Done(): + t.Fatal("Context timed out when waiting for a state update from the delegating resolver") + } + wantState := resolver.State{ + Addresses: []resolver.Address{{Addr: noproxyresolvedTargetAddr}, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr)}, + Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: noproxyresolvedTargetAddr}, proxyAddressWithTargetAttribute(envProxyAddr, resolvedTargetTestAddr)}}}, + ServiceConfig: &serviceconfig.ParseResult{}, + } + + if diff := cmp.Diff(gotState, wantState); diff != "" { + t.Fatalf("Unexpected state from delegating resolver. Diff (-got +want):\n%v", diff) + } +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ae9316662e9a..171e690a3f22 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -176,7 +176,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error return fn(ctx, address) } if !ok { - networkType, address = parseDialTarget(address) + networkType, address = ParseDialTarget(address) } if opts, present := proxyattributes.Get(addr); present { return proxyDial(ctx, addr, grpcUA, opts) @@ -1242,7 +1242,8 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { statusCode = codes.DeadlineExceeded } } - t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) + st := status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode) + t.closeStream(s, st.Err(), false, http2.ErrCodeNo, st, nil, false) } func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 3613d7b64817..f997f9fdb5d0 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -439,8 +439,8 @@ func getWriteBufferPool(size int) *sync.Pool { return pool } -// parseDialTarget returns the network and address to pass to dialer. -func parseDialTarget(target string) (string, string) { +// ParseDialTarget returns the network and address to pass to dialer. +func ParseDialTarget(target string) (string, string) { net := "tcp" m1 := strings.Index(target, ":") m2 := strings.Index(target, ":/") diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index 5a259d43cdc2..5eb466f38561 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -211,9 +211,9 @@ func (s) TestParseDialTarget(t *testing.T) { {"dns:///google.com", "tcp", "dns:///google.com"}, {"/unix/socket/address", "tcp", "/unix/socket/address"}, } { - gotNet, gotAddr := parseDialTarget(test.target) + gotNet, gotAddr := ParseDialTarget(test.target) if gotNet != test.wantNet || gotAddr != test.wantAddr { - t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) + t.Errorf("ParseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) } } } diff --git a/internal/transport/proxy_ext_test.go b/internal/transport/proxy_ext_test.go index 4f5ece1e81a2..6359796ffb12 100644 --- a/internal/transport/proxy_ext_test.go +++ b/internal/transport/proxy_ext_test.go @@ -71,6 +71,19 @@ func isIPAddr(addr string) bool { return err == nil } +func overrideTestHTTPSProxy(t *testing.T, proxyAddr string) { + t.Helper() + hpfe := func(req *http.Request) (*url.URL, error) { + return &url.URL{ + Scheme: "https", + Host: proxyAddr, + }, nil + } + originalhpfe := delegatingresolver.HTTPSProxyFromEnvironment + delegatingresolver.HTTPSProxyFromEnvironment = hpfe + t.Cleanup(func() { delegatingresolver.HTTPSProxyFromEnvironment = originalhpfe }) +} + // Tests the scenario where grpc.Dial is performed using a proxy with the // default resolver in the target URI. The test verifies that the connection is // established to the proxy server, sends the unresolved target URI in the HTTP @@ -86,7 +99,7 @@ func (s) TestGRPCDialWithProxy(t *testing.T) { t.Error(err) } if got, want := host, "localhost"; got != want { - t.Errorf(" Unexpected request host: %s , want = %s ", got, want) + t.Errorf(" Unexpected request host: %s, want = %s ", got, want) } } pServer := proxyserver.New(t, reqCheck, false) @@ -95,20 +108,7 @@ func (s) TestGRPCDialWithProxy(t *testing.T) { // correctly even when unresolved. pAddr := fmt.Sprintf("localhost:%d", testutils.ParsePort(t, pServer.Addr)) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pAddr) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -152,20 +152,7 @@ func (s) TestGRPCDialWithDNSAndProxy(t *testing.T) { } pServer := proxyserver.New(t, reqCheck, false) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pServer.Addr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pServer.Addr) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -202,7 +189,7 @@ func (s) TestNewClientWithProxy(t *testing.T) { t.Error(err) } if got, want := host, "localhost"; got != want { - t.Errorf(" Unexpected request host: %s , want = %s ", got, want) + t.Errorf(" Unexpected request host: %s, want = %s ", got, want) } } pServer := proxyserver.New(t, reqCheck, false) @@ -211,20 +198,7 @@ func (s) TestNewClientWithProxy(t *testing.T) { // correctly even when unresolved. pAddr := fmt.Sprintf("localhost:%d", testutils.ParsePort(t, pServer.Addr)) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pAddr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pAddr) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -265,20 +239,7 @@ func (s) TestNewClientWithProxyAndCustomResolver(t *testing.T) { } pServer := proxyserver.New(t, reqCheck, false) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pServer.Addr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pServer.Addr) // Create and update a custom resolver for target URI. targetResolver := manual.NewBuilderWithScheme("test") @@ -327,20 +288,7 @@ func (s) TestNewClientWithProxyAndTargetResolutionEnabled(t *testing.T) { } pServer := proxyserver.New(t, reqCheck, false) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pServer.Addr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pServer.Addr) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -372,20 +320,7 @@ func (s) TestNewClientWithNoProxy(t *testing.T) { reqCheck := func(_ *http.Request) { t.Error("proxy server should not have received a Connect request") } pServer := proxyserver.New(t, reqCheck, false) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pServer.Addr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pServer.Addr) dopts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -417,20 +352,7 @@ func (s) TestNewClientWithContextDialer(t *testing.T) { reqCheck := func(_ *http.Request) { t.Error("proxy server should not have received a Connect request") } pServer := proxyserver.New(t, reqCheck, false) - // Overwrite the function in the test and restore them in defer. - hpfe := func(req *http.Request) (*url.URL, error) { - if req.URL.Host == unresolvedTargetURI { - return &url.URL{ - Scheme: "https", - Host: pServer.Addr, - }, nil - } - t.Errorf("Unexpected request host to proxy: %s want %s", req.URL.Host, unresolvedTargetURI) - return nil, nil - } - orighpfe := delegatingresolver.HTTPSProxyFromEnvironment - delegatingresolver.HTTPSProxyFromEnvironment = hpfe - defer func() { delegatingresolver.HTTPSProxyFromEnvironment = orighpfe }() + overrideTestHTTPSProxy(t, pServer.Addr) // Create a custom dialer that directly dials the backend. customDialer := func(_ context.Context, unresolvedTargetURI string) (net.Conn, error) { @@ -472,7 +394,7 @@ func (s) TestBasicAuthInNewClientWithProxy(t *testing.T) { reqCheck := func(req *http.Request) { proxyCalled = true if got, want := req.URL.Host, "example.test"; got != want { - t.Errorf(" Unexpected request host: %s , want = %s ", got, want) + t.Errorf(" Unexpected request host: %s, want = %s ", got, want) } wantProxyAuthStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+password)) if got := req.Header.Get("Proxy-Authorization"); got != wantProxyAuthStr { diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 8b1219597912..d2c4fac3443c 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -919,8 +919,9 @@ func (s) TestLargeMessageSuspension(t *testing.T) { } // The server will send an RST stream frame on observing the deadline // expiration making the client stream fail with a DeadlineExceeded status. - if _, err := s.readTo(make([]byte, 8)); err != io.EOF { - t.Fatalf("Read got unexpected error: %v, want %v", err, io.EOF) + _, err = s.readTo(make([]byte, 8)) + if st, ok := status.FromError(err); !ok || st.Code() != codes.DeadlineExceeded { + t.Fatalf("Read got unexpected error: %v, want status with code %v", err, codes.DeadlineExceeded) } if got, want := s.Status().Code(), codes.DeadlineExceeded; got != want { t.Fatalf("Read got status %v with code %v, want %v", s.Status(), got, want) diff --git a/test/transport_test.go b/test/transport_test.go index 2f61a6c03729..10e9ab57de30 100644 --- a/test/transport_test.go +++ b/test/transport_test.go @@ -19,16 +19,20 @@ package test import ( "context" + "encoding/binary" "io" "net" "sync" "testing" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/status" @@ -153,3 +157,75 @@ func (s) TestClientTransportRestartsAfterStreamIDExhausted(t *testing.T) { t.Fatal("Timeout expired when waiting for first client transport to close") } } + +// Tests that an RST_STREAM frame that causes an io.ErrUnexpectedEOF while +// reading a gRPC message is correctly converted to a gRPC status with code +// CANCELLED. The test sends a data frame with a partial gRPC message, followed +// by an RST_STREAM frame with HTTP/2 code CANCELLED. The test asserts the +// client receives the correct status. +func (s) TestRSTDuringMessageRead(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%s) = %v", lis.Addr().String(), err) + } + defer cc.Close() + + go func() { + conn, err := lis.Accept() + if err != nil { + t.Errorf("lis.Accept() = %v", err) + return + } + defer conn.Close() + framer := http2.NewFramer(conn, conn) + + if _, err := io.ReadFull(conn, make([]byte, len(clientPreface))); err != nil { + t.Errorf("Error while reading client preface: %v", err) + return + } + if err := framer.WriteSettings(); err != nil { + t.Errorf("Error while writing settings: %v", err) + return + } + if err := framer.WriteSettingsAck(); err != nil { + t.Errorf("Error while writing settings: %v", err) + return + } + for ctx.Err() == nil { + frame, err := framer.ReadFrame() + if err != nil { + return + } + switch frame := frame.(type) { + case *http2.HeadersFrame: + // When the client creates a stream, write a partial gRPC + // message followed by an RST_STREAM. + const messageLen = 2048 + buf := make([]byte, messageLen/2) + // Write the gRPC message length header. + binary.BigEndian.PutUint32(buf[1:5], uint32(messageLen)) + if err := framer.WriteData(1, false, buf); err != nil { + return + } + framer.WriteRSTStream(1, http2.ErrCodeCancel) + default: + t.Logf("Server received frame: %v", frame) + } + } + }() + + // The server will send a partial gRPC message before cancelling the stream. + // The client should get a gRPC status with code CANCELLED. + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled { + t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled) + } +} diff --git a/version.go b/version.go index 90237b1dbbd9..2bae4db890dc 100644 --- a/version.go +++ b/version.go @@ -19,4 +19,4 @@ package grpc // Version is the current grpc version. -const Version = "1.72.0" +const Version = "1.72.2"