Pragmatic gRPC 1
This article is based on proto3 and Go.
Proto file

To start building a gRPC service, it's good to check some rules for the proto file.
- The `package` field should contain version information, like `HelloService.v1`.
- Request and response messages should follow a naming convention, generally the RPC method name with a `Request` or `Response` suffix, like:

rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse) {}
A good reference is the Google API design guide.
Just keep in mind that protocol buffers, like Go, always have a default value; they can't differentiate an unset value from a default value. As an example, always keep 0 as the unknown or unset value for an `enum` (you should do the same in Go: always start the real values from `iota + 1`):
enum Hello {
  UNKNOWN = 0;
  STARTED = 1;
  RUNNING = 2;
}
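The same rule carried over to Go might look like this minimal sketch (the type and constant names are illustrative, not from any generated code):

```go
package main

import "fmt"

// HelloState mirrors the proto enum: the zero value is reserved for
// unknown/unset, so a forgotten assignment never looks like a real state.
type HelloState int

const (
	StateUnknown HelloState = iota // 0: unset/unknown
	StateStarted                   // 1
	StateRunning                   // 2
)

func main() {
	var s HelloState // the zero value is StateUnknown, not a real state
	fmt.Println(s == StateUnknown) // prints "true"
}
```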
Struct first or Proto first?
To start building your gRPC service, you can either define the proto file first or the Go struct first. Let's consider these two approaches.
- Struct first
Some packages will generate the proto definition from your `struct` type, but I haven't tried them out. The benefit of this approach is appealing, especially if you use an ORM like `gorm`: the `struct` generates everything you need, and it becomes your only source of truth.
I think this approach is poorly supported. Generally, I prefer writing the proto file first; it's really easy to use the proto file to work with the front end or other teams to finalize the API.
- Proto first
With all the code generated from your proto file, there is a tendency to think of the proto file as your source of truth. Don't. The proto file mostly belongs to the controller layer; it shouldn't be coupled with your business logic or database interactions.
Another problem is that you can't easily cast your `struct` type to the proto-generated `struct` type (the `proto.Message` type), because the generated type has some additional fields. An easy solution is to use the `gogofaster` binary of gogoprotobuf, like:
protoc --proto_path proto --gogofaster_out="plugins=grpc:." ./proto/*.proto
This package provides lots of extensions to customize [golang/protobuf](https://github.com/golang/protobuf), like support for Go data types and marshaller functions. The problem is that this helpful library is looking for new ownership, and it doesn't support the newer version of the Go protocol buffer API.
Go has two versions of the protocol buffer library, v1 and v2. The newer version has many improvements, like the reflection API. If you want to start a new project with gRPC, I highly suggest you start with the v2 API. Then you have two choices to cast your `struct` type to the proto-generated type:
- Serialize and deserialize. Make sure you use the right library: `google.golang.org/protobuf/encoding/protojson`.
- Write some boilerplate code on your own.

I think the only thing to consider is performance; just benchmark with your proto message and you will reach a conclusion. Generally, I prefer the second one.
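A sketch of what that boilerplate looks like; `pbUser` stands in for a protoc-generated type here, and all names are assumptions for illustration:

```go
package main

import "fmt"

// User is the domain model (e.g. a gorm struct).
type User struct {
	ID   uint64
	Name string
}

// pbUser stands in for the protoc-generated struct; field names
// and types are assumptions, not real generated code.
type pbUser struct {
	Id   uint64
	Name string
}

// toProto is the hand-written boilerplate that maps the domain
// struct to the generated type, keeping the two decoupled.
func toProto(u *User) *pbUser {
	return &pbUser{Id: u.ID, Name: u.Name}
}

func main() {
	fmt.Println(toProto(&User{ID: 1, Name: "alice"}).Name) // prints "alice"
}
```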
Code generation
With the new v2 API, here is an example of generating Go code from a proto file.
protoc -I=proto --go_out=module=github.com/hello/hello:. \
  --go-grpc_out=module=github.com/hello/hello:. \
  ./proto/*.proto
In every proto file, add a line to specify the Go module:
option go_package = "github.com/hello/hello/yourModule/pb";
And the generated code will be put into the `pb` folder under each module.
Error handling
In RESTful APIs, people sometimes put an error code and error message in every response (which is a bad practice). An example with proto would be:
message Response {
string data = 1;
int32 code = 2;
string error_msg = 3;
}
Don't do this in gRPC. A gRPC error already contains the error code and the error message. If you get a response with code `OK`, it indicates the request was successful. Constructing a gRPC error is simple:
status.Error(codes.Internal, "request failed")
You can check the gRPC error type with:
//"google.golang.org/grpc/status"
if se, ok := status.FromError(err); ok {
	return se.Err()
}
You can read more about the gRPC error codes and their explanations here.
Of all the error codes, your application should only consider returning these:
- InvalidArgument Code = 3
- NotFound Code = 5
- AlreadyExists Code = 6
- PermissionDenied Code = 7
- FailedPrecondition Code = 9
- Aborted Code = 10
- OutOfRange Code = 11
- DataLoss Code = 15
- Unauthenticated Code = 16
If you want to dig deeper into gRPC errors, I suggest you watch this video.
Deadline
It's best practice to use deadlines.
You can check whether the client set a deadline with:
d, ok := ctx.Deadline()
if !ok {
	//return some error
}
timeout := time.Until(d)
//check the timeout range
if timeout < 5*time.Second || timeout > 30*time.Second {
	//return some error
}
But, across all the gRPC and gRPC-web libraries in all languages, I don't think every one of them lets you set deadlines on the client request, so think twice before implementing this check in your service. You can find more about deadline propagation (and other best practices) in this video.
Interceptor and Metadata
An interceptor is like middleware in a REST framework, and you can get everything you need from go-grpc-middleware. The interceptor chain works just like a middleware chain in a REST framework.
In almost every REST framework, you can define a specific middleware for a specific controller or endpoint. Unfortunately, I haven't found an easy way to do this in gRPC: you can only write interceptors for the whole server. After that, you can filter your routes inside the interceptor, or you can put the code directly in the controller.
The middleware chain uses metadata to pass data. Metadata in gRPC is the HTTP/2 version of HTTP headers; you can find a detailed explanation here.
A lot can be done in interceptors using metadata and context, like authentication and authorization.
Reading data from metadata is easy:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
//return some error
}
`md` is of type `metadata.MD`, which is `map[string][]string`. For example, to get the trace ID:
ids, ok := md["x-b3-traceid"]
This solves part of our problem: reading data from requests. If you want to pass some data to another interceptor or to the controller, you can use `context`.
type key int
//payloadKey is unexported, to prevent collisions with keys defined in other packages
var payloadKey key
//Payload is a custom struct contain the data you want to pass on
type Payload struct {
ID string
}
First, we need the unexported variable `payloadKey` as a unique identifier, which won't be overwritten by other code or packages the way an ordinary metadata name could be.
//newContext will create a new go `context` with the payload
func newContext(ctx context.Context, payload *Payload) context.Context {
return context.WithValue(ctx, payloadKey, payload)
}
//FromContext can be used in all controllers to get the payload from `context`
func FromContext(ctx context.Context) (*Payload, bool) {
payload, ok := ctx.Value(payloadKey).(*Payload)
return payload, ok
}
Combining this with the interceptor chain:
func UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
//read data from metadata
...
//construct your payload
...
//return the new `context`
		return handler(newContext(ctx, &payload), req)
	}
}
That's only the case for unary interceptors. For stream interceptors, we can still use the unexported `payloadKey` and `FromContext`, but we need a wrapped stream.
//WrappedStream implements the grpc.ServerStream interface, but we
//override the Context() method to return our new context with the payload
type WrappedStream interface {
grpc.ServerStream
Context() context.Context
}
type wrappedStream struct {
grpc.ServerStream
parentCtx context.Context
payload *Payload
}
func newWrappedStream(ss grpc.ServerStream, payload *Payload) *wrappedStream {
return &wrappedStream{ss, ss.Context(), payload}
}
//Context() will return our context with payload
func (w *wrappedStream) Context() context.Context {
return context.WithValue(w.parentCtx, payloadKey, w.payload)
}
In this way, we utilize Go's composition power; you don't need to modify any existing code.
func StreamInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
//some code
...
		return handler(srv, newWrappedStream(ss, &payload))
	}
}
With all the above, you can easily implement your authentication and authorization logic.
But there is a problem. In REST, the request headers, URL and method generally contain all the information you need for authorization: the `authorization` header carries information about the user identity, the URL tells you which resource the user requested, and the method is the user action. In gRPC, you still get the `authorization` metadata, and you can get the user action from:
//FullMethod returns a string containing the gRPC package, service and RPC method.
//Remember the gRPC package is versioned; make sure bumping the package version won't break your authorization code.
info.FullMethod
But you will not have any information about which resources the user requested. So you need to put that information in the metadata if you want to do authorization in the interceptor, or put the requested resources in the request message.
I prefer the first approach, but design your API carefully, as data related to your business logic now lives in both metadata and the request message.
Partial Update
gRPC uses a `google.protobuf.FieldMask` to describe partial updates. You can utilize the reflection API in v2 to deal with the `FieldMask`:
//check field mask existence
if request.FieldMask != nil {
	request.FieldMask.Normalize()
	//check if the field mask is valid
	if ok := request.FieldMask.IsValid(request); !ok {
		return nil, status.Error(codes.InvalidArgument, "invalid field mask")
	}
}
paths := request.FieldMask.GetPaths()
//the rest is on your own; you can do whatever you want with paths
//for example, you can do the partial update by hand without reflection, since switching on the value's type is really annoying
//notice the protocol buffer [list](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#List) and [map](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#Map) types are not the same as the Go slice or map types
rft := request.ProtoReflect()
rft.Range(func(fd protoreflect.FieldDescriptor, fv protoreflect.Value) bool {
	//compare fd.JSONName() to the mask paths
	//get the corresponding field value from fv
	return true
})
Stream with Broadcast
The most exciting part about gRPC of course is its streaming ability. Though it's no easy feat to implement that for beginners of concurrency (like me). I hope I can ease your pain when implementing the gRPC streaming with broadcasting.
The basics are:
- Each handler runs in its own goroutine (each connection will have its own goroutine). As documented in Concurrency:
Each RPC handler attached to a registered server will be invoked in its own goroutine. For example, SayHello will be invoked in its own goroutine. The same is true for service handlers for streaming RPCs, as seen in the route guide example here. Similar to clients, multiple services can be registered to the same server.
- `chan` is not built for broadcast; it's one-to-one communication.
You will end up with something like `map[string]chan`, where the `string` is an identifier for the goroutine (client connection), like a `sessionID`, a `tracingID`, a random string, or a `userID`.
Pay attention to the last case: use `userID` only when you can be sure each user has a single active connection; otherwise the above `map` becomes `map[userID]map[connectionID]chan`. The `chan` is created in the controller, identifying the exact goroutine (connection) that will receive messages, and the controller passes each received message on to the client.
Here is some oversimplified code:
func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	//get a trace ID, or generate a random string, or whatever you want to identify this goroutine
	ID := "randomString"
	//create a chan to receive response messages
	conn := make(chan *pb.SubscribeResponse)
	for {
		//receive a message
		response := <-conn
		//send it to the client
		srv.Send(response)
	}
}
Below are some approaches I saw or can think of:
`srv` sharing

This avoids the `map` structure, or maybe all the channel machinery.
func (s *server) TestHandler(req *pb.TestRequest, srv pb.TestServer) error {
...
}
For the server-streaming handler above, you can share `srv` between goroutines. The rest is up to you; you can put the `srv` in a `map` or a `slice` based on what you want. Just be careful, as documented in Streams:
When using streams, one must take care to avoid calling either `SendMsg` or `RecvMsg` multiple times against the same Stream from different goroutines. In other words, it's safe to have a goroutine calling `SendMsg` and another goroutine calling `RecvMsg` on the same stream at the same time. But it is not safe to call `SendMsg` on the same stream in different goroutines, or to call `RecvMsg` on the same stream in different goroutines.
And you certainly need a `sync.Mutex`; I will talk about it later.
`map` sharing

With the `map` structure previously mentioned, at some point you will always want to modify the `map` in another goroutine. For example, you access the `map` and send a message to a `chan` in one goroutine, while you add new connections to or remove connections from the `map` in another goroutine. Then it comes down to `sync.Mutex`, so beware of deadlocks.
mu.Lock()
doSomething()
mu.Unlock()
If `mu` is a `sync.Mutex`, always `Lock()` and `Unlock()` in the outer function; do not `Lock()` and `Unlock()` inside `doSomething()`. As your code becomes complicated, you will not notice when you `Lock()` twice.
This approach still shares memory, and it is prone to deadlocks.
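A sketch of the map-sharing shape, with all locking done at the outer level per the advice above (the registry type and method names are made up):

```go
package main

import (
	"fmt"
	"sync"
)

// connRegistry guards the shared connection map; every access goes
// through a method that locks in the outer function only.
type connRegistry struct {
	mu    sync.Mutex
	conns map[string]chan string
}

func newConnRegistry() *connRegistry {
	return &connRegistry{conns: make(map[string]chan string)}
}

func (r *connRegistry) add(id string) chan string {
	r.mu.Lock()
	defer r.mu.Unlock()
	ch := make(chan string, 1)
	r.conns[id] = ch
	return ch
}

func (r *connRegistry) remove(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.conns, id)
}

// send looks up the target chan under the lock but sends outside it,
// so a slow receiver can't hold the lock and deadlock other callers.
func (r *connRegistry) send(id, msg string) bool {
	r.mu.Lock()
	ch, ok := r.conns[id]
	r.mu.Unlock()
	if ok {
		ch <- msg
	}
	return ok
}

func main() {
	r := newConnRegistry()
	ch := r.add("conn-1")
	r.send("conn-1", "hello")
	fmt.Println(<-ch) // prints "hello"
	r.remove("conn-1")
}
```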
An intermediate Channel (Preferred)
As said in Share Memory By Communicating
Do not communicate by sharing memory; instead, share memory by communicating.
We can have an intermediate goroutine that has ownership of the `map` structure; by ownership I mean only this goroutine can modify the `map`. Messages are sent to an intermediate channel, `Broadcast`, and the `Broadcast` goroutine modifies the `map` or sends messages to the right `channel` according to the identifier. The `Broadcast` goroutine owns the `map`, and the data flows in a single direction.
Some code you can work with:
func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	//get a trace ID, or generate a random string, or whatever you want to identify this goroutine
	ID := "randomString"
	//create a chan to receive response messages
	conn := make(chan *pb.SubscribeResponse)
	//send to the intermediate channel, which is in charge of the `map`
	s.broadcast <- &broadcastPayload{
		//a unique identifier
		ID: ID,
		//the chan corresponding to the ID
		Conn: conn,
		//event to indicate add, remove or send message on the broadcast channel
		Event: EventEnum.AddConnection,
	}
	for {
		select {
		case <-srv.Context().Done():
			s.broadcast <- &broadcastPayload{
				ID:    ID,
				Event: EventEnum.RemoveConnection,
			}
			return nil
		case response := <-conn:
			if st, ok := status.FromError(srv.Send(response)); ok {
				switch st.Code() {
				case codes.OK:
					//noop
				case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
					return nil
				default:
					return nil
				}
			}
		}
	}
}
//this goroutine has ownership of the map[string]chan *pb.SubscribeResponse
go func() {
	for v := range s.broadcast {
		//do something based on the event
		switch v.Event {
		//add the ID and its corresponding Conn to the map
		case EventEnum.AddConnection:
			...
		//delete the ID and its corresponding Conn from the map
		case EventEnum.RemoveConnection:
			...
		//receive a message from the business logic and send it to the suitable Conn in the map
		case EventEnum.ReceiveResponse:
			...
		}
	}
}()
Maybe a counter?
Without a `map` to identify the single `chan` used by each connection, another approach I saw online is to use a counter. The counter records the number of connections held by a single user or a custom subject; that user or subject shares one `chan` (compared to the above, where each connection has its own `chan`). You can use `sync.Mutex` or `sync/atomic` to modify the counter when a new connection comes in or drops out. The counter decides how many times you send to the `chan`.
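A minimal sketch of the counter idea with `sync/atomic` (the `subject` type and its methods are made up for illustration):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subject counts active connections for one user/subject; all of
// them share a single broadcast chan.
type subject struct {
	count int64
	ch    chan string
}

func (s *subject) join()  { atomic.AddInt64(&s.count, 1) }
func (s *subject) leave() { atomic.AddInt64(&s.count, -1) }

// publish sends the message once per active connection; each
// connection's goroutine then receives exactly one copy from s.ch.
func (s *subject) publish(msg string) {
	n := atomic.LoadInt64(&s.count)
	for i := int64(0); i < n; i++ {
		s.ch <- msg
	}
}

func main() {
	s := &subject{ch: make(chan string, 8)}
	s.join()
	s.join()
	s.publish("hello")
	fmt.Println(len(s.ch)) // prints "2": one copy queued per connection
}
```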
sync.Cond
If the use case is really simple, I think `sync.Cond` is also a valid approach, but I have never tried it.
I prefer the third approach; it avoids memory sharing and the code is cleaner.
Going distributed
If you want to use a messaging system, like Kafka or NATS, the idea is the same: use an intermediate `channel` to manage your connections.
Continue
A lot of topics haven't been covered, like gRPC-web, testing, gRPC-gateway, health checks, profiling and many other things; I will continue with a second post in the future. I hope you liked this one. If you have questions or suggestions, or want to share how you solved the above problems, just submit an issue here; all are welcome.