现在的软件很少是一个孤立的单体应用运行的,相反更多是通过互联网连接在一起的,以相互传递消息的方式进行通信和协调,也就是分布式软件的集合。
例如:一个商城系统由多个分布式的应用程序组成,像订单应用、商品应用、支付应用、数据库应用等等,这些程序可以分布在不同的网络位置中运行,通过不同的通信协议传递信息。
传统的软件被拆分成细粒度、面向业务的实体,这就是微服务。
最传统的方式就是构造成REST API 服务,也就是一组架构约束条件原则,把应用和服务定义为一组资源,但是这种方式的配置太麻烦,笨重低效。
为了更好的扩展性、低耦合进程间通信,这也就是gRPC的优势
以往我们是通过简单的路由映射来允许客户端获取路由信息和交换路由信息。
在gRPC中,我们可以一次性的在一个 proto文件中定义服务并使用任意的支持gRPC的语言去实现客户端和服务端,整个过程操作变得简单,就像调用本地函数一样。
通过 proto生成服务端代码,也就是服务端的骨架,提供低层通信抽象
通过 proto生成客户端代码,也就是客户端的存根,隐藏了不同语言的差异,提供抽象的通信方式,就像调用本地函数一样。
go get google.golang.org/grpc
go get github.com/golang/protobuf/protoc-gen-go
gRPC 使用protocol buffer 来定义服务接口,protocol buffer和 XML、JSON一样是一种结构化数据序列化的可扩展存储结构,protocol buffer是一种语言中立,结构简单高效,比XML更小更简单,可以通过特殊的插件自动生成代码来读写操作这个数据结构。
import "myproject/other_protos.proto"; // 导入其他 proto文件message SearchRequest
{required string query = 1; // 必须赋值字段optional int32 page_number = 2 [default = 10]; // 可选字段repeated int32 result_per_page = 3; // 可重复字段
}message SearchResponse
{message Result // 嵌套定义{required string url = 1;optional string title = 2;repeated string snippets = 3;}repeated Result result = 1;
}message SomeOtherMessage
{optional SearchResponse.Result result = 1; // 使用其他消息的定义
}service List{ // 定义gRPC服务接口rpc getList(SearchRequest) returns (SearchResponse);
}
// 插件自动生成gRPC骨架和存根
protoc --go_out=plugins=grpc:. route_guide.proto后面需要实现服务端具体的逻辑就行,然后注册到gRPC服务器
客户端在调用远程方法时会使用阻塞式存根,所以gRPC主要使用同步的方式通信,在建立连接后,可以使用流的方式操作。
客户端编排为protocol buffer的格式,服务端再解排执行,以HTTP2 传输
protocol buffer
syntax = "proto3";
package ecommerce;service ProductInfo {rpc addProduct(Product) returns (ProductID);rpc getProduct(ProductID) returns (Product);
}message Product {string id = 1;string name = 2;string description = 3;float price = 4;
}message ProductID {string value = 1;
}
服务端
// server is used to implement ecommerce/product_info.
type server struct {productMap map[string]*pb.Product
}// AddProduct implements ecommerce.AddProduct
func (s *server) AddProduct(ctx context.Context,in *pb.Product) (*pb.ProductID, error) {out, err := uuid.NewV4()if err != nil {return nil, status.Errorf(codes.Internal, "Error while generating Product ID", err)}in.Id = out.String()if s.productMap == nil {s.productMap = make(map[string]*pb.Product)}s.productMap[in.Id] = inlog.Printf("Product %v : %v - Added.", in.Id, in.Name)return &pb.ProductID{Value: in.Id}, status.New(codes.OK, "").Err()
}// GetProduct implements ecommerce.GetProduct
func (s *server) GetProduct(ctx context.Context, in *pb.ProductID) (*pb.Product, error) {product, exists := s.productMap[in.Value]if exists && product != nil {log.Printf("Product %v : %v - Retrieved.", product.Id, product.Name)return product, status.New(codes.OK, "").Err()}return nil, status.Errorf(codes.NotFound, "Product does not exist.", in.Value)
}func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()pb.RegisterProductInfoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
客户端
func main() {// Set up a connection to the server.conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewProductInfoClient(conn)// Contact the server and print out its response.name := "Apple iPhone 11"description := "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode."price := float32(699.00)ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()r, err := c.AddProduct(ctx, &pb.Product{Name: name, Description: description, Price: price})if err != nil {log.Fatalf("Could not add product: %v", err)}log.Printf("Product ID: %s added successfully", r.Value)product, err := c.GetProduct(ctx, &pb.ProductID{Value: r.Value})if err != nil {log.Fatalf("Could not get product: %v", err)}log.Printf("Product: %v", product.String())
}
客户端连接gRPC服务器以后,就可以像调用本地函数一样操作远程服务器。
通信时始终只有一个请求和一个响应
service OrderManagement {rpc addOrder(Order) returns (google.protobuf.StringValue);rpc getOrder(google.protobuf.StringValue) returns (Order);
}message Order {string id = 1;repeated string items = 2;string description = 3;float price = 4;string destination = 5;
}message CombinedShipment {string id = 1;string status = 2;repeated Order ordersList = 3;
}
服务端
func (s *server) AddOrder(ctx context.Context, orderReq *pb.Order) (*wrapper.StringValue, error) {log.Printf("Order Added. ID : %v", orderReq.Id)orderMap[orderReq.Id] = *orderReqreturn &wrapper.StringValue{Value: "Order Added: " + orderReq.Id}, nil
}func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {ord, exists := orderMap[orderId.Value]if exists {return &ord, status.New(codes.OK, "").Err()}return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)
}func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()pb.RegisterOrderManagementServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
客户端
func main() {// Setting up a connection to the server.conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// Add Orderorder1 := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}res, _ := client.AddOrder(ctx, &order1)if res != nil {log.Print("AddOrder Response -> ", res.Value)}// Get OrderretrievedOrder , err := client.GetOrder(ctx, &wrapper.StringValue{Value: "106"})log.Print("GetOrder Response -> : ", retrievedOrder)
}
通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应
rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {for key, order := range orderMap {log.Print(key, order)for _, itemStr := range order.Items {log.Print(itemStr)// 检查字段是否包含查询字符串if strings.Contains(itemStr, searchQuery.Value) {// 服务端 Send 方法写入流中发送给客户端err := stream.Send(&order)if err != nil {return fmt.Errorf("error sending message to stream : %v", err)}log.Print("Matching Order Found : " + key)break}}}return nil
}
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {// 客户端 Recv 方法接收服务端发送的流searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {ordersStr := "Updated Order IDs : "for {// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})}if err != nil {return err}// 更新数据orderMap[order.Id] = *orderlog.Printf("Order ID : %s - %s", order.Id, "Updated")ordersStr += order.Id + ", "}
}
updateStream, err := client.UpdateOrders(ctx)if err != nil {log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)}// Updating order 1if err := updateStream.Send(&updOrder1); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)}// Updating order 2if err := updateStream.Send(&updOrder2); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)}// 发送关闭信号并接收服务端响应
updateRes, err := updateStream.CloseAndRecv()if err != nil {log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)}log.Printf("Update Orders Res : %s", updateRes)
对应的业务就比如实时的消息流
rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {batchMarker := 1var combinedShipmentMap = make(map[string]pb.CombinedShipment)for {// 接收请求orderId, err := stream.Recv()log.Printf("Reading Proc order : %s", orderId)if err == io.EOF {// 客户端请求发完,返回对应响应log.Printf("EOF : %s", orderId)for _, shipment := range combinedShipmentMap {if err := stream.Send(&shipment); err != nil {return err}}return nil}if err != nil {log.Println(err)return err}// 处理逻辑destination := orderMap[orderId.GetValue()].Destinationshipment, found := combinedShipmentMap[destination]if found {ord := orderMap[orderId.GetValue()]shipment.OrdersList = append(shipment.OrdersList, &ord)combinedShipmentMap[destination] = shipment} else {comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!"}ord := orderMap[orderId.GetValue()]comShip.OrdersList = append(shipment.OrdersList, &ord)combinedShipmentMap[destination] = comShiplog.Print(len(comShip.OrdersList), comShip.GetId())}// 分批块发送回响应if batchMarker == orderBatchSize {for _, comb := range combinedShipmentMap {log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrdersList))if err := stream.Send(&comb); err != nil {return err}}batchMarker = 0combinedShipmentMap = make(map[string]pb.CombinedShipment)} else {batchMarker++}}
}
func main(){
streamProcOrder, err := client.ProcessOrders(ctx)if err := streamProcOrder.Send(&wrapper.StringValue{Value: "102"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "102", err)}if err := streamProcOrder.Send(&wrapper.StringValue{Value: "103"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "103", err)}channel := make(chan struct{})// 起个协程接收返回的响应go asncClientBidirectionalRPC(streamProcOrder, channel)// 模拟消息延迟,发送请求 1time.Sleep(time.Millisecond * 1000)if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "101", err)}// 关闭流if err := streamProcOrder.CloseSend(); err != nil {log.Fatal(err)}channel <- struct{}{}
}func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {for {combinedShipment, errProcOrder := streamProcOrder.Recv()if errProcOrder == io.EOF {break}log.Printf("Combined shipment : ", combinedShipment.OrdersList)}<-c
}
服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。
- 调用存根方法
- 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置
content-type
为application/grpc
- 到达服务端,会先检查请求头是不是gRPC请求,否则返回415
在写入消息前,先写入长度消息表明每条消息的大小。
每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB
帧首中还有单字节无符号整数,用来表明数据是否进行了压缩
为1表示使用
message-encoding
中的编码机制进行了压缩
客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记
1、对于gRPC 都是POST
2、协议:Http/Https
3、/服务名/方法名
4、目标URI的主机名
5、对不兼容代理的检测,gRPC下这个值必须为 trailers
6、超时时间
7、媒体类型
8、压缩类型
当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧
服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers
END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer
一元RPC
服务流RPC
客户流RPC
双工流RPC