Our company recently asked us to migrate all of our environment variables to a config center. Everything was fine until we started running some tests.
The grpc-go version used here is 1.63.2.
Weird things happen
When we changed the address config from 123.123.123.123:8000 (the correct address) to a wrong one (say, 123.123.123.123:8001), the service deployed on k8s restarted while the pod itself stayed alive. On top of that, this happened intermittently, not every time.
So I used the following command to check the pod:
kubectl get pods {pod_name} -o yaml
It was OOM (out of memory). Well, not surprising.
Intermission
To summarize:
- memory leaks when Dial is given the wrong address
- memory does not leak when Dial is given the correct address
- heap profiling looks normal (4 GB of memory is assigned and only 600 MB of it is used)
So the only remaining suspect is a goroutine leak.
Profiling
curl localhost:15213/debug/pprof/goroutine?debug=1
Here is the profile we got when memory consumption reached 3 GB:
```
goroutine profile: total 329323
246972 @ 0x4404ce 0x4509e5 0x7fb995 0x473ba1
# 0x7fb994 google.golang.org/grpc/internal/grpcsync.(*CallbackSerializer).run+0x114 /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/internal/grpcsync/callback_serializer.go:76
82324 @ 0x4404ce 0x4509e5 0x8b6414 0x8b4479 0x473ba1
# 0x8b6413 google.golang.org/grpc.(*addrConn).resetTransport+0x3d3 /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/clientconn.go:1264
# 0x8b4478 google.golang.org/grpc.(*addrConn).connect+0x98 /root/go/pkg/mod/google.golang.org/grpc@v1.63.2/clientconn.go:913
...
```
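With roughly 300,000 goroutines at about 8 KB each (my estimate of a goroutine's footprint), the numbers add up:

$$ 300{,}000 \times 8\,\text{KB} + 600\,\text{MB} = 2.4\,\text{GB} + 0.6\,\text{GB} = 3\,\text{GB} $$

which matches my guess. Note also that 246,972 is exactly 3 × 82,324, which suggests that every leaked connection accounts for three CallbackSerializer goroutines plus one resetTransport goroutine.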
Source
```go
// google.golang.org/grpc@v1.63.2/internal/grpcsync/callback_serializer.go:76
func (cs *CallbackSerializer) run(ctx context.Context) {
	defer close(cs.done)

	// TODO: when Go 1.21 is the oldest supported version, this loop and Close
	// can be replaced with:
	//
	// context.AfterFunc(ctx, cs.callbacks.Close)
	for ctx.Err() == nil {
		select { // <- HERE IS LINE 76!!!
		case <-ctx.Done():
			// Do nothing here. Next iteration of the for loop will not happen,
			// since ctx.Err() would be non-nil.
		case cb := <-cs.callbacks.Get():
			cs.callbacks.Load()
			cb.(func(context.Context))(ctx)
		}
	}

	// Close the buffer to prevent new callbacks from being added.
	cs.callbacks.Close()

	// Run all pending callbacks.
	for cb := range cs.callbacks.Get() {
		cs.callbacks.Load()
		cb.(func(context.Context))(ctx)
	}
}
```
It seems that some connection's context is never canceled. Since we did not introduce any circular references ourselves, that leaves two possibilities:
- some connections are created but never closed, or
- the `case cb := <-cs.callbacks.Get():` branch is to blame.
Analysis
Second case
Using `curl localhost:15213/debug/pprof/goroutine?debug=2` (which prints full stacks, including where each goroutine was created), I found that `run()` is started by `NewCallbackSerializer`:
```go
// https://github.com/grpc/grpc-go/blob/v1.63.2/internal/grpcsync/callback_serializer.go#L47
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
	cs := &CallbackSerializer{
		done:      make(chan struct{}),
		callbacks: buffer.NewUnbounded(),
	}
	go cs.run(ctx) // <- it blocks here
	return cs
}
```
```go
// https://github.com/grpc/grpc-go/blob/master/internal/buffer/unbounded.go#L48
type Unbounded struct {
	c       chan any
	closed  bool
	closing bool
	mu      sync.Mutex
	backlog []any
}

func NewUnbounded() *Unbounded {
	return &Unbounded{c: make(chan any, 1)}
}
```
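`NewCallbackSerializer` just starts `run()`, which then blocks in the `select`. A freshly created `Unbounded` is only an empty channel of capacity 1, so the goroutine is idle, waiting, rather than stuck processing a callback. We can therefore rule out the second case (`case cb := <-cs.callbacks.Get():`).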
First case
This is the basic skeleton of my colleague’s code, which implements a streaming request with retries:
```go
for i := 0; i < MaxRetry; i++ {
	conn, err = grpc.Dial(target, opts...)
	if err != nil {
		continue
	}
	client := pb.NewGRPCInferenceServiceClient(conn)
	stream, err = client.ModelStreamInfer(ctx, req)
	if err == nil {
		break
	}
}
```
At first sight, it looks airtight:
- if the address is wrong, `Dial` returns an error and `conn` is nil;
- the loop then hits `continue` `MaxRetry` times, and that's it.
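However, after adding some logging, it turned out that `Dial` succeeded (non-nil `conn`, nil `err`) even with the wrong address; only `ModelStreamInfer` failed, so every retry left an unclosed connection behind.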
So the easiest fix is to close the connection whenever the stream cannot be created:
```go
for i := 0; i < MaxRetry; i++ {
	conn, err = grpc.Dial(target, opts...)
	if err != nil {
		continue
	}
	client := pb.NewGRPCInferenceServiceClient(conn)
	stream, err = client.ModelStreamInfer(ctx, req)
	if err == nil {
		break
	} else { // +
		conn.Close() // +
	} // +
}
```
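The leak and the fix can be reproduced without gRPC. In this sketch, `dialLazy`, `fakeConn`, and `openStream` are hypothetical stand-ins: dialing never fails but starts a background goroutine (playing the role of gRPC's connection and serializer goroutines) that only exits on `Close`, and opening the stream fails for a bad target. `live` counts the background goroutines still running; `closeOnError` toggles the fix above.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// live counts background goroutines that have not exited yet.
var live atomic.Int64

// fakeConn is a hypothetical stand-in for *grpc.ClientConn.
type fakeConn struct {
	stop chan struct{} // closed by Close
	done chan struct{} // closed when the background goroutine has exited
}

// dialLazy mimics a non-blocking Dial: it never fails, even for a bad
// target, and keeps a goroutine running until Close is called.
func dialLazy(target string) (*fakeConn, error) {
	c := &fakeConn{stop: make(chan struct{}), done: make(chan struct{})}
	live.Add(1)
	go func() {
		defer close(c.done)
		defer live.Add(-1) // runs before close(c.done)
		<-c.stop           // "connecting" in the background until closed
	}()
	return c, nil
}

// Close stops the background goroutine and waits for it to exit.
func (c *fakeConn) Close() {
	close(c.stop)
	<-c.done
}

var errUnavailable = errors.New("unavailable")

// openStream stands in for client.ModelStreamInfer: it fails for a bad target.
func openStream(target string) error {
	if target != "good" {
		return errUnavailable
	}
	return nil
}

// retry follows the loop from the post; closeOnError applies the fix.
func retry(target string, maxRetry int, closeOnError bool) (*fakeConn, error) {
	var err error
	for i := 0; i < maxRetry; i++ {
		var conn *fakeConn
		conn, err = dialLazy(target)
		if err != nil {
			continue
		}
		if err = openStream(target); err == nil {
			return conn, nil // the caller now owns conn and must Close it
		}
		if closeOnError {
			conn.Close()
		}
	}
	return nil, err
}

func main() {
	_, err := retry("bad-addr", 5, false)
	fmt.Println("without Close:", err, "- live goroutines:", live.Load())

	before := live.Load()
	_, err = retry("bad-addr", 5, true)
	fmt.Println("with Close:", err, "- new live goroutines:", live.Load()-before)
}
```

With `maxRetry = 5` and a bad target, the first call leaves 5 goroutines running and the second leaves none. In real code, the connection kept on success also needs a `Close` once the caller is done with it, e.g. via `defer conn.Close()`.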
But why?
Dial
insecure.NewCredentials()
```go
// google.golang.org/grpc@v1.63.2/credentials/insecure/insecure.go

// NewCredentials returns a credentials which disables transport security.
//
// Note that using this credentials with per-RPC credentials which require
// transport security is incompatible and will cause grpc.Dial() to fail.
func NewCredentials() credentials.TransportCredentials {
	return insecureTC{}
}

// insecureTC implements the insecure transport credentials. The handshake
// methods simply return the passed in net.Conn and set the security level to
// NoSecurity.
type insecureTC struct{}

func (insecureTC) ClientHandshake(ctx context.Context, _ string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	return conn, info{credentials.CommonAuthInfo{SecurityLevel: credentials.NoSecurity}}, nil
}
```
So the handshake is a no-op: `ClientHandshake` returns the `net.Conn` it was given as-is, without performing any real security handshake.
Then I wrote a small demo:
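```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// "123124124" is not a reachable address, yet we check whether Dial
	// still hands back a connection. WithInsecure() is deprecated in favor
	// of WithTransportCredentials(insecure.NewCredentials()).
	conn, err := grpc.Dial("123124124", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if conn == nil {
		fmt.Println("conn is nil")
	}
	if err != nil {
		fmt.Printf("%v\n", err)
	}
	if conn != nil {
		conn.Close() // release the connection's background goroutines
	}
}
```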
Stepping through it with delve shows that, because `WithBlock` is not set, `DialContext` returns the ClientConn immediately:
```
   236:		if !cc.dopts.block {
=> 237:			return cc, nil
   238:		}
```
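In other words, Dial does not wait for a connection: it returns right away, and the actual connection attempts happen in background goroutines (such as the `addrConn.resetTransport` ones in the profile). That is why `err` is nil even for a wrong address, and why every ClientConn that is never closed keeps its goroutines alive. I recommend reading the doc comment for `DialContext`: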
```go
// Dial calls DialContext(context.Background(), target, opts...).
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
	return DialContext(context.Background(), target, opts...)
}

// DialContext calls NewClient and then exits idle mode. If WithBlock(true) is
// used, it calls Connect and WaitForStateChange until either the context
// expires or the state of the ClientConn is Ready.
//
// One subtle difference between NewClient and Dial and DialContext is that the
// former uses "dns" as the default name resolver, while the latter use
// "passthrough" for backward compatibility. This distinction should not matter
// to most users, but could matter to legacy users that specify a custom dialer
// and expect it to receive the target string directly.
//
// Deprecated: use NewClient instead. Will be supported throughout 1.x.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	...
}
```