gRPC 客户端调用服务端需要连接池吗?

news2025/1/11 21:05:36

发现的问题

在微服务开发中,gRPC 的应用绝对少不了,一般情况下,内部微服务交互,通常是使用 RPC 进行通信,如果是外部通信的话,会提供 https 接口文档

对于 gRPC 的基本使用可以查看文章 gRPC介绍

对于 gRPC ,我们需要基本知道如下的一些知识点:

  • gRPC 的基本四种模式的应用场景

    • 请求响应模式
    • 客户端数据流模式
    • 服务端数据流模式
    • 双向流模式
  • Proto 文件的定义和使用
  • gRPC 拦截器的应用 , 基本的可以查看这篇 gRPC 拦截器

    • 实际上有客户端拦截器 和 服务端拦截器,具体详细的可以自行学习
  • gRPC 的设计原理细节
  • Go-Kit 的使用

当然今天并不是要聊 gRPC 的应用或者原理,而是想聊我们在开发过程中很容易遇到的问题:

  • 未复用 gRPC 客户端连接,影响性能

最近审查各个服务代码中,发现整个部门使用 gRPC 客户端请求服务端接口的时候,都是会新建一个连接,然后调用服务端接口,使用完毕之后就 close 掉, 例如这样

这会有什么问题呢?

正常简单的使用不会有啥问题,但如果是面临高并发的情况,性能问题很容易就会出现,例如我们在做性能测试的时候,就会发现,打一会性能测试,客户端请求服务端的时候就会报错:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

实际去查看问题的时候,很明显,这是 gRPC 的连接数被打满了,很多连接都还未完全释放

那这个时候,简单思考一下,我们是没有必要对于每一次客户端请求服务端接口的时候,都新建立一次连接,并且调用完毕之后就马上关闭连接

我们知道,gRPC 的通信本质上也是 TCP 的连接,那么一次连接就需要三次握手,和四次挥手,每一次建立连接和释放连接的时候,都需要走这么一个过程,如果我们频繁的建立和释放连接,这对于资源和性能其实都是一个大大的浪费

我们还知道 gRPC 是一个高性能、开源和拥有统一规定的 RPC框架,面向对象的 http/2 通信协议,能够能节省空间和 IO 密集度的开销 ,但是我们并没有很好的将他运用起来,gRPC 服务端的连接管理不用我们操心,但是我们对于 gRPC 客户端的连续非常有必要关心,咱们要想办法复用客户端的连接

gRPC 连接池

复用连接,我们可以使用连接池的方式

对于这种复用资源,我们其实也接触了不少,例如复用线程 worker 的线程池,go 中的协程池 …

简单来说,连接池 ,就是提前创建好一定数量的 tcp 连接句柄放在池子中,咱们需要和外部通信的时候,就去池子中取一个连接来用,用完了之后,咱们就放回去

连接池解决了什么问题

很明显,连接池解决了上述咱们频繁创建连接和释放连接带来的资源和性能上的损耗,咱们节省了这部分开销后,自然就提高了咱们的性能

可是我们再次思考一下,如果这个连接池子就是只能存放固定的连接,那么我们业务扩张的时候,岂不是光等待池子里面有空闲连接就会耗费大量的时间呢?

或者是池子过大,咱们需要的连接数较少,那么开辟那么多连接岂不是一种浪费?

那么我们在设计或者是应用连接池的时候,就需要考虑如下几个方面了:

  • 连接池是否支持扩缩容
  • 空闲的连接是否支持超时自行关闭,是否支持保活
  • 池子满的时候,处理的策略是什么样的

其实关于连接池的设计和库网上都很多,我们可以找一个案例来看看如何来使用连接池,以及它是如何来进行上述几个方面的编码落地的

如何去使用连接池

先来看看客户端如何使用连接池

客户端使用 pool

client/main.go

package main

import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"

        "mypoolclient/pool"
        "mypoolclient/pb"
)

var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")

func main() {
        flag.Parse()

        p, err := pool.New(*addr, pool.DefaultOptions)
        if err != nil {
                log.Fatalf("failed to new pool: %v", err)
        }
        defer p.Close()

        conn, err := p.Get()
        if err != nil {
                log.Fatalf("failed to get conn: %v", err)
        }
        defer conn.Close()

        client := pb.NewTestsvrClient(conn.Value())
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")})
        if err != nil {
                log.Fatalf("unexpected error from Say: %v", err)
        }
        fmt.Println("rpc response:", res)
}

此处的客户端,我们很明显可以看出来,以前咱们使用客户端去调用服务端接口的时候,总会不自觉的 Dial 一下建立连接

咱们使用连接池的话,就可以直接从池子里面拿一个连接出来直接使用即可

服务端

server/client.go

package main

import (
        "context"
        "flag"
        "fmt"
        "log"
        "net"

        "google.golang.org/grpc"

        "mypoolserver/pb"
)

var port = flag.Int("port", 8888, "port number")

// server implements EchoServer.
type server struct{}

func (s *server) Say(context.Context, *pb.TestReq) (*pb.TestRsp, error) {
        fmt.Println("call  Say ... ")
        return &pb.TestRsp{Message: []byte("hello world")}, nil
}

func main() {
        flag.Parse()

        listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port))
        if err != nil {
                log.Fatalf("failed to listen: %v", err)
        }

        s := grpc.NewServer()
        pb.RegisterTestsvrServer(s, &server{})
        fmt.Println("start server ...")

        if err := s.Serve(listen); err != nil {
                log.Fatalf("failed to serve: %v", err)
        }


        fmt.Println("over server ...")
}

连接池的具体实现方式

连接池的具体实现方式,参考了 github https://github.com/shimingyah/pool

具体的实现,都放在上述目录的 pool 下面了 , 也可以访问地址 : https://github.com/qingconglaixueit/mypoolapp

pool 包中包含了 3 个文件,作用如下:

.

├── conn.go

– 关于 grpc 连接的结构定义和方法实现

├── options.go

– 拦截器的常量定义,以及 Dial 建立连接的简单封装, 这个文件可要可不要,看自己的需求

└── pool.go

– 具体 pool 的接口定义和实现

直接来看 pool.go 中的接口定义

type Pool interface {
   Get() (Conn, error)

   Close() error

   Status() string
}
  • Get()

获取一个新的连接 , 当关闭连接的时候,会将该连接放入到池子中

  • Close()

关闭连接池,自然连接池子中的连接也不再可用

关于 pool 结构的定义 ,conn 结构的定义建议,将上述 github 地址上的源码下载下来进行阅读,下面主要是分享关于

  • 连接池子的创建,扩缩容,释放
  • 具体 TCP 连接的创建和释放

创建连接池

func New(address string, option Options) (Pool, error) {
   if address == "" {
      return nil, errors.New("invalid address settings")
   }
   if option.Dial == nil {
      return nil, errors.New("invalid dial settings")
   }
   if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
      return nil, errors.New("invalid maximum settings")
   }
   if option.MaxConcurrentStreams <= 0 {
      return nil, errors.New("invalid maximun settings")
   }

   p := &pool{
      index:   0,
      current: int32(option.MaxIdle),
      ref:     0,
      opt:     option,
      conns:   make([]*conn, option.MaxActive),
      address: address,
      closed:  0,
   }

   for i := 0; i < p.opt.MaxIdle; i++ {
      c, err := p.opt.Dial(address)
      if err != nil {
         p.Close()
         return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
      }
      p.conns[i] = p.wrapConn(c, false)
   }
   log.Printf("new pool success: %v\n", p.Status())

   return p, nil
}

关于 pool 的接口,可以看成是这样的

对于创建连接池,除了校验基本的参数以外,我们知道池子其实是一个 TCP 连接的切片,长度为 option.MaxActive 即最大的活跃连接数

p.conns[i] = p.wrapConn(c, false) 表示咱们初始化一个连接,并放到连接池中,且初始化的 once 参数置为 false,表示该连接默认保存在池子中,不被销毁

换句话说,当我们需要真实销毁连接池中的连接的时候,就将该链接的 once 参数置为 false 即可,实际上也无需我们使用这去做这一步

实际上 关于每一个连接的建立也是在 New 里面完成的,只要有 1 个连接未建立成功,那么咱们的连接池就算是建立失败,咱们会调用 p.Close() 将之前建立好的连接全部释放掉

// 关闭连接池
func (p *pool) Close() error {
   atomic.StoreInt32(&p.closed, 1)
   atomic.StoreUint32(&p.index, 0)
   atomic.StoreInt32(&p.current, 0)
   atomic.StoreInt32(&p.ref, 0)
   p.deleteFrom(0)
   log.Printf("close pool success: %v\n", p.Status())
   return nil
}

// 清除从 指定位置开始到 MaxActive 之间的连接
func (p *pool) deleteFrom(begin int) {
   for i := begin; i < p.opt.MaxActive; i++ {
      p.reset(i)
   }
}

// 清除具体的连接
func (p *pool) reset(index int) {
   conn := p.conns[index]
   if conn == nil {
      return
   }
   conn.reset()
   p.conns[index] = nil
}

这里我们可以看到,当需要从池子中清除具体的连接的时候,最终从连接池子中取出对应位置上的连接 ,conn := p.conns[index], conn.reset() ,实际上是给当前这个连接进行参数赋值

func (c *conn) reset() error {
   cc := c.cc
   c.cc = nil
   c.once = false
   if cc != nil {
      return cc.Close()
   }
   return nil
}

func (c *conn) Close() error {
   c.pool.decrRef()
   if c.once {
      return c.reset()
   }
   return nil
}

最终调用 Close() 将指定的连接清除掉,这些动作都是连接池自动给我们做了,无需我们使用者去担心

我们使用连接池通过 pool.Get() 拿到具体的连接句柄 conn 之后,我们使用 conn.Close() 关闭连接,实际上也是会走到上述的 Close() 实现的位置,但是我们并未指定当然也没有权限显示的指定将 once 置位为 false ,因此对于调用者来说,是关闭了连接,对于连接池来说,实际上是将连接归还到连接池中

关于连接池子的缩容和扩容是在 pool.Get() 中实现的

func (p *pool) Get() (Conn, error) {
   // the first selected from the created connections
   nextRef := p.incrRef()
   p.RLock()
   current := atomic.LoadInt32(&p.current)
   p.RUnlock()
   if current == 0 {
      return nil, ErrClosed
   }
   if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
      next := atomic.AddUint32(&p.index, 1) % uint32(current)
      return p.conns[next], nil
   }

   // the number connection of pool is reach to max active
   if current == int32(p.opt.MaxActive) {
      // the second if reuse is true, select from pool's connections
      if p.opt.Reuse {
         next := atomic.AddUint32(&p.index, 1) % uint32(current)
         return p.conns[next], nil
      }
      // the third create one-time connection
      c, err := p.opt.Dial(p.address)
      return p.wrapConn(c, true), err
   }

   // the fourth create new connections given back to pool
   p.Lock()
   current = atomic.LoadInt32(&p.current)
   if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
      // 2 times the incremental or the remain incremental
      increment := current
      if current+increment > int32(p.opt.MaxActive) {
         increment = int32(p.opt.MaxActive) - current
      }
      var i int32
      var err error
      for i = 0; i < increment; i++ {
         c, er := p.opt.Dial(p.address)
         if er != nil {
            err = er
            break
         }
         p.reset(int(current + i))
         p.conns[current+i] = p.wrapConn(c, false)
      }
      current += i
      log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
         p.current, current, increment, p.opt.MaxActive)
      atomic.StoreInt32(&p.current, current)
      if err != nil {
         p.Unlock()
         return nil, err
      }
   }
   p.Unlock()
   next := atomic.AddUint32(&p.index, 1) % uint32(current)
   return p.conns[next], nil
}

从 Get 的实现中,我们可以知道 Get 的逻辑如下

  • 先增加连接的引用计数,如果在设定 current*int32(p.opt.MaxConcurrentStreams) 范围内,那么直接取连接进行使用即可
  • 若当前的连接数达到了最大活跃的连接数,那么就看我们新建池子的时候传递的 option 中的 reuse 参数是否是 true,若是复用,则随机取出连接池中的任意连接提供使用,如果不复用,则新建一个连接
  • 其余的情况,就需要我们进行 2 倍或者 1 倍的数量对连接池进行扩容了

实际上,上述的库中,并没有提供咱们缩容的算法,如果真的有这方面的需求的话

也可以在 Get 的实现上进行缩容,具体的缩容策略可以根据实际情况来定,例如当引用计数 nextRef 只有当前活跃连接数的 20% 的时候(这只是一个例子),就可以考虑缩容了

感谢阅读,欢迎交流,点个赞,关注一波 再走吧

可以进入地址进行体验和学习:https://xxetb.xet.tech/s/3lucCI

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/908899.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ClickHouse(二十三):Java Spark读写ClickHouse API

进入正文前&#xff0c;感谢宝子们订阅专题、点赞、评论、收藏&#xff01;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; &#x1f3e1;个人主页&#xff1a;含各种IT体系技术&#xff0c;IT贫道_Apache Doris,大数据OLAP体系技术栈,Kerberos安全认证-CSDN博客 &…

vue开发环境搭建(WebStorm)

一、安装Node.js&#xff0c;搭建Vue环境 1、访问Node.js官网&#xff08;https://nodejs.org/en/download/&#xff09;进行安装包下载。 2、下载成功之后运行安装程序&#xff0c;进行安装。 如果是用安装程序进行安装&#xff0c;在安装过程中会自动进行Nodejs环境变量的配置…

最新两年工作经验总结

最新两年工作经验总结 前言URP的使用1&#xff1a;如何开启URP1、老项目升级为URP2、创建新项目时选择URP创建 2&#xff1a;URP阴影的设置 PolyBrush的使用&#xff08;地图编辑插件&#xff09;制作山峰or低谷边缘柔化雨刷上色制造场景中的物体贴图地形创建容易踩坑的点ProBu…

springboot大文件上传、分片上传、断点续传、秒传的实现

对于大文件的处理&#xff0c;无论是用户端还是服务端&#xff0c;如果一次性进行读取发送、接收都是不可取&#xff0c;很容易导致内存问题。所以对于大文件上传&#xff0c;采用切块分段上传&#xff0c;从上传的效率来看&#xff0c;利用多线程并发上传能够达到最大效率。 …

示例1:FreeRTOS移植详解_基于HAL库工程

1、开发环境 (1)Keil MDK: V5.38.0.0 (2)STM32CubeMX: V6.8.1 (3)MCU: STM32F103C8(F1系列软仿真最方便) (4)ARM编译器&#xff1a;V5(使用V6编译会报错) 2、移植准备工作 (1)用于移植FreeRTOS的基础工程。 时钟已配置好串口已配置好printf已经重定向到串口1 (2)FreeRT…

《YOLO小目标检测》专栏介绍 CSDN独家改进创新实战专栏目录

&#x1f4a1;&#x1f4a1;&#x1f4a1;Yolo小目标检测&#xff0c;独家首发创新&#xff08;原创&#xff09;&#xff0c;适用于Yolov5、Yolov7、Yolov8等各个Yolo系列&#xff0c;专栏文章提供每一步步骤和源码&#xff0c;带你轻松实现小目标检测涨点 &#x1f4a1;&…

【二分查找篇】速刷牛客TOP101 高效刷题指南

文章目录 17、BM17 二分查找-I18、BM18 二维数组中的查找19、BM19 寻找峰值20、BM20 数组中的逆序对21、BM21 旋转数组的最小数字22、BM22 比较版本号23、BM23 二叉树的前序遍历 17、BM17 二分查找-I 思路步骤&#xff1a; step 1&#xff1a;从数组首尾开始&#xff0c;每次取…

wustojc日期格式变化

#include <stdio.h> int main() {char a[10];for(int i0;i<10;i){//用一个耍聪明的方法&#xff0c;全部用数组存储&#xff1b;面向结果编程a[0]getchar();}printf("%c%c%c%c%c%c%c%c%c%c",a[6],a[7],a[8],a[9],a[2],a[0],a[1],a[5],a[3],a[4]);return 0;}…

什么是跳跃表 ? 说一说跳跃表的查询和新增流程 ?

1.什么是跳跃表&#xff08;Skip List&#xff09; 跳跃表是 ZSet 有序列表底层的一种实现&#xff0c;也成为跳表。它通过添加多层链表的方式&#xff0c;用于在有序集合中进行高效的查找操作。 简单跳跃表的结构图&#xff1a; 从图中可以看出跳跃表有这些特征&#xff1a; …

Nginx-URLRewrite伪静态

URLRwrite是指将真实地址隐藏&#xff0c;用户访问是通过伪地址进行访问&#xff0c;这样可以隐藏URL中的传参等等 URLwrite演示&#xff0c;浏览器输入伪URL&#xff0c;回车会跳转到真实URL Rewrite匹配规则 redirect是指当请求伪装地址后&#xff0c;页面会直接跳转到真实…

基于微信小程序的上门维修评价系统_22c7h-

随着科学研究的不断深入&#xff0c;有关上门维修的各种信息量也在成倍增长。面对庞大的信息量&#xff0c;就需要有上门维修系统来提高管理工作的效率。通过这样的系统&#xff0c;我们可以做到信息的规范管理和快速查询&#xff0c;从而减少了管理方面的工作量。 建立基于微信…

聊聊 Docker

聊聊 Docker Docker 是什么&#xff1f; 定义 Docker 是一款 开源的应用容器引擎。 简单来说&#xff0c;就是 以容器虚拟化技术为基础的软件。可以把应用程序和所依赖的包一起打包到一个可移植的镜像中&#xff0c;发布到 Linux 或者 Windows 上运行。&#xff08;代码 运…

数据通信——传输层(传输层概述)

引言 终于到传输层了&#xff0c;网络层还有很多需要补充的&#xff0c;后期在慢慢填补了。 我们看哈&#xff01;在物理层我们设计出来各种硬件&#xff0c;然后使它们在物理上相互连接&#xff0c;信号以比特流的形式进行发送&#xff1b;随后&#xff0c;在数据链路层&#…

Mybatis介绍和搭建(详细搭建步骤)

目录 一、mybatis介绍 官方简介 通俗易懂 二、搭建步骤 1.创建Maven项目 2.创建数据库并建表和相关类 3.创建全局配置文件,配置数据库连接信息 4.配置sql映射文件 5.测试 一、mybatis介绍 官方简介 MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义 SQL、存…

C语言和JavaScript中的默认排序行为对比

前言 今天在js里使用sort时遇见了一个不理解的现象 即使用sort默认排序后 9 从排序前的第一位被排到了最后一位.一开始我对js sort的理解和c一样&#xff0c;然后通过查阅后发现并不是这样. 正文 排序是一项常见而重要的操作。不同的编程语言提供了不同的排序函数&#xf…

Vue开发中如何解决国际化语言切换问题

Vue开发中如何解决国际化语言切换问题 引言&#xff1a; 在如今的全球化时代&#xff0c;应用程序的国际化变得越来越重要。为了让不同地区的用户能够更好地使用应用程序&#xff0c;我们需要对内容进行本地化&#xff0c;以适应不同语言和文化环境。对于使用Vue进行开发的应用…

ROS2 中的分布式系统

一、说明 当您运行 ROS2 应用程序时&#xff0c;通常需要在不同机器的不同位置运行 ROS2 节点。由于 ROS2 在抽象的 DDS 层中使用节点之间的通信&#xff0c;因此我们可以非常轻松地安排通信。 为了充分理解 ROS2 的架构&#xff0c;我建议您熟悉本文。 出于本文的目的&#xf…

Java 计算生肖,java Data中获取年,根据生日日期获取生肖注解,根据输入时间获取生肖,自定义注解的方式获取生肖 根据年份时间获取十二生肖

最近&#xff0c;开发中需要增加生肖&#xff0c;但是不想增加字段&#xff0c;于是通过注解的方式&#xff0c;实现生日与生肖的转换。 话不多说&#xff0c;直接上代码&#xff0c;如下&#xff1a; 实体类中的字段&#xff0c;添加自定义注解&#xff08;ToChineseZodiacSe…

各地区-不同行业-就业、失业、工资144个指标(1990-2021年)

一、数据介绍 数据名称&#xff1a;各地区-不同行业-就业、失业、工资144个指标 数据年份&#xff1a;1990-2021年&#xff08;1990-2007缺失较多&#xff09; 数据样本&#xff1a;994条 数据整理&#xff1a;自主整理 二、参考文献 [1]戚聿东,刘翠花,丁述磊.数字经济发展…

数据库系统课设——基于python+pyqt5+mysql的酒店管理系统(可直接运行)--GUI编程

几个月之前写的一个项目&#xff0c;通过这个项目&#xff0c;你能学到关于数据库的触发器知识&#xff0c;python的基本语法&#xff0c;python一些第三方库的使用&#xff0c;包括python如何将前后端连接起来&#xff08;界面和数据&#xff09;&#xff0c;还有界面的设计等…