Pragmatic gRPC 1
This article is based on proto3 and Go.
proto file
Before you start building a gRPC service, it's good to check some rules for the proto file.
- The `package` field should contain version information, like `HelloService.v1`.
- The request and response messages should follow a naming convention; generally it is the RPC method name with a `Request` or `Response` suffix, like:
rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse){}
A good reference is the Google API design guide.
Just keep in mind that protocol buffers, like Go, always have default values, so they can't differentiate an unset value from a default value. For example, always keep 0 as the unknown or unset value for enums (do the same in Go: always start real values from `iota + 1`, as in the Go sketch after the enum below):
enum Hello {
  UNKNOWN = 0;
  STARTED = 1;
  RUNNING = 2;
}
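The Go side can follow the same rule. A minimal sketch (the `Status` type is made up for illustration):

// Status is a hypothetical Go enum; keep the zero value as "unknown"
// so an unset field can be told apart from a real value
type Status int

const (
	StatusStarted Status = iota + 1 // 1
	StatusRunning                   // 2
)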
Struct first or Proto first?
To start building your gRPC service, you can either define the proto file first or the Go struct first. Let's consider these two approaches.
- Struct first
Some packages will generate proto definition from your struct type, but I didn't try them out. The benefit of this approach is appealing, especially if you use an ORM like gorm. The struct will generate all the things you need, and it becomes your only source of truth.
I think this approach is poorly supported.
Generally, I prefer to write the proto file first; it's really easy to use the proto file to work with the front end or other teams to finalize the API.
- Proto first
With all the code generated from your proto file, there is a tendency to treat the proto file as your source of truth. Don't. The proto file mostly belongs to the controller layer; it shouldn't be coupled with your business logic or database interactions.
Another problem is that you can't easily convert your struct type to the proto-generated struct type (the `proto.Message` type), since the generated type has some additional fields. An easy solution is to use the gogofaster binary of gogoprotobuf, like:
protoc --proto_path proto --gogofaster_out="plugins=grpc:." ./proto/*.proto
This package provides lots of extensions to customize [golang/protobuf](https://github.com/golang/protobuf), like support for Go data types and marshaller functions. The problem is that this helpful library is looking for new ownership, and it does not support the newer version of the Go protocol buffer API.
Go has two versions of the protocol buffer library, v1 and v2. The newer version has many improvements, like the reflection API.
If you want to start a new project with gRPC, I highly suggest you start with the v2 API. Then you have two choices to convert your struct type to the proto-generated type:
- Serialize and deserialize. Make sure you use the right library: `google.golang.org/protobuf/encoding/protojson`.
- Write some boilerplate code on your own.
I think the only thing to consider is performance; just benchmark with your proto messages and you will reach a conclusion. Generally, I prefer the second one.
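A minimal sketch of the boilerplate option, assuming a hypothetical domain `User` struct and a generated `pb.User` message:

// User is a hypothetical domain type (e.g. a gorm model), kept separate from the proto layer
type User struct {
	ID   string
	Name string
}

// toProto is hand-written boilerplate mapping the domain type to the generated message
func toProto(u *User) *pb.User {
	return &pb.User{Id: u.ID, Name: u.Name}
}

// fromProto maps the generated message back to the domain type
func fromProto(p *pb.User) *User {
	return &User{ID: p.GetId(), Name: p.GetName()}
}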
code generation
With the new v2 API, here is an example to generate go code from proto file.
protoc -I=proto --go_out=module=github.com/hello/hello:. \
  --go-grpc_out=module=github.com/hello/hello:. \
  ./proto/*.proto
In every proto file, add a line to specify the Go package import path:
option go_package = "github.com/hello/hello/yourModule/pb";
And the generated code will be put into the `pb` folder under each module.
Error handling
In RESTful APIs, people sometimes put an error code and error message in every response (which is a bad practice). The proto equivalent would be:
message Response {
  string data = 1;
  int32 code = 2;
  string error_msg = 3;
}
Don't do this in gRPC. A gRPC error already contains the error code and the error message. If you get a response with code OK, that alone indicates the request was successful. Constructing a gRPC error is simple:
status.Error(codes.Internal, "request failed")
You can check the gRPC error type with:
//"google.golang.org/grpc/status"
if se, ok := status.FromError(err);ok {
return se.Err()
}
You can read more about the gRPC error code and explanation here.
Of all the error codes, your application should only consider returning these (a small mapping sketch follows the list):
- InvalidArgument Code = 3
- NotFound Code = 5
- AlreadyExists Code = 6
- PermissionDenied Code = 7
- FailedPrecondition Code = 9
- Aborted Code = 10
- OutOfRange Code = 11
- DataLoss Code = 15
- Unauthenticated Code = 16
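A minimal mapping sketch, assuming hypothetical sentinel errors in the business layer:

// "errors", "google.golang.org/grpc/codes", "google.golang.org/grpc/status"

// hypothetical sentinel errors returned by the business layer
var (
	ErrNotFound      = errors.New("not found")
	ErrAlreadyExists = errors.New("already exists")
)

// toStatusError translates business errors into gRPC status errors
func toStatusError(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, ErrNotFound):
		return status.Error(codes.NotFound, err.Error())
	case errors.Is(err, ErrAlreadyExists):
		return status.Error(codes.AlreadyExists, err.Error())
	default:
		return status.Error(codes.Internal, "request failed")
	}
}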
If you want to dig deeper into gRPC errors, I suggest you watch this video.
Deadline
It's best practice to use deadlines.
You can check whether the client set a deadline with:
d, ok := ctx.Deadline()
if !ok {
	// return some error
}
timeout := time.Until(d)
// check the timeout range
if timeout < 5*time.Second || timeout > 30*time.Second {
	// return some error
}
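On the client side, the deadline is usually set with a context timeout. A minimal sketch, assuming a generated `client` for the HelloWorld RPC from earlier (hypothetical here):

// client side: give the call 10 seconds before it is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := client.HelloWorld(ctx, &pb.HelloWorldRequest{})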
But across the gRPC and gRPC-web libraries in different languages, I don't think every one of them lets the client set a deadline on the request, so think twice before enforcing this check in your service.
You can learn more about deadline propagation (and other best practices) in this video.
Interceptor and Metadata
An interceptor is like middleware in a REST framework, and go-grpc-middleware provides most of what you need. The interceptor chain works just like a middleware chain in a REST framework.
Almost every REST framework lets you define a specific middleware for a specific controller or endpoint. Unfortunately, I haven't found an easy way to do this in gRPC: you can only write interceptors for the whole server. You can then filter routes inside the interceptor, or put the code directly in the controller.
The interceptor chain uses metadata to pass data. Metadata in gRPC is the HTTP/2 version of HTTP headers. You can find a detailed explanation here.
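On the client side, metadata is attached to the outgoing context. A minimal sketch, assuming a generated `client` and a `traceID` value (both hypothetical here):

// "google.golang.org/grpc/metadata"
// client side: attach metadata (HTTP/2 headers) to the outgoing request
ctx := metadata.AppendToOutgoingContext(context.Background(), "x-b3-traceid", traceID)
resp, err := client.HelloWorld(ctx, &pb.HelloWorldRequest{})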
A lot of things can be done with interceptors using metadata and context, like authentication and authorization.
Reading data from metadata is easy:
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
	// return some error
}
md is of type metadata.MD which is map[string][]string. For example, to
get the trace id:
ids, ok := md["x-b3-traceid"]
This solves part of the problem: reading data from the request. If you want to pass some data to other interceptors or to the controller, you can use context.
type key int

// payloadKey is unexported, to prevent collisions with keys defined in other packages
var payloadKey key

// Payload is a custom struct containing the data you want to pass on
type Payload struct {
	ID string
}
First, we need a key of an unexported type as a unique identifier, one that won't be overwritten by other code or packages the way an ordinary metadata name could be.
// newContext will create a new go `context` with the payload
func newContext(ctx context.Context, payload *Payload) context.Context {
	return context.WithValue(ctx, payloadKey, payload)
}
// FromContext can be used in all controllers to get the payload from `context`
func FromContext(ctx context.Context) (*Payload, bool) {
	payload, ok := ctx.Value(payloadKey).(*Payload)
	return payload, ok
}
Combine with the above interceptor chain:
func UnaryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// read data from metadata
		...
		// construct your payload
		...
		// call the handler with the new `context`
		return handler(newContext(ctx, &payload), req)
	}
}
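Inside a unary handler, the payload set by the interceptor can then be read back. A minimal sketch, reusing the hypothetical HelloWorld RPC from earlier:

func (s *server) HelloWorld(ctx context.Context, req *pb.HelloWorldRequest) (*pb.HelloWorldResponse, error) {
	payload, ok := FromContext(ctx)
	if !ok {
		return nil, status.Error(codes.Internal, "missing payload")
	}
	// use payload.ID for authorization, logging, etc.
	_ = payload.ID
	return &pb.HelloWorldResponse{}, nil
}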
That's only the case for unary interceptors. For stream interceptors, we can still use the unexported `payloadKey` and `FromContext`, but we need a wrapped stream.
// WrappedStream embeds the grpc.ServerStream interface; we only need the
// Context() method to return our new context with the payload
type WrappedStream interface {
	grpc.ServerStream
	Context() context.Context
}
type wrappedStream struct {
	grpc.ServerStream
	parentCtx context.Context
	payload   *Payload
}

func newWrappedStream(ss grpc.ServerStream, payload *Payload) *wrappedStream {
	return &wrappedStream{ss, ss.Context(), payload}
}
// Context() will return our context with the payload
func (w *wrappedStream) Context() context.Context {
	return context.WithValue(w.parentCtx, payloadKey, w.payload)
}
In this way, we use Go's composition: you don't need to modify any existing code.
func StreamInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// some code
		...
		return handler(srv, newWrappedStream(ss, &payload))
	}
}
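The interceptors are then registered when the server is created. A minimal sketch using the chaining options built into google.golang.org/grpc:

srv := grpc.NewServer(
	grpc.ChainUnaryInterceptor(UnaryInterceptor()),
	grpc.ChainStreamInterceptor(StreamInterceptor()),
)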
With all the above, you can easily implement your authentication and authorization logic.
But there is a problem. In REST, the request headers, URL, and method generally contain all the information you need for authorization: the authorization header carries the user identity, the URL tells you which resource the user requested, and the method is the user action. In gRPC, you still get the authorization metadata, and you can get the user action from:
// FullMethod returns a string containing the gRPC package, service, and RPC method.
// Remember the gRPC package is versioned? Make sure bumping the package version won't break your authorization code.
info.FullMethod
But you will not have any information about which resources the user requested. So you need to put that information in the metadata if you want to do authorization in the interceptor, or you put the requested resources in the request message.
I prefer the first approach, but design your API carefully, as data related to your business logic now lives in both the metadata and the request message.
Partial Update
gRPC uses a google.protobuf.FieldMask to describe partial updates.
You can use the reflection API in v2 to deal with the FieldMask:
// check field mask existence
if request.FieldMask != nil {
	request.FieldMask.Normalize()
	// check if the field mask is valid
	if ok := request.FieldMask.IsValid(request); !ok {
		return nil, status.Error(codes.InvalidArgument, "invalid field mask")
	}
}
paths := request.FieldMask.GetPaths()
// the rest is on your own, you can do whatever you want with paths
// for example, you can do the partial update on your own without reflection, since fv.(type) is really annoying
// notice the protocol buffer [list](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#List) and [map](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#Map) types are not the same as the Go `array` or `map` types
rft := request.ProtoReflect()
rft.Range(func(fd protoreflect.FieldDescriptor, fv protoreflect.Value) bool {
	// compare fd.JSONName() to the paths values
	// get the corresponding field value with fv.(type)
	return true
})
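If you skip reflection, a plain switch over the mask paths also works. A minimal sketch, assuming a hypothetical update request carrying a `User` message and a `stored` record loaded from the database:

// apply only the masked fields to the stored record
for _, path := range request.FieldMask.GetPaths() {
	switch path {
	case "name":
		stored.Name = request.GetUser().GetName()
	case "email":
		stored.Email = request.GetUser().GetEmail()
	default:
		return nil, status.Errorf(codes.InvalidArgument, "unknown field %q", path)
	}
}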
Stream with Broadcast
The most exciting part of gRPC is of course its streaming ability, though it's no easy feat to implement for beginners in concurrency (like me). I hope I can ease your pain when implementing gRPC streaming with broadcasting.
The basics are:
- Each handler runs in its own goroutine (each connection will have its own goroutine)
As documented in Concurrency:
Each RPC handler attached to a registered server will be invoked in its own goroutine. For example, SayHello will be invoked in its own goroutine. The same is true for service handlers for streaming RPCs, as seen in the route guide example here. Similar to clients, multiple services can be registered to the same server.
- `chan` is not built for broadcast; it's one-to-one communication
You will end up with something like `map[string]chan`; the string is an identifier for the goroutine (client connection), like a sessionID, a tracingID, a random string, or a userID.
Pay attention to the last case: use the userID only when you can be sure each user has a single active connection; otherwise the map becomes `map[userID]map[connectionID]chan`. The chan is created in the controller and identifies the exact goroutine (connection) that will receive messages; the controller passes each received message on to the client.
Here is an oversimplified code:
func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	// get a trace ID or generate a random string, whatever identifies this goroutine
	ID := "randomString"
	// create a chan to receive response messages
	conn := make(chan *pb.SubscribeResponse)
	for {
		// receive a message
		response := <-conn
		// send it to the client
		srv.Send(response)
	}
}
Below are some approaches I saw or can think of:
Sharing srv
This avoids the map structure, and maybe the channels entirely.
func (s *server) TestHandler(req *pb.TestRequest, srv pb.TestServer) error {
...
}
For a server streaming handler above, you can share the srv between
goroutines. The rest is up to you, you can put the srv in a map or slice
based on what you want. Just be careful as documented in
Streams:
When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. In other words, it's safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time. But it is not safe to call SendMsg on the same stream in different goroutines, or to call RecvMsg on the same stream in different goroutines.
And you certainly need sync.Mutex, I will talk about it later.
Sharing map
With the map structure previously mentioned, at some point you will want to modify the map in another goroutine. For example, you want to access the map and send a message to a chan in one goroutine, while adding or removing connections from the map in another goroutine. That's where sync.Mutex comes in, so beware of deadlocks.
mu.Lock()
doSomething()
mu.Unlock()
If mu is a sync.Mutex, always Lock() and Unlock() in the outer function; do not Lock() and Unlock() inside doSomething(). As your code becomes complicated, you will not notice when you Lock() twice.
This approach is also sharing memory, and it is prone to deadlocks.
An intermediate Channel (Preferred)
As said in Share Memory By Communicating
Do not communicate by sharing memory; instead, share memory by communicating.
We can have an intermediate goroutine that has ownership of the map structure; by ownership I mean only this goroutine can modify it. Messages are sent to an intermediate channel, Broadcast, and the broadcast goroutine modifies the map or sends messages to the right channel according to the identifier. The broadcast goroutine owns the map structure, and data flows in a single direction.
Some code you can work with:
func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	// get a trace ID or generate a random string, whatever identifies this goroutine
	ID := "randomString"
	// create a chan to receive response messages
	conn := make(chan *pb.SubscribeResponse)
	// register with the intermediate channel, which is in charge of the `map`
	s.broadcast <- &broadcastPayload{
		// a unique identifier
		ID: ID,
		// the chan corresponding to the ID
		Conn: conn,
		// event to indicate add, remove, or send message
		Event: EventEnum.AddConnection,
	}
	for {
		select {
		case <-srv.Context().Done():
			s.broadcast <- &broadcastPayload{
				ID:    ID,
				Event: EventEnum.RemoveConnection,
			}
			return nil
		case response := <-conn:
			if st, ok := status.FromError(srv.Send(response)); ok {
				switch st.Code() {
				case codes.OK:
					// noop
				case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
					return nil
				default:
					return nil
				}
			}
		}
	}
}
// this goroutine has ownership of the map[string]chan *pb.SubscribeResponse
go func() {
	for v := range s.broadcast {
		// do something based on the event
		switch v.Event {
		// add ID and its corresponding Conn to the map
		case EventEnum.AddConnection:
			...
		// delete ID and its corresponding Conn from the map
		case EventEnum.RemoveConnection:
			...
		// receive a message from the business logic and send it to the suitable Conn in the map
		case EventEnum.ReceiveResponse:
			...
		}
	}
}()
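The supporting types are not shown above; here is one way they might look (the names follow the snippets but are only a sketch, not a fixed API):

// EventEnum groups the event values used in the snippets above
var EventEnum = struct {
	AddConnection    int
	RemoveConnection int
	ReceiveResponse  int
}{1, 2, 3}

// broadcastPayload is what the handlers send to the broadcast goroutine
type broadcastPayload struct {
	ID       string
	Conn     chan *pb.SubscribeResponse
	Event    int
	Response *pb.SubscribeResponse // set for ReceiveResponse events
}

// server owns the broadcast channel; the map itself lives only inside the broadcast goroutine
type server struct {
	broadcast chan *broadcastPayload
}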
Maybe a counter?
Without a map to identify the single chan used by each connection, another approach I saw online is to use a counter. The counter records the number of connections held by a single user or a custom subject; that user or subject shares one chan (compared to the above, where each connection has its own chan). You can use sync.Mutex or sync/atomic to modify the counter when a connection comes in or drops out, and the counter decides how many times you send to the chan. A rough sketch follows.
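A rough sketch of that idea, assuming one shared chan per user and an atomic counter (all names are made up for illustration):

// "sync/atomic"

// subscribers tracks how many active connections one user (or subject) has on a shared chan
type subscribers struct {
	count int64
	conn  chan *pb.SubscribeResponse
}

func (s *subscribers) add()    { atomic.AddInt64(&s.count, 1) }
func (s *subscribers) remove() { atomic.AddInt64(&s.count, -1) }

// publish sends the message once per active connection, since every
// handler for this user reads from the same chan
func (s *subscribers) publish(msg *pb.SubscribeResponse) {
	for i := int64(0); i < atomic.LoadInt64(&s.count); i++ {
		s.conn <- msg
	}
}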
sync.Cond
If the use case is really simple, I think sync.Cond is also a valid approach, but I never tried this.
I prefer the third approach; it avoids sharing memory and the code is cleaner.
Going distributed
If you want to use a messaging system, like kafka or NATS, the idea is the same, use an intermediate channel to manage your connections.
Continue
A lot of topics haven't been covered, like gRPC-web, testing, gRPC-gateway, health checks, profiling, and many other things. I will continue with a second post in the future. Hope you liked this one; if you have questions or suggestions, or want to share how you solved the above problems, just submit an issue here, all are welcome.