Pragmatic gRPC 1

This artical is based on proto3 and go

proto file

To really start to build a gRPC service, it's good to check some rules for the proto file.

  • The package field should contain version information, like HelloService.v1

  • The request and response message should name with a certain rule, generally it will be the service RPC method name with a Request or Response suffix.

    Like:

rpc HelloWorld(HelloWorldRequest) returns (HelloWorldResponse){}

A good reference is Goole API design guide.

Just keep in mind, protocol buffer, like go, always have a default value, it can't differentiate an unset value from a default value. As an example, always keep 0 as the unknown or unset value for enum (you should do it in go either, always start from iota+1):

enum Hello { 
	UNKNOWN = 0;
	STARTED = 1;
	RUNNING = 1;
}

Struct first or Proto first?

To start building your gRPC service, you can either choose to first define the proto file or the go struct. Let's consider these two approaches.

  1. Struct first

Some packages will generate proto definition from your struct type, but I didn't try them out. The benefit of this approach is appealing, especially if you use an ORM like gorm. The struct will generate all the things you need, and it becomes your only source of truth.

I think this approach is poorly supported.

Generally, I preper write proto file first, it's really easy to use proto file to work with the front end or other teams to finalize the API.

  1. Proto first

With all the code generated from your proto file, there is a tendency to think proto file as your source of truth. Don't. The proto file mostly belongs to the controller, it shouldn't couple with your business logic or database interactions.

Another problem is you can't easily cast your struct type to proto generated struct type (the proto.Message type). The generated type has some additional fields. An easy solution is to use the gogofaster binary of gogoprotobuf, like:

protoc --proto_path proto  --gogofaster_out="plugins=grpc:." ./proto/*.proto

This package provides lots of extensions to customize the [golang/protobuf](https://github.com/golang/protobuf). Like the support of go data type and marshaller function.

The problem is the helpful library is looking for new ownership, and it didn't support the newer version of go protocol buffer API.

Go has two versions of protocol buffer library, v1 and v2. The newer version has many improvements like the reflection api.

If you want to start a new project with gRPC, I highly suggest you start with v2 API. Then you have 2 choices to cast your struct type to the proto generated type.

  1. Serillize and desrillize. Make sure you use the right library: google.golang.org/protobuf/encoding/protojson.
  2. Write some boilerplate code on your own.

I think the only thing to consider is performance, just benchmark with your proto message you will conclude. Generally, I prefer the second one.

code generation

With the new v2 API, here is an example to generate go code from proto file.

protoc -I=proto --go\_out\=module\=github.com/hello/hello:. \
--go-grpc_out=module=github.com/hello/hello:. \
./proto/\*.proto

In every proto file, add a line to specify the go module:

option go_package = "github.com/hello/hello/yourModule/pb";

And the generated code will be put into the pb folder under every specific module.

Error handling

In the restful API, sometimes people put an error code and error message in every response (which is a bad practice). An example with proto will be:

message Response {
	string data = 1;
	int32 code = 2;
	string error_msg = 3;
}

Don't do this in gRPC. gRPC error already contains the error code and the error message. If you got a response with code ok, then it indicates the request is successful(?). To construct a gRPC error is simple:

status.Error(codes.Internal, "request failed")

You can check the gRPC error type with:

//"google.golang.org/grpc/status"
if se, ok := status.FromError(err);ok {
	return se.Err()
}

You can read more about the gRPC error code and explanation here.

For all the error codes, your application should only consider returning these:

  • InvalidArgument Code = 3
  • NotFound Code = 5
  • AlreadyExists Code = 6
  • PermissionDenied Code = 7
  • FailedPrecondition Code = 9
  • Aborted Code = 10
  • OutOfRange Code = 11
  • DataLoss Code = 15
  • Unauthenticated Code = 16

If you want to dig more into the gRPC error, I suggest you to watch this vedio.

Deadline

It's best practice to use deadlines.

You can check whether the client set a deadline with:

d,ok := ctx.Deadline()
if !ok {
	//return some error
}
timeout := d.Sub(time.Now())
//check timeout range
if timeout < 5*time.Second || timeout > 30*time.Second {
	//return some error
}

But, for all the gRPC and gRPC-web libraries among all the languages, I don't think every one of them will let you set deadlines in the client request. Just think twice to implement this check on your service.

You can check for more about deadline propagation in this vedio (and other best practices).

Interceptor and Metadata

Interceptor is like the middleware in REST framework. You can get all your need from go-grpc-middleware. The interceptor chain just works like a middleware chain in REST framework.

Almost in every REST framework, you can define a specific middleware for a specific controller or endpoint. Unfortunately, I haven't found an easy way to do this in gRPC, you can only write interceptor for the whole server, after that, you can filter your routes in the interceptor, or you can put the code directly in the controller.

middleware chain uses metadata to pass data. Metadata in gRPC is the HTTP/2 version of HTTP headers. You can find a detailed explanation here.

A lot of things can be done with interceptors using metadata and context, like authentication and authorization.

To read data from metadata is easy:

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
    //return some error
}

md is of type metadata.MD which is map[string][]string. For example, to get the trace id:

ids, ok := md["x-b3-traceid"]

This solved part of our problem, to read data from requests. If you want to pass some data to other interceptor or controller, you can use context.

type key int

//payloadKey is unexported, to prevent collisions with keys defined in other packages
var payloadKey key

//Payload is a custom struct contain the data you want to pass on
type Payload struct {
    ID string
}

First, we need an unexported type payloadKey as an unique identifier, which won't be overwriten by other code or packages like an ordinary metadata name.

//newContext will create a new go `context` with the payload
func newContext(ctx context.Context, payload *Payload) context.Context {
    return context.WithValue(ctx, payloadKey, payload)
}
//FromContext can be used in all controllers to get the payload from `context`
func FromContext(ctx context.Context) (*Payload, bool) {
	payload, ok := ctx.Value(payloadKey).(*MetaPayload)
	return payload, ok
}

Combine with the above interceptor chain:

func UnaryInterceptor() grpc.UnaryServerInterceptor {
 	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	//read data from metadata
	...
	//construct your payload
	...
	//return the new `context`
	return handler(newContext(ctx, &payload),req)
}

That's only the case for unary interceptors, for stream interceptors, we can still use the unexportedpayloadKeyandFromContext, but we need a wrapped stream.

//WrappedStream implement the grpc stream interface, but we need to modify
// the Context() method to return our new context with payload
type WrappedStream interface {
	grpc.ServerStream
	Context() context.Context
}

type wrappedStream struct {
	grpc.ServerStream
	parentCtx context.Context
	payload   *Payload
}

func newWrappedStream(ss grpc.ServerStream, payload *Payload) *wrappedStream {
 	return &wrappedStream{ss, ss.Context(), payload}
}

//Context() will return our context with payload
func (w *wrappedStream) Context() context.Context {
    return context.WithValue(w.parentCtx, payloadKey, w.payload)
}

In this way, we utilize go's composition power, you don't need to modify any existing code.

func StreamInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	//some code
	...
	return handler(srv,newWrappedStream(ss, &payload))
}

With all the above, you can easily implement your authentication and authorization logic.

But there is a problem, in REST, the request headers, URL and method generally contain all the information you need for authorization: the authorization header will contain information about the user identity, and the URL will tell you which resource the user requested, and the method is the user action. In gRPC, you still get the authorization metadata, you can get user action from:

//The FullMethod will return a string contain gRPC package, service, rpc method
//Remember the gRPC package is versioned? make sure update the gRPC package version won't break your authorization code
info.FullMethod

But you will not have any information about what resources the user requested. Then, you need to put that information in the metadata if you want to do authorization in the interceptor. Or you will put the requested resources in the request message.

I prefer the first approach, but carefully design your API, as data related to your business logic is now in metadata and request message.

Partial Update

gRPC uses a google.protobuf.FieldMask to describe partial update.

You can utilize reflect API in v2 to deal with the FieldMask:

//check field mask existence
if request.FieldMask != nil{
	request.FieldMask.Normalize()
	//check if the field mask is valid
	if ok:=request.FieldMaks.IsValid(request); !ok{
		return nil, status.Error(codes.InvalidArgument, "invalid field mask")
	}
}
paths:=request.FieldMask.GetPaths()

//the rest is on your own, you can do whatever you what with paths

//for example, you can do partial update by your own without reflection since fv.(type) is realy annoying
//notice the protocol buffer `[list](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#List)` and `[map](https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#List)` type is not same as the go `array` or `map` type.
rft := request.ProtoReflect()
rft.Range(func(fd protoreflect.FieldDescriptor, fv protoreflect.Value) bool {
	//compare fd.JSONName() to the maskPaths value
	//get corresponse field value with fv.(type)
}

Stream with Broadcast

The most exciting part about gRPC of course is its streaming ability. Though it's no easy feat to implement that for beginners of concurrency (like me). I hope I can ease your pain when implementing the gRPC streaming with broadcasting.

The basics are:

  • Each handler runs in its own goroutine (each connection will have its own goroutine)

As documented in Concurrency:

Each RPC handler attached to a registered server will be invoked in its own goroutine. For example, SayHello will be invoked in its own goroutine. The same is true for service handlers for streaming RPCs, as seen in the route guide example here. Similar to clients, multiple services can be registered to the same server.

  • chan is not built for broadcast, it's one-to-one communication

You will end up with something like map[string]chan, the string is an identifier to identify the goroutin (client connection), like a sessionID, or tracingID, or a random string, or userID.

Pay attention to the last case. Only when you can be sure the user can only log in to one instance, that is each user can only have a single active connection, or the above map becomes to map[userID]map[connectionID]chan. The chan is created in the controller indicating the exact goroutine (connection) which will receive messages from, and the controller will pass the received message to the client.

Here is an oversimplified code:

func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	//get trace id or generated a random string or whatever you want to indicate this goroutine
	ID:="randomString"
	//create a chan to receive response message
	conn := make(chan *pb.SubscribeResponse)
	for {
		//receive message
		response:=<-conn
		//send to client
		srv.Send(response)
	}
}

Below are some approaches I saw or can think of:

Sharing srv

This will avoid this map structure or maybe all the channel things.

func (s *server) TestHandler(req *pb.TestRequest, srv pb.TestServer) error {
	...
}

For a server streaming handler above, you can share the srv between goroutines. The rest is up to you, you can put the srv in a map or slice based on what you want. Just be careful as documented in Streams:

When using streams, one must take care to avoid calling either SendMsg or RecvMsg multiple times against the same Stream from different goroutines. In other words, it's safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time. But it is not safe to call SendMsg on the same stream in different goroutines, or to call RecvMsg on the same stream in different goroutines.

And you certainly need sync.Mutex, I will talk about it later.

Sharing map

With the map structure previously mentioned, at some point, you always want to modify the map structure in another goroutine. For example, you want to access the map and send a message to the chan in one goroutine, and you want to add new connections or remove connections from the map in another goroutine, then it comes to the sync.Mutex, so beware of deadlocks.

	mu.Lock()
	doSomething()
	mu.Unlock()

If mu is a sync.Mutex lock, always Lock() and Unlock() in the outer function, do not Lock() and Unlock in doSomething(), as your code become complicated, you will not notice when you Lock() twice.

This approach is also sharing memory, and it is prone to deadlocks.

An intermediate Channel (Preferred)

As said in Share Memory By Communicating

Do not communicate by sharing memory; instead, share memory by communicating.

We can have an intermediate goroutine that has the ownership of the map structure, by ownership I mean only this goroutine can modify the map structure. Messages will be send to an intermediate channel Broadcast, and the Broadcast will modify the map structure, or send messages to channel according to the identifier. The Broadcast has the ownership of the map structure, and the data flows in a single direction.

Some code you can work with:

func (s *server) Subscribe(req *pb.SubscribeRequest, srv pb.SubscribeServer) error {
	//get trace id or generated a random string or whatever you want to indicate this goroutine
	ID:="randomString"
	//create a chan to receive response message
	conn := make(chan *pb.SubscribeResponse)
	//an intermediate channel which incharge of the `map`
	s.broadcast <- &broadcastPayload {
		//an unique identifier
		ID: ID
		//the chan corresponse to the ID
		Conn: conn
		//event to indicate add, remove or send message to broadcast channel
		Event: EventEnum.AddConnection
	}

	for {
		select {
			case <-srv.Context().Done():
				s.broadcast <- &entity.BroadcastPayload{
					 ID: ID,
					 Event: EventEnum.RemoveConnection
				}
				return nil
			case response := <-conn:
				if status, ok := status.FromError(srv.Send(response)); ok {
					switch status.Code() {
					case codes.OK:
						//noop
					case codes.Unavailable, codes.Canceled, codes.DeadlineExceeded:
						return nil
					default:
						return nil
			 }
		 }}
	}
}
//this goroutine has the ownership of the map[string]chan *pb.SubscribeResponse
go func(){
	for v:=range s.broadcast {
		//do something based on the event
		switch v.Event {
			//add ID and its corresponding Conn to the map
			case EventEnum.AddConnection:
				...
			//delete ID and its corresponding Conn from the map
			case EventEnum.RemoveConnection:
				...
			//receive message from bussiness logic, send the message to suiteable Conn in the map as you like
			case EventEnum.ReceiveResponse:
				...
		}
	}
}

Maybe a counter?

Without a map to identify the single chan used by the single connection, another approach I saw online is to use a counter. The counter will record the number of connections by a single user or a custom subject, single user or subject will share a chan(comparing to the above, each connection uses a single chan), you can use the sync.Mutex or sync/atomic to modify the counter when the new connection comes in or drop out. The counter will decide how many times you want to send to the chan.

sync.Cond

If the use case is really simple, I think sync.Cond is also a valid approach, but I never tried this.

I prefer the 3rd approach, it avoids memory sharing and the code is cleaner.

Going distributed

If you want to use a messaging system, like kafka or NATS, the idea is the same, use an intermediate channel to manage your connections.

Continue

A lot of topics haven't been included, like gRPC-web, testing, gRPC-gateway, health check, profiling and so many things, I will continue with a 2nd post in the future. Hope you like this one, if you have questions or suggestions, or how you solve the above problems, just submit an issue here, all are welcomed.

Refs