gRPC学习Go版(一)
创始人
2024-04-27 21:05:48
0

文章目录

  • 微服务入门
  • gRPC是什么
      • proto 服务定义
      • gRPC 优势
  • gRPC入门
      • 简单使用
      • 一元RPC
      • 服务流RPC
      • 客户流RPC
      • 双工流RPC
  • gRPC底层原理
      • RPC流
      • 长度前缀的消息分帧
      • 请求消息
      • 响应信息
      • 通信模式下的消息流

微服务入门

现在的软件很少是一个孤立的单体应用运行的,相反更多是通过互联网连接在一起的,以相互传递消息的方式进行通信和协调,也就是分布式软件的集合。

例如:一个商城系统由多个分布式的应用程序组成,像订单应用、商品应用、支付应用、数据库应用等等,这些程序可以分布在不同的网络位置中运行,通过不同的通信协议传递信息。

传统的软件被拆分成细粒度、面向业务的实体,这就是微服务。

最传统的方式就是构造成REST API 服务,也就是一组架构约束条件原则,把应用和服务定义为一组资源,但是这种方式的配置太麻烦,笨重低效。

为了更好的扩展性、低耦合进程间通信,这也就是gRPC的优势

gRPC是什么

以往我们是通过简单的路由映射来允许客户端获取路由信息和交换路由信息。

在gRPC中,我们可以一次性的在一个 proto文件中定义服务并使用任意的支持gRPC的语言去实现客户端和服务端,整个过程操作变得简单,就像调用本地函数一样。

通过 proto生成服务端代码,也就是服务端的骨架,提供低层通信抽象

通过 proto生成客户端代码,也就是客户端的存根,隐藏了不同语言的差异,提供抽象的通信方式,就像调用本地函数一样。

go get google.golang.org/grpc
go get github.com/golang/protobuf/protoc-gen-go

proto 服务定义

gRPC 使用protocol buffer 来定义服务接口,protocol buffer和 XML、JSON一样是一种结构化数据序列化的可扩展存储结构,protocol buffer是一种语言中立,结构简单高效,比XML更小更简单,可以通过特殊的插件自动生成代码来读写操作这个数据结构。

import "myproject/other_protos.proto";		// 导入其他 proto文件message SearchRequest 
{required string query = 1;				// 必须赋值字段optional int32 page_number = 2 [default = 10];		// 可选字段repeated int32 result_per_page = 3;	// 可重复字段 
}message SearchResponse 
{message Result 		// 嵌套定义{required string url = 1;optional string title = 2;repeated string snippets = 3;}repeated Result result = 1;
}message SomeOtherMessage 
{optional SearchResponse.Result result = 1;	// 使用其他消息的定义
}service List{				// 定义gRPC服务接口rpc getList(SearchRequest) returns (SearchResponse);
}
// 插件自动生成gRPC骨架和存根
protoc --go_out=plugins=grpc:. route_guide.proto后面需要实现服务端具体的逻辑就行,然后注册到gRPC服务器
客户端在调用远程方法时会使用阻塞式存根,所以gRPC主要使用同步的方式通信,在建立连接后,可以使用流的方式操作。
客户端编排为protocol buffer的格式,服务端再解排执行,以HTTP2 传输

gRPC 优势

  • 更高效的进程通信:使用基于protocol buffer在Http2 中以二进制协议通信,而不是JSON、XML文本格式
  • 简单定义的服务接口、易扩展
  • 强类型、跨语言
  • 一元RPC、服务端流、客户端流、双工流

gRPC入门

简单使用

protocol buffer

syntax = "proto3";
package ecommerce;service ProductInfo {rpc addProduct(Product) returns (ProductID);rpc getProduct(ProductID) returns (Product);
}message Product {string id = 1;string name = 2;string description = 3;float price = 4;
}message ProductID {string value = 1;
}

服务端

// server is used to implement ecommerce/product_info.
type server struct {productMap map[string]*pb.Product
}// AddProduct implements ecommerce.AddProduct
func (s *server) AddProduct(ctx context.Context,in *pb.Product) (*pb.ProductID, error) {out, err := uuid.NewV4()if err != nil {return nil, status.Errorf(codes.Internal, "Error while generating Product ID", err)}in.Id = out.String()if s.productMap == nil {s.productMap = make(map[string]*pb.Product)}s.productMap[in.Id] = inlog.Printf("Product %v : %v - Added.", in.Id, in.Name)return &pb.ProductID{Value: in.Id}, status.New(codes.OK, "").Err()
}// GetProduct implements ecommerce.GetProduct
func (s *server) GetProduct(ctx context.Context, in *pb.ProductID) (*pb.Product, error) {product, exists := s.productMap[in.Value]if exists && product != nil {log.Printf("Product %v : %v - Retrieved.", product.Id, product.Name)return product, status.New(codes.OK, "").Err()}return nil, status.Errorf(codes.NotFound, "Product does not exist.", in.Value)
}func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()pb.RegisterProductInfoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

客户端

func main() {// Set up a connection to the server.conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewProductInfoClient(conn)// Contact the server and print out its response.name := "Apple iPhone 11"description := "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode."price := float32(699.00)ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()r, err := c.AddProduct(ctx, &pb.Product{Name: name, Description: description, Price: price})if err != nil {log.Fatalf("Could not add product: %v", err)}log.Printf("Product ID: %s added successfully", r.Value)product, err := c.GetProduct(ctx, &pb.ProductID{Value: r.Value})if err != nil {log.Fatalf("Could not get product: %v", err)}log.Printf("Product: %v", product.String())
}

客户端连接gRPC服务器以后,就可以像调用本地函数一样操作远程服务器。

一元RPC

通信时始终只有一个请求和一个响应

service OrderManagement {rpc addOrder(Order) returns (google.protobuf.StringValue);rpc getOrder(google.protobuf.StringValue) returns (Order);
}message Order {string id = 1;repeated string items = 2;string description = 3;float price = 4;string destination = 5;
}message CombinedShipment {string id = 1;string status = 2;repeated Order ordersList = 3;
}

服务端

func (s *server) AddOrder(ctx context.Context, orderReq *pb.Order) (*wrapper.StringValue, error) {log.Printf("Order Added. ID : %v", orderReq.Id)orderMap[orderReq.Id] = *orderReqreturn &wrapper.StringValue{Value: "Order Added: " + orderReq.Id}, nil
}func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {ord, exists := orderMap[orderId.Value]if exists {return &ord, status.New(codes.OK, "").Err()}return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)
}func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()pb.RegisterOrderManagementServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

客户端

func main() {// Setting up a connection to the server.conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// Add Orderorder1 := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}res, _ := client.AddOrder(ctx, &order1)if res != nil {log.Print("AddOrder Response -> ", res.Value)}// Get OrderretrievedOrder , err := client.GetOrder(ctx, &wrapper.StringValue{Value: "106"})log.Print("GetOrder Response -> : ", retrievedOrder)
} 

服务流RPC

通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应

rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {for key, order := range orderMap {log.Print(key, order)for _, itemStr := range order.Items {log.Print(itemStr)// 检查字段是否包含查询字符串if strings.Contains(itemStr, searchQuery.Value) {// 服务端 Send 方法写入流中发送给客户端err := stream.Send(&order)if err != nil {return fmt.Errorf("error sending message to stream : %v", err)}log.Print("Matching Order Found : " + key)break}}}return nil
}
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {// 客户端 Recv 方法接收服务端发送的流searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {ordersStr := "Updated Order IDs : "for {// Recv 对客户端发来的请求接收order, err := stream.Recv()if err == io.EOF {// 流结束,关闭并发送响应给客户端return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})}if err != nil {return err}// 更新数据orderMap[order.Id] = *orderlog.Printf("Order ID : %s - %s", order.Id, "Updated")ordersStr += order.Id + ", "}
}
updateStream, err := client.UpdateOrders(ctx)if err != nil {log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)}// Updating order 1if err := updateStream.Send(&updOrder1); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)}// Updating order 2if err := updateStream.Send(&updOrder2); err != nil {log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)}// 发送关闭信号并接收服务端响应
updateRes, err := updateStream.CloseAndRecv()if err != nil {log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)}log.Printf("Update Orders Res : %s", updateRes)

双工流RPC

对应的业务就比如实时的消息流

rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {batchMarker := 1var combinedShipmentMap = make(map[string]pb.CombinedShipment)for {// 接收请求orderId, err := stream.Recv()log.Printf("Reading Proc order : %s", orderId)if err == io.EOF {// 客户端请求发完,返回对应响应log.Printf("EOF : %s", orderId)for _, shipment := range combinedShipmentMap {if err := stream.Send(&shipment); err != nil {return err}}return nil}if err != nil {log.Println(err)return err}// 处理逻辑destination := orderMap[orderId.GetValue()].Destinationshipment, found := combinedShipmentMap[destination]if found {ord := orderMap[orderId.GetValue()]shipment.OrdersList = append(shipment.OrdersList, &ord)combinedShipmentMap[destination] = shipment} else {comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!"}ord := orderMap[orderId.GetValue()]comShip.OrdersList = append(shipment.OrdersList, &ord)combinedShipmentMap[destination] = comShiplog.Print(len(comShip.OrdersList), comShip.GetId())}// 分批块发送回响应if batchMarker == orderBatchSize {for _, comb := range combinedShipmentMap {log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrdersList))if err := stream.Send(&comb); err != nil {return err}}batchMarker = 0combinedShipmentMap = make(map[string]pb.CombinedShipment)} else {batchMarker++}}
}
func main(){
streamProcOrder, err := client.ProcessOrders(ctx)if err := streamProcOrder.Send(&wrapper.StringValue{Value: "102"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "102", err)}if err := streamProcOrder.Send(&wrapper.StringValue{Value: "103"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "103", err)}channel := make(chan struct{})// 起个协程接收返回的响应go asncClientBidirectionalRPC(streamProcOrder, channel)// 模拟消息延迟,发送请求 1time.Sleep(time.Millisecond * 1000)if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {log.Fatalf("%v.Send(%v) = %v", client, "101", err)}// 关闭流if err := streamProcOrder.CloseSend(); err != nil {log.Fatal(err)}channel <- struct{}{}
}func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {for {combinedShipment, errProcOrder := streamProcOrder.Recv()if errProcOrder == io.EOF {break}log.Printf("Combined shipment : ", combinedShipment.OrdersList)}<-c
}

gRPC底层原理

RPC流

服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。

  • 调用存根方法
  • 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置content-typeapplication/grpc
  • 到达服务端,会先检查请求头是不是gRPC请求,否则返回415

请添加图片描述

长度前缀的消息分帧

在写入消息前,先写入长度消息表明每条消息的大小。

每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB

帧首中还有单字节无符号整数,用来表明数据是否进行了压缩

为1表示使用 message-encoding中的编码机制进行了压缩

请添加图片描述

请求消息

客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记

1、对于gRPC 都是POST

2、协议:Http/Https

3、/服务名/方法名

4、目标URI的主机名

5、对不兼容代理的检测,gRPC下这个值必须为 trailers

6、超时时间

7、媒体类型

8、压缩类型

请添加图片描述
请添加图片描述

当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧

请添加图片描述

响应信息

服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers

END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer

请添加图片描述
请添加图片描述

通信模式下的消息流

一元RPC

请添加图片描述

服务流RPC

请添加图片描述

客户流RPC

请添加图片描述

双工流RPC
请添加图片描述

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...