gRPC goroutine leak
Published on 2024-10-15
gRPC ate up my memory

Our company recently asked us to migrate all environment variables to a config center. Everything was fine until we started running some tests.

The grpc-go version used here is 1.63.2.

Weird things happen

When we changed the address config from 123.123.123.123:8000 (the correct address) to a wrong one (say 123.123.123.123:8001), the service deployed on k8s restarted while the pod itself stayed alive. And this did not happen every time.

So I used the following command to inspect the pod:

kubectl get pods {pod_name} -o yaml

It was OOM-killed. Well, not surprised.


Intermission

To summarize:

  • memory leaks when Dial is given the wrong address
  • memory does not leak when Dial is given the correct address
  • heap profiling looks normal (the pod is assigned 4 GB of memory and the heap only uses about 600 MB)


The only remaining suspect is a goroutine leak.

Profiling

curl 'localhost:15213/debug/pprof/goroutine?debug=1'

When memory consumption reached 3 GB, we got the following profile:

goroutine profile: total 329323
246972 @ 0x4404ce 0x4509e5 0x7fb995 0x473ba1
#       0x7fb994        google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run+0x114        /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/internal/grpcsync/callback_serializer.go:76

82324 @ 0x4404ce 0x4509e5 0x8b6414 0x8b4479 0x473ba1
#       0x8b6413        google.golang.org/grpc.(*addrConn).resetTransport+0x3d3 /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/clientconn.go:1264
#       0x8b4478        google.golang.org/grpc.(*addrConn).connect+0x98         /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/clientconn.go:913

...

$$ 300{,}000 \times 8\,\text{KB} + 600\,\text{MB} \approx 2.4\,\text{GB} + 0.6\,\text{GB} = 3\,\text{GB} $$

Assuming each goroutine costs roughly 8 KB (stack plus runtime bookkeeping), the numbers add up, which supports my guess.

Source

// google.golang.org/grpc@v1.63.2/internal/grpcsync/callback_serializer.go:76

func (cs *CallbackSerializer) run(ctx context.Context) {
	defer close(cs.done)

	// TODO: when Go 1.21 is the oldest supported version, this loop and Close
	// can be replaced with:
	//
	// context.AfterFunc(ctx, cs.callbacks.Close)
	for ctx.Err() == nil {
		select { // <- HERE IS LINE 76!!!
		case <-ctx.Done():
			// Do nothing here. Next iteration of the for loop will not happen,
			// since ctx.Err() would be non-nil.
		case cb := <-cs.callbacks.Get():
			cs.callbacks.Load()
			cb.(func(context.Context))(ctx)
		}
	}

	// Close the buffer to prevent new callbacks from being added.
	cs.callbacks.Close()

	// Run all pending callbacks.
	for cb := range cs.callbacks.Get() {
		cs.callbacks.Load()
		cb.(func(context.Context))(ctx)
	}
}

It seems that some conn's context is never canceled, and we did not introduce any circular references ourselves. That leaves two cases:

  • some conn is created but never closed
  • the cb := <-cs.callbacks.Get() branch is to blame

Analysis

Second case

Dump the full stack of every goroutine with debug=2:

curl 'localhost:15213/debug/pprof/goroutine?debug=2'

The dump shows that the run() goroutines are created by NewCallbackSerializer:

// https://github.com/grpc/grpc-go/blob/v1.63.2/internal/grpcsync/callback_serializer.go#L47

func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
	cs := &CallbackSerializer{
		done:      make(chan struct{}),
		callbacks: buffer.NewUnbounded(),
	}
	go cs.run(ctx) // <- the goroutine parks inside run()
	return cs
}

// https://github.com/grpc/grpc-go/blob/master/internal/buffer/unbounded.go#L48

type Unbounded struct {
	c       chan any
	closed  bool
	closing bool
	mu      sync.Mutex
	backlog []any
}

func NewUnbounded() *Unbounded {
	return &Unbounded{c: make(chan any, 1)}
}

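NewCallbackSerializer just starts run() in a goroutine, which then parks in the select at line 76 until a callback arrives or ctx is canceled. If a callback were stuck, its frame would show up on top of run() in the stack. So we can rule out the second case (cb := <-cs.callbacks.Get()).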

First case

This is the basic skeleton of my colleague's code, which implements a streaming request with retries:

	for i := 0; i < MaxRetry; i++ {
		conn, err = grpc.Dial(target, opts...)
		if err != nil {
			continue
		}
		client := pb.NewGRPCInferenceServiceClient(conn)
		stream, err = client.ModelStreamInfer(ctx, req)
		if err == nil {
			break
		}
	}

At first sight, it looks fine:

  • if the address is wrong, Dial returns an error and conn is nil
  • the loop simply retries MaxRetry times, and that's it.

However, after adding some logs, it turned out that Dial succeeded even with the wrong address. So every failed attempt left behind a ClientConn that was never closed.

So the easiest fix is to close the conn whenever the stream cannot be created:

	for i := 0; i < MaxRetry; i++ {
		conn, err = grpc.Dial(target, opts...)
		if err != nil {
			continue
		}
		client := pb.NewGRPCInferenceServiceClient(conn)
		stream, err = client.ModelStreamInfer(ctx, req)
		if err == nil {
			break
		} else {              // +
			conn.Close()        // +
		}                     // +
	}

But why does Dial succeed with a wrong address?

Dial

insecure.NewCredentials()

// google.golang.org/grpc@v1.63.2/credentials/insecure/insecure.go

// NewCredentials returns a credentials which disables transport security.
//
// Note that using this credentials with per-RPC credentials which require
// transport security is incompatible and will cause grpc.Dial() to fail.
func NewCredentials() credentials.TransportCredentials {
	return insecureTC{}
}

// insecureTC implements the insecure transport credentials. The handshake
// methods simply return the passed in net.Conn and set the security level to
// NoSecurity.
type insecureTC struct{}

func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}

ClientHandshake simply hands back the net.Conn it was given and reports NoSecurity, so no real security handshake takes place.

Then I wrote a small demo:

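package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// "123124124" is not a valid host:port. Without grpc.WithBlock(),
	// Dial returns immediately and only connects lazily.
	// grpc.WithInsecure() is deprecated in favor of insecure.NewCredentials().
	conn, err := grpc.Dial("123124124", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println("dial error:", err)
		return
	}
	defer conn.Close() // without this, the ClientConn's goroutines are leaked
	fmt.Println("conn is nil:", conn == nil)
}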

Stepping through it with delve shows why: the block dial option is not set, so DialContext returns the ClientConn right away:

   236:         if !cc.dopts.block {
=> 237:                 return cc, nil
   238:         }

The real connection is only established when it is needed. I recommend reading the doc comment of DialContext:

// Dial calls DialContext(context.Background(), target, opts...).
//
// Deprecated: use NewClient instead.  Will be supported throughout 1.x.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext calls NewClient and then exits idle mode.  If WithBlock(true) is
// used, it calls Connect and WaitForStateChange until either the context
// expires or the state of the ClientConn is Ready.
//
// One subtle difference between NewClient and Dial and DialContext is that the
// former uses "dns" as the default name resolver, while the latter use
// "passthrough" for backward compatibility.  This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// Deprecated: use NewClient instead.  Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
  ...
}
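To make the lazy behaviour concrete, here is a toy model of a non-blocking dial using only the standard library. It is not gRPC's implementation: lazyClient is a hypothetical type. Construction never touches the network, so it cannot fail on a bad target. The error only shows up on first use. Like a ClientConn, the client still has to be closed whether or not it ever connected.

```go
package main

import (
	"fmt"
	"net"
	"sync"
	"time"
)

// lazyClient is a toy model of a non-blocking Dial: construction does no I/O,
// so a bad target is only noticed on first use.
type lazyClient struct {
	target string
	mu     sync.Mutex
	conn   net.Conn
}

func newLazyClient(target string) *lazyClient {
	return &lazyClient{target: target} // no I/O here, hence no error to return
}

// get dials on first use and caches the connection; a failed dial is retried
// on the next call.
func (c *lazyClient) get() (net.Conn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn != nil {
		return c.conn, nil
	}
	conn, err := net.DialTimeout("tcp", c.target, time.Second)
	if err != nil {
		return nil, err
	}
	c.conn = conn
	return conn, nil
}

// Close releases the connection if one was ever made.
func (c *lazyClient) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn == nil {
		return nil
	}
	err := c.conn.Close()
	c.conn = nil
	return err
}

func main() {
	c := newLazyClient("123124124") // same bogus target as the demo above
	defer c.Close()
	fmt.Println("constructed:", c != nil)
	if _, err := c.get(); err != nil {
		fmt.Println("first use failed:", err)
	}
}
```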
© 2024 humbornjo