gRPC学习Go版(一)

news2024/11/20 6:19:01

文章目录

  • 微服务入门
  • gRPC是什么
      • proto 服务定义
      • gRPC 优势
  • gRPC入门
      • 简单使用
      • 一元RPC
      • 服务流RPC
      • 客户流RPC
      • 双工流RPC
  • gRPC底层原理
      • RPC流
      • 长度前缀的消息分帧
      • 请求消息
      • 响应信息
      • 通信模式下的消息流

微服务入门

现在的软件很少是一个孤立的单体应用运行的,相反更多是通过互联网连接在一起的,以相互传递消息的方式进行通信和协调,也就是分布式软件的集合。

例如:一个商城系统由多个分布式的应用程序组成,像订单应用、商品应用、支付应用、数据库应用等等,这些程序可以分布在不同的网络位置中运行,通过不同的通信协议传递信息。

传统的软件被拆分成细粒度、面向业务的实体,这就是微服务。

最传统的方式就是构造成REST API 服务,也就是一组架构约束条件原则,把应用和服务定义为一组资源,但是这种方式的配置太麻烦,笨重低效。

为了更好的扩展性、低耦合进程间通信,这也就是gRPC的优势

gRPC是什么

以往我们是通过简单的路由映射来允许客户端获取路由信息和交换路由信息。

在gRPC中,我们可以一次性的在一个 proto文件中定义服务并使用任意的支持gRPC的语言去实现客户端和服务端,整个过程操作变得简单,就像调用本地函数一样。

通过 proto生成服务端代码,也就是服务端的骨架,提供低层通信抽象

通过 proto生成客户端代码,也就是客户端的存根,隐藏了不同语言的差异,提供抽象的通信方式,就像调用本地函数一样。

go get google.golang.org/grpc
go get github.com/golang/protobuf/protoc-gen-go

proto 服务定义

gRPC 使用protocol buffer 来定义服务接口,protocol buffer和 XML、JSON一样是一种结构化数据序列化的可扩展存储结构,protocol buffer是一种语言中立,结构简单高效,比XML更小更简单,可以通过特殊的插件自动生成代码来读写操作这个数据结构。

import "myproject/other_protos.proto";		// 导入其他 proto文件

message SearchRequest 
{
  required string query = 1;				// 必须赋值字段
  optional int32 page_number = 2 [default = 10];		// 可选字段
  repeated int32 result_per_page = 3;	// 可重复字段 
}

message SearchResponse 
{
  message Result 		// 嵌套定义
  {
    required string url = 1;
    optional string title = 2;
    repeated string snippets = 3;
  }
  repeated Result result = 1;
}

message SomeOtherMessage 
{
  optional SearchResponse.Result result = 1;	// 使用其他消息的定义
}

service List{				// 定义gRPC服务接口
	rpc getList(SearchRequest) returns (SearchResponse);
}
// 插件自动生成gRPC骨架和存根
protoc --go_out=plugins=grpc:. route_guide.proto

后面需要实现服务端具体的逻辑就行,然后注册到gRPC服务器
客户端在调用远程方法时会使用阻塞式存根,所以gRPC主要使用同步的方式通信,在建立连接后,可以使用流的方式操作。
客户端编排为protocol buffer的格式,服务端再解排执行,以HTTP2 传输

gRPC 优势

  • 更高效的进程通信:使用基于protocol buffer在Http2 中以二进制协议通信,而不是JSON、XML文本格式
  • 简单定义的服务接口、易扩展
  • 强类型、跨语言
  • 一元RPC、服务端流、客户端流、双工流

gRPC入门

简单使用

protocol buffer

syntax = "proto3";
package ecommerce;

service ProductInfo {
    rpc addProduct(Product) returns (ProductID);
    rpc getProduct(ProductID) returns (Product);
}

message Product {
    string id = 1;
    string name = 2;
    string description = 3;
    float price = 4;
}

message ProductID {
    string value = 1;
}

服务端

// server is used to implement ecommerce/product_info.
type server struct {
	productMap map[string]*pb.Product
}

// AddProduct implements ecommerce.AddProduct
func (s *server) AddProduct(ctx context.Context,
	in *pb.Product) (*pb.ProductID, error) {
	out, err := uuid.NewV4()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Error while generating Product ID", err)
	}
	in.Id = out.String()
	if s.productMap == nil {
		s.productMap = make(map[string]*pb.Product)
	}
	s.productMap[in.Id] = in
	log.Printf("Product %v : %v - Added.", in.Id, in.Name)
	return &pb.ProductID{Value: in.Id}, status.New(codes.OK, "").Err()
}

// GetProduct implements ecommerce.GetProduct
func (s *server) GetProduct(ctx context.Context, in *pb.ProductID) (*pb.Product, error) {
	product, exists := s.productMap[in.Value]
	if exists && product != nil {
		log.Printf("Product %v : %v - Retrieved.", product.Id, product.Name)
		return product, status.New(codes.OK, "").Err()
	}
	return nil, status.Errorf(codes.NotFound, "Product does not exist.", in.Value)
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterProductInfoServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewProductInfoClient(conn)

	// Contact the server and print out its response.
	name := "Apple iPhone 11"
	description := "Meet Apple iPhone 11. All-new dual-camera system with Ultra Wide and Night mode."
	price := float32(699.00)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.AddProduct(ctx, &pb.Product{Name: name, Description: description, Price: price})
	if err != nil {
		log.Fatalf("Could not add product: %v", err)
	}
	log.Printf("Product ID: %s added successfully", r.Value)

	product, err := c.GetProduct(ctx, &pb.ProductID{Value: r.Value})
	if err != nil {
		log.Fatalf("Could not get product: %v", err)
	}
	log.Printf("Product: %v", product.String())
}

客户端连接gRPC服务器以后,就可以像调用本地函数一样操作远程服务器。

一元RPC

通信时始终只有一个请求和一个响应

service OrderManagement {
    rpc addOrder(Order) returns (google.protobuf.StringValue);
    rpc getOrder(google.protobuf.StringValue) returns (Order);
}

message Order {
    string id = 1;
    repeated string items = 2;
    string description = 3;
    float price = 4;
    string destination = 5;
}

message CombinedShipment {
    string id = 1;
    string status = 2;
    repeated Order ordersList = 3;
}

服务端

func (s *server) AddOrder(ctx context.Context, orderReq *pb.Order) (*wrapper.StringValue, error) {
	log.Printf("Order Added. ID : %v", orderReq.Id)
	orderMap[orderReq.Id] = *orderReq
	return &wrapper.StringValue{Value: "Order Added: " + orderReq.Id}, nil
}

func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {
	ord, exists := orderMap[orderId.Value]
	if exists {
		return &ord, status.New(codes.OK, "").Err()
	}
	return nil, status.Errorf(codes.NotFound, "Order does not exist. : ", orderId)
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterOrderManagementServer(s, &server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端

func main() {
	// Setting up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	client := pb.NewOrderManagementClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Add Order
	order1 := pb.Order{Id: "101", Items: []string{"iPhone XS", "Mac Book Pro"}, Destination: "San Jose, CA", Price: 2300.00}
	res, _ := client.AddOrder(ctx, &order1)
	if res != nil {
		log.Print("AddOrder Response -> ", res.Value)
	}
  // Get Order
	retrievedOrder , err := client.GetOrder(ctx, &wrapper.StringValue{Value: "106"})
	log.Print("GetOrder Response -> : ", retrievedOrder)
} 

服务流RPC

通信时可以是一个请求,服务端多次响应,比如查询业务,服务端模糊匹配找到一次就返回客户端一次响应这样的多次响应

rpc searchOrders(google.protobuf.StringValue) returns (stream Order);
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
	for key, order := range orderMap {
		log.Print(key, order)
		for _, itemStr := range order.Items {
			log.Print(itemStr)
      // 检查字段是否包含查询字符串
			if strings.Contains(itemStr, searchQuery.Value) {
				// 服务端 Send 方法写入流中发送给客户端
				err := stream.Send(&order)
				if err != nil {
					return fmt.Errorf("error sending message to stream : %v", err)
				}
				log.Print("Matching Order Found : " + key)
				break
			}
		}
	}
	return nil
}
searchStream, _ := client.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
	for {
    // 客户端 Recv 方法接收服务端发送的流
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}
		if err == nil {
			log.Print("Search Result : ", searchOrder)
		}
	}

客户流RPC

客户端多个请求发给服务端,服务端发送一个响应给客户端,比如更新业务,客户端的读个请求发过来,服务端更新完返回一个成功的结果

rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
  
	ordersStr := "Updated Order IDs : "
	for {
    // Recv 对客户端发来的请求接收
		order, err := stream.Recv()
		if err == io.EOF {
			// 流结束,关闭并发送响应给客户端
			return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
		}
		if err != nil {
			return err
		}
		// 更新数据
		orderMap[order.Id] = *order
		log.Printf("Order ID : %s - %s", order.Id, "Updated")
		ordersStr += order.Id + ", "
	}
}
updateStream, err := client.UpdateOrders(ctx)
	if err != nil {
		log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
	}
	// Updating order 1
	if err := updateStream.Send(&updOrder1); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder1, err)
	}
	// Updating order 2
	if err := updateStream.Send(&updOrder2); err != nil {
		log.Fatalf("%v.Send(%v) = %v", updateStream, updOrder2, err)
	}
	// 发送关闭信号并接收服务端响应
updateRes, err := updateStream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", updateStream, err, nil)
	}
	log.Printf("Update Orders Res : %s", updateRes)

双工流RPC

对应的业务就比如实时的消息流

rpc processOrders(stream google.protobuf.StringValue) returns (stream CombinedShipment);
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {

	batchMarker := 1
	var combinedShipmentMap = make(map[string]pb.CombinedShipment)
	for {
    // 接收请求
		orderId, err := stream.Recv()
		log.Printf("Reading Proc order : %s", orderId)
		if err == io.EOF {
			// 客户端请求发完,返回对应响应
			log.Printf("EOF : %s", orderId)
			for _, shipment := range combinedShipmentMap {
				if err := stream.Send(&shipment); err != nil {
					return err
				}
			}
			return nil
		}
		if err != nil {
			log.Println(err)
			return err
		}
		// 处理逻辑
		destination := orderMap[orderId.GetValue()].Destination
		shipment, found := combinedShipmentMap[destination]

		if found {
			ord := orderMap[orderId.GetValue()]
			shipment.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = shipment
		} else {
			comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!"}
			ord := orderMap[orderId.GetValue()]
			comShip.OrdersList = append(shipment.OrdersList, &ord)
			combinedShipmentMap[destination] = comShip
			log.Print(len(comShip.OrdersList), comShip.GetId())
		}
		// 分批块发送回响应
		if batchMarker == orderBatchSize {
			for _, comb := range combinedShipmentMap {
				log.Printf("Shipping : %v -> %v", comb.Id, len(comb.OrdersList))
				if err := stream.Send(&comb); err != nil {
					return err
				}
			}
			batchMarker = 0
			combinedShipmentMap = make(map[string]pb.CombinedShipment)
		} else {
			batchMarker++
		}
	}
}
func main(){
streamProcOrder, err := client.ProcessOrders(ctx)

	if err := streamProcOrder.Send(&wrapper.StringValue{Value: "102"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "102", err)
	}

	if err := streamProcOrder.Send(&wrapper.StringValue{Value: "103"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "103", err)
	}

channel := make(chan struct{})
  // 起个协程接收返回的响应
	go asncClientBidirectionalRPC(streamProcOrder, channel)
  // 模拟消息延迟,发送请求 1
	time.Sleep(time.Millisecond * 1000)
  if err := streamProcOrder.Send(&wrapper.StringValue{Value: "101"}); err != nil {
		log.Fatalf("%v.Send(%v) = %v", client, "101", err)
	}
	// 关闭流
	if err := streamProcOrder.CloseSend(); err != nil {
		log.Fatal(err)
	}
	channel <- struct{}{}
}

func asncClientBidirectionalRPC(streamProcOrder pb.OrderManagement_ProcessOrdersClient, c chan struct{}) {
	for {
		combinedShipment, errProcOrder := streamProcOrder.Recv()
		if errProcOrder == io.EOF {
			break
		}
		log.Printf("Combined shipment : ", combinedShipment.OrdersList)
	}
	<-c
}

gRPC底层原理

RPC流

服务端实现protocol buffer定义的方法,客户端保留一个存根,提供服务端方法的抽象,客户端只需要调用存根中的方法,就可以远程调用服务端方法。

  • 调用存根方法
  • 存根创建HTTP POST请求(gRPC中所有请求都是 POST),设置content-typeapplication/grpc
  • 到达服务端,会先检查请求头是不是gRPC请求,否则返回415

请添加图片描述

长度前缀的消息分帧

在写入消息前,先写入长度消息表明每条消息的大小。

每条消息都有额外的4字节来设置大小,也就是说消息的大小不能超过4GB

帧首中还有单字节无符号整数,用来表明数据是否进行了压缩

为1表示使用 message-encoding中的编码机制进行了压缩

请添加图片描述

请求消息

客户端发送,包含3个部分:请求头信息、长度前缀的消息、流结束标记

1、对于gRPC 都是POST

2、协议:Http/Https

3、/服务名/方法名

4、目标URI的主机名

5、对不兼容代理的检测,gRPC下这个值必须为 trailers

6、超时时间

7、媒体类型

8、压缩类型

请添加图片描述
请添加图片描述

当因为没有要发送的数据而需要关闭请求流时,必须发送一个带标记的空数据帧

请添加图片描述

响应信息

服务端发送,包含3个部分:响应头信息、长度前缀的信息、trailers

END_STREAM 标记不会随数据帧一起发送,而是作为单独的头信息来发送,名为 trailer

请添加图片描述
请添加图片描述

通信模式下的消息流

一元RPC

请添加图片描述

服务流RPC

请添加图片描述

客户流RPC

请添加图片描述

双工流RPC
请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/106542.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

玩以太坊链上项目的必备技能(错误处理以及异常-Solidity之旅十四)

错误处理 作为开发者的我们知道&#xff0c;我们所编写出来的程序难免会出现 bug &#xff0c;而要做的是捕获异常&#xff0c;给用户抛出一个友好地错误提示。 而在 Solidity 中&#xff0c;根据状态恢复异常来处理错误&#xff0c;该异常将撤销在当前调用中对状态所做的所有…

[思维模式-9]:《如何系统思考》-5- 认识篇 - 改变开环、组合逻辑的线性思考,实施闭环、时序逻辑的动态思考。

目录 第1章 因果关系 1.1 因果关系 1.2 因果关系的特点 1.3 因果关系的类型 第2章 线性思考遇到的问题&#xff1a;开环思维、组合逻辑 2.1 开环系统 2.2 组合逻辑 2.3 线性关系 2.4 什么是线性思维&#xff1a;线性因果关系 2.5 线性思维的数学本质 2.6 线性思维的…

自动化药房出药升降机选型设计

一、 运动规划、运动参数的确定 1、 运动参数计算 运动参数主要通过速度规划确定&#xff0c;速度规划采用直线速度特性&#xff0c;如图所示。 运动方程为&#xff1a; 2、 X方向的速度和加速度的估算 已知参数&#xff1a; X方向行程:1…

stream_component_open函数分析

stream_component_open() 函数主要作用是打开 音频流或者视频流 对应的解码器&#xff0c;开启解码线程去解码。 流程图如下&#xff1a; stream_component_open() 的函数定义如下&#xff1a; /* open a given stream. Return 0 if OK */ static int stream_component_open(…

K8S知识点及dashboard操作

1.什么是K8S&#xff1f; K8S是一组服务器集群&#xff0c;可以在集群的各个节点上运行特定的容器。 K8S所管理的是&#xff1a;集群节点上的容器 特性&#xff1a; 自我修复&#xff0c;弹性伸缩&#xff08;根据实时服务器的并打情况&#xff0c;增加或收缩容器数量&…

网络编程套接字Socket(通过两个用例,逐行注释,详细理解)干活满满建议收藏

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言1.分类1.流套接字2.数据报套接字3.原始套接字2.Socket通信模型 3.UDP套接字编程1. DatagramSocket API1.构造方法1.DatagramSocket()2.DatagramSocket(int port)…

C语言之复合类型上卷(十八)(阴阳两极)

上一篇: C语言之内存管理&#xff08;十七&#xff09;&#xff08;转世灵童现世&#xff09; 逐梦编程&#xff0c;让中华屹立世界之巅。 简单的事情重复做,重复的事情用心做,用心的事情坚持做&#xff1b; 文章目录前言一、什么是结构体&#xff1f;二、结构体的定义及初始化…

USB TO SPI(上海同旺电子)调试器调试MCP3201 A/D 转换器

所需设备&#xff1a; 1、USB TO SPI(上海同旺电子)&#xff1b; 2、MCP3201 12 位A/D 转换器; 特性 • 12 位分辨率 • 1 LSB DNL &#xff08;最大值&#xff09; • 1 LSB INL &#xff08;最大值&#xff09;&#xff08;MCP3201-B&#xff09; • 2 LSB INL &#xff…

pdf文件太大怎么变小,如何压缩pdf大小

pdf文件太大怎么变小&#xff1f;如果你是Windows电脑&#xff0c;可以使用PDF编辑器来减小PDF文件的大小&#xff0c;比如这款出色的PDF压缩工具-易我PDF编辑器&#xff0c;它的“压缩”功能提供了两种减小文件大小的方法&#xff0c;这使得它既适合那些只想获得更小的PDF的人…

【vscode】c++程序的自动编译及调试(环境centos)

目录1.新增配置文件&#xff08;1&#xff09;c_cpp_properties.json&#xff08;2&#xff09;files.associations&#xff08;3&#xff09;tasks.json(4)CMakeLists.txt2.断点调试1.新增配置文件 VS Code的配置文件一般是指特定目录下的JSON文件。所谓JSON是一种文本格式&a…

LCF-ATEPC(2020 Elsevier)面向中文的方面级提取和分类

论文题目&#xff08;Title&#xff09;&#xff1a;A Multi-task Learning Model for Chinese-oriented Aspect Polarity Classification and Aspect Term Extraction &#xff08;面向中文的方面极性分类和方面项提取的多任务学习模型) 研究问题&#xff08;Question&#…

适用于 Windows 10/11 电脑 的 5 大好用的离线录屏软件

屏幕录制应用程序可以数字记录出现在任何设备或 PC 屏幕上的内容&#xff0c;并同时以高清流式传输音频和视频。 因此&#xff0c;他们帮助创建营销视频、跟踪客户行为、设计产品演示、监控员工活动、录制教育内容、网络研讨会内容和业务会议内容。 现在您已经意识到屏幕录…

VS系列多通道振弦传感器无线采发仪的数据发送说明

每次设备启动后会将采集到的传感器数据进行内部存储&#xff0c;并在设置好的时间间隔将数据发送出去&#xff0c;通过修改“数据发送方式”参数&#xff0c;监测数据可由数据接口输出也可经由无线网络发送。在发送监测数据时&#xff0c;可通过修改“数据包协议”参数来设置所…

函数和数组习题

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【C语言基础习题】 文章目录知识点习题2.实现一个整型数组的冒泡排序&#xff08;编程体&#xff09;。3.编程题&#xff1a;创建一个整型…

springcloud,springboot各个版本之间的关系

1 版本关系 在实际的开发中如果要自己搭建矿建&#xff0c;发现springcloud&#xff0c;springboot的版本可能是首先需要确定的&#xff0c;那么他们之间的关系是什么呢&#xff1f;看官网&#xff0c;地址 Spring Cloud 左侧是cloud的版本&#xff0c;右侧是对应的文档&…

Splunk Window 客户端迁移

最近客户的Splunk deployment server 要迁移,伴随着client 端的配置也要相应的调整: 先看一下架构: 看一下主要的参数: Summary of key terminology Heres a recap of the key definitions: TermMeaningdeployment serverA Splunk Enterprise instance that acts as a c…

Java中的日期与时间

Java中的日期与时间\huge{Java中的日期与时间}Java中的日期与时间 JavaJavaJava中有很多类是专门用于描述日期类的。 Date类 DateDateDate类&#xff1a;用于表示当前所在系统的日期时间信息。 Date类的构造器 示例&#xff1a; Date d new Date(); System.out.println(d);…

12月第3周榜单丨B站UP主排行榜(飞瓜数据B站)发布!

飞瓜轻数发布2022年12月12日-12月18日飞瓜数据UP主排行榜&#xff08;B站平台&#xff09;&#xff0c;通过充电数、涨粉数、成长指数三个维度来体现UP主账号成长的情况&#xff0c;为用户提供B站号综合价值的数据参考&#xff0c;根据UP主成长情况用户能够快速找到运营能力强的…

Redis高级篇

redis的四个问题&#xff1a; 1.Redis是基于内存存储,服务重启可能会丢失数据; 2.并发能力问题&#xff1a;单节点Redis能力虽然不错,但也无法满足如618这种高并发的场景(618并发 数量达到数十万甚至上百万); 3.如果reids宕机,服务不可用,则需要一种自动的故障恢复手段; 4.存…

自学Python可以找到工作吗?

自学Python可以找到工作吗&#xff1f;自学Python找工作主要看自己的学习能力&#xff0c;自学能力很强学完并精通当然可以工作&#xff0c;不过对于大多数人而言一般都挺难&#xff0c;学习不成系统&#xff0c;遇到问题没人解决很容易放弃半途而废。 学Python能干很多很多事…