微服务网关(四)tcp代理模块

news2025/2/5 2:48:15

微服务网关(四)tcp代理模块

tcp代理服务器的代理实现:

image-20230111155622092

请求流程:

image-20230111160021918

代理的启停方法

//并发执行
go func() {
	tcp_proxy_router.TcpServerRun()
}()

tcp_proxy_router.TcpServerStop()

tcp_server

一次完整流程

tcp_server.go

首先是定义TcpServer结构体

type TcpServer struct {
	Addr    string     //监听地址
	Handler TCPHandler //实际逻辑回调设置handler
	err     error
	BaseCtx context.Context //上下文
	//读写超时设置
	WriteTimeout     time.Duration
	ReadTimeout      time.Duration
	KeepAliveTimeout time.Duration //连接一直保持 发数据包的时间
	mu               sync.Mutex //锁
	inShutdown       int32      //是否关闭
	doneChan         chan struct{}
	l                *onceCloseListener //单次启动时,listen需要执行的一次性操作的设置
}

然后定义TCP开启和关闭方法

ListenAndServe

func (srv *TcpServer) ListenAndServe() error {
	//验证服务是否关闭
		//原子方法验证是否为0
		//atomic.LoadInt32(&srv.inShutdown) != 0
    
	if srv.shuttingDown() {
		return ErrServerClosed
	}
	if srv.doneChan == nil {
		srv.doneChan = make(chan struct{})
	}
	addr := srv.Addr
	if addr == "" {
		return errors.New("need addr")
	}
	//调用核心的官方方法,并传入到Serve中
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

Serve

func (srv *TcpServer) Serve(l net.Listener) error {
	srv.l = &onceCloseListener{Listener: l}
	//退出时执行listener关闭
	defer srv.l.Close()
	if srv.BaseCtx == nil {
		srv.BaseCtx = context.Background()
	}
	BaseCtx := srv.BaseCtx
	ctx := context.WithValue(BaseCtx, ServiceContextKey, srv)
	//轮询Listener的Accept方法,获取客户端发来的conn
	for {
		//轮询Listener的Accept方法,获取客户端发来的conn
		rw, err := l.Accept()
		if err != nil {
			select {
			case <-srv.getDoneChan():
				return ErrServerClosed
			default:
			}
			fmt.Printf("accept fail, err: %v\n", err)
			continue
		}
		//拿到便马上创建连接,见下文
		c := srv.newConn(rw)
        //这里便跳转到了tcp_conn.go中的serve方法进行协程处理,见下文
		go c.serve(ctx)
	}
	return nil
}

tcp_conn.go

跳转到tcp_conn.go中的newConn、serve方法

func (srv *TcpServer) newConn(rwc net.Conn) *conn {
   c := &conn{
      server: srv,
      rwc:    rwc,
   }
   // 设置超时时间参数返回
   if d := c.server.ReadTimeout; d != 0 {
      c.rwc.SetReadDeadline(time.Now().Add(d))
   }
   if d := c.server.WriteTimeout; d != 0 {
      c.rwc.SetWriteDeadline(time.Now().Add(d))
   }
   if d := c.server.KeepAliveTimeout; d != 0 {
      if tcpConn, ok := c.rwc.(*net.TCPConn); ok {
         tcpConn.SetKeepAlive(true)
         tcpConn.SetKeepAlivePeriod(d)
      }
   }
   return c
}

server中,使用recover拦截错误信息的原因:因为是用go开启的协程,所以里面的error是用的panic,所以使用了recover进行捕获

func (c conn) serve(ctx context.Context) {
   defer func() {
      //recover拦截错误信息并打印
      if err := recover(); err != nil && err != ErrAbortHandler {
         const size = 64 << 10
         buf := make([]byte, size)
         buf = buf[:runtime.Stack(buf, false)]
         fmt.Printf("tcp: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
      }
      c.close()
   }()
   //获取连接地址、上下文、handler
   c.remoteAddr = c.rwc.RemoteAddr().String()
   ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
   if c.server.Handler == nil {
      panic("handler empty")
   }
   //接着实现Handler中的ServeTCP即可
   c.server.Handler.ServeTCP(ctx, c.rwc)
}

tcp_proxy_router

tcpserver.go

TcpServerRun
  1. 获取TCP服务列表,将tcp的端口全部打开
func TcpServerRun() {
	// 获取TCP服务列表,将tcp的端口全部打开
	serviceList := dao.ServiceManagerHandler.GetTcpServiceList()
	for _, serviceItem := range serviceList {
        tempItem := serviceItem
		//通过将tempItem传入协程 开启所有的tcp服务
        //具体协程内容见下一个代码块
		go func(serviceDetail *dao.ServiceDetail) {
            //配置回调handler
            //......
            //配置上下文
            //......
            //配置tcp服务器TCPServer
			tcpServer := &tcp_server.TcpServer{
				Addr:    addr,
				Handler: routerHandler,
				BaseCtx: baseCtx,
			}
			//放入切片中
			tcpServerList = append(tcpServerList, tcpServer)
			log.Printf(" [INFO] tcp_proxy_run %v\n", addr)
			//开启监听
			if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
				log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
			}
        }(tempItem)
	}
}

协程内部

  1. 获取端口
  2. 获取负载均衡
  3. 构建路由及设置中间件
  4. 构建回调handler
    1. 传入负载均衡策略,路由
  5. 配置TCPServer
  6. 调用tcpServer.ListenAndServe()方法开启服务监听
//通过将tempItem传入协程 开启所有的tcp服务
go func(serviceDetail *dao.ServiceDetail) {
   //设置tcp服务器
   //获取端口
   addr := fmt.Sprintf(":%d", serviceDetail.TCPRule.Port)
   //获取负载均衡
   rb, err := dao.LoadBalancerHandler.GetLoadBalancer(serviceDetail)
   if err != nil {
      log.Fatalf(" [INFO] GetTcpLoadBalancer %v err:%v\n", addr, err)
      return
   }
   //构建路由及设置中间件
   router := tcp_proxy_middleware.NewTcpSliceRouter()
   router.Group("/").Use(
      tcp_proxy_middleware.TCPFlowCountMiddleware(),
      tcp_proxy_middleware.TCPFlowLimitMiddleware(),
      tcp_proxy_middleware.TCPWhiteListMiddleware(),
      tcp_proxy_middleware.TCPBlackListMiddleware(),
   )

   //构建回调handler
   routerHandler := tcp_proxy_middleware.NewTcpSliceRouterHandler(
      //传入负载均衡策略,路由
      func(c *tcp_proxy_middleware.TcpSliceRouterContext) tcp_server.TCPHandler {
         return reverse_proxy.NewTcpLoadBalanceReverseProxy(c, rb)
      }, router)
   baseCtx := context.WithValue(context.Background(), "service", serviceDetail)

   // 配置TCPServer
   tcpServer := &tcp_server.TcpServer{
      Addr:    addr,
      Handler: routerHandler,
      BaseCtx: baseCtx,
   }
    //放入切片中
   tcpServerList = append(tcpServerList, tcpServer)
   log.Printf(" [INFO] tcp_proxy_run %v\n", addr)

   //开启监听
   if err := tcpServer.ListenAndServe(); err != nil && err != tcp_server.ErrServerClosed {
      log.Fatalf(" [INFO] tcp_proxy_run %v err:%v\n", addr, err)
   }
}(tempItem)

注意这里还要定义切片,然后将tcpServer放入切片中(36行)因为之后我们还需要利用这个切片数组来关闭tcp服务

image-20230111164613766

ServerStop

遍历tcp切片列表关闭即可

func TcpServerStop() {
   for _, tcpServer := range tcpServerList {
      tcpServer.Close()
      log.Printf(" [INFO] tcp_proxy_stop %v stopped\n", tcpServer.Addr)
   }
}

反向代理

TCP反向代理的源码实现

reverse_proxy

tcp_reverse_proxy.go

首先是TCP反向代理结构体

// TcpReverseProxy TCP反向代理
type TcpReverseProxy struct {
   ctx                  context.Context //单次请求单独设置
   Addr                 string
   KeepAlivePeriod      time.Duration //设置
   DialTimeout          time.Duration //设置超时时间
   DialContext          func(ctx context.Context, netWork, address string) (net.Conn, error)
   OnDialError          func(src net.Conn, dstDialErr error)
   ProxyProtocolVersion int
}

接着设置New方法

func NewTcpLoadBalanceReverseProxy(c *tcp_proxy_middleware.TcpSliceRouterContext, lb load_balance.LoadBalance) *TcpReverseProxy {
	return func() *TcpReverseProxy {
		nextAddr, err := lb.Get("")
		if err != nil {
			log.Fatal("get next addr fail")
		}
		//设置上TCP反向代理结构体
		return &TcpReverseProxy{
			ctx:             c.Ctx,
			Addr:            nextAddr,
			KeepAlivePeriod: time.Second,
			DialTimeout:     time.Second,
		}
	}()
}

然后就是编写核心方法ServeTCP

// ServeTCP 传入上游 conn,在这里完成下游连接与数据交换
func (dp *TcpReverseProxy) ServeTCP(ctx context.Context, src net.Conn) {
   //设置连接超时
   var cancel context.CancelFunc
   if dp.DialTimeout >= 0 {
      ctx, cancel = context.WithTimeout(ctx, dp.dialTimeout())
   }
   //开启与下游的连接,见下文
   dst, err := dp.dialContext()(ctx, "tcp", dp.Addr)
   if cancel != nil {
      cancel()
   }
   if err != nil {
      dp.onDialError()(src, err)
      return
   }
   defer func() { go dst.Close() }() //记得退出下游连接

   //设置dst的 keepAlive 参数,在数据请求之前
   if ka := dp.keepAlivePeriod(); ka > 0 {
      if c, ok := dst.(*net.TCPConn); ok {
         c.SetKeepAlive(true)
         c.SetKeepAlivePeriod(ka)
      }
   }
   errc := make(chan error, 1)
   //开启协程进行上游下游数据交换,见下文
   go dp.proxyCopy(errc, src, dst)
   go dp.proxyCopy(errc, dst, src)
   <-errc
}
//开启与下游的连接
func (dp *TcpReverseProxy) dialContext() func(ctx context.Context, netWork, address string) (net.Conn, error) {
   if dp.DialContext != nil {
      return dp.DialContext
   }
   return (&net.Dialer{
      Timeout:   dp.DialTimeout,     //连接超时
      KeepAlive: dp.KeepAlivePeriod, //设置连接的检测时长
   }).DialContext
}
//上游下游数据交换
func (dp *TcpReverseProxy) proxyCopy(errc chan<- error, dst, src net.Conn) {
	_, err := io.Copy(dst, src)
	errc <- err
}

结束!

补充

TCP代理特点:

  • 流式数据及无状态数据推荐使用
  • 对服务管控比较少,只能做流量控制及请求来源限制
  • 如果有对应协议代理,推荐使用对应协议代理

thrift:

​ 只使用过一次,记得不太清了,等着以后想起来再回过头填坑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/343902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JVM的垃圾回收机制

复制算法、Eden区和Survivor区 首先我们就来探索一下对于JVM堆内存中的新生代区域&#xff0c;是怎么进行垃圾回收的。 实际上JVM是把新生代分为三块区域的&#xff1a;1个Eden区&#xff0c;2个Survivor区。 其中Eden区占用80%的内存空间&#xff0c;每块Survivor各占用10%的内…

使用yolov5训练数据集笔记

准备工作 1. 安装labelimg labelimg:主要用于目标检测的目标框绘制&#xff0c;得到关于我们训练的边框位置、类别等数据 pip install labelimg2. 下载yolov5源码 我使用的是v7.0版本&#xff0c;直接下载即可&#xff0c;下载后解压出来 2.1 安装yolov5运行依赖包 进入…

SurfaceFlinger详解

SurfaceFlinger的定义 大多数应用在屏幕上一次显示三个层&#xff1a;屏幕顶部的状态栏、底部或侧面的导航栏以及应用界面。有些应用会拥有更多或更少的层&#xff08;例如&#xff0c;默认主屏幕应用有一个单独的壁纸层&#xff0c;而全屏游戏可能会隐藏状态栏&#xff09;。…

棱形打印--进阶2(Java)

棱形打印 问题 * *** ***** ******* ********* ******* ***** *** * * * …

centos上搭建nginx视频点播服务器(nginx+vod+lua http发送鉴权消息)

需求背景&#xff1a;想着搭建一个视频点播服务器&#xff0c;最后选择了nginxvod的方案&#xff0c;用lua脚本写拉流鉴权&#xff0c;但是环境搭建过程中又发现nginxvodlua的环境并不是很容易搭建&#xff0c;是nginxlua的环境&#xff0c;手动搭建比较麻烦&#xff0c;但还是…

Numpy基础与实例——人工智能基础

文章目录一、Numpy概述1、优势2、numpy历史3、Numpy的核心&#xff1a;多维数组4、内存中的ndarray对象4.1 元数据&#xff08;metadata&#xff09;4.2 实际数据二、numpy基础1、 ndarray数组2、 arange、zeros、ones、zeros_like3、ndarray对象属性的基本操作3.1 修改数组维度…

羊了个羊游戏开发教程1:堆叠牌的拾取

本文首发于微信公众号&#xff1a; 小蚂蚁教你做游戏。欢迎关注领取更多学习做游戏的原创教程资料&#xff0c;每天学点儿游戏开发知识。嗨&#xff01;大家好&#xff0c;我是小蚂蚁。最近“羊了个羊”小游戏爆火。一下子让想做微信小游戏或者想学做微信小游戏的人多了很多&am…

Java Map集合

8 Map集合 HashMap: 元素按照键是无序&#xff0c;不重复&#xff0c;无索引&#xff0c;值不做要求 LinkedHashMap: 元素按照键是有序&#xff0c;不重复&#xff0c;无索引&#xff0c;值不做要求 8.1 Map集合概述和特点 Map集合是一种双列集合&#xff0c;每个元素包含两个…

【C++】 C C++ 内存管理

文章目录&#x1f4d5; C、C 内存分布&#x1f4d5; C 内存管理方式1. 操作内置类型2. 操作自定义类型&#x1f4d5; operator new 与 operator delete&#x1f4d5; 定位 new&#x1f4d5; C、C 内存分布 C 和 C 的内存分布没什么区别&#xff0c;C 是基于 C 语言的&#xff…

腾讯xSRC[linux+docker]搭建教程

腾讯xSRC[linuxdocker]搭建教程 1.下载镜像 docker pull xsrc/xsrc:v1.0.12.启动镜像 1️⃣启动镜像 docker run -it -d --name xsrc_web -p 60080:80 -p 63306:3306 --privilegedtrue xsrc/xsrc:v1.0.1注意将3306端口映射到8806端口&#xff0c;以便于远程连接访问容器内数…

手写识别字体的步骤是什么?怎么识别图片中的文字?

手写识别字体的步骤是什么&#xff1f;怎么识别图片中的文字&#xff1f; 1. 打开信风工具网&#xff0c;点击拍照按钮&#xff0c;选择拍图识字模式&#xff0c;对准需要识别的文件进行拍摄&#xff61;在线工具地址&#xff1a; https://ocr.bytedance.zj.cn/image/ImageT…

VScode 自定义主题颜色

vscode其实已经有很多完善且好看的主题了&#xff0c;但我总觉得每一个主题对我来说&#xff0c;都有那么一点点不够完美&#xff0c;比如亮色的主题&#xff0c;颜色就没有深色主题那么好看&#xff0c;对比度高。 好不容易看到一个好看的主题吧&#xff0c;又觉得某一部分的…

2023213-popover弹窗框中的teleported属性--Element-plus踩坑日记

popover弹窗框中的teleported属性–Element plus踩坑日记 今天在做项目时&#xff0c;有一个地方用到了弹窗框&#xff0c;但是有需求需要修改弹窗的阴影部分 比如下方的 我想对阴影进行修改&#xff0c;但是很是纳闷&#xff0c;各种标签选择器都不生效&#xff0c;很奇怪。…

使用地理定位来自定义网络钓鱼

在全球市场中&#xff0c;地理定位的能力是巨大的。 从本质上讲&#xff0c;这意味着企业可以根据收件人的位置定制广告。 纽约人可能会收到与法国人不同的广告。这使得广告对企业更有价值&#xff0c;对消费者来说更个性化。 还有另一群人想要个性化他们的产品&#xff1a;…

2023年要跟踪的11个销售管理关键指标

销售管理关键指标有&#xff1a;营销合格线索数量&#xff08;MQL&#xff09;、MQL 到 SQL 的转换率、商机赢单率、获客成本、总销售额、客户终身价值&#xff08;LTV&#xff09;、LTV 与 CAC 比率、赢单周期、每客户平均销售额&#xff08;平均客单价&#xff09;、每销售人…

全球十大资质正规现货黄金交易平台排名榜单(最新版汇总)

如今&#xff0c;在金融市场上&#xff0c;黄金已经成为公众喜爱的避险产品&#xff0c;尤其是近年来出现的现货黄金&#xff0c;这是许多朋友日常财务管理的标准。但我们在参考黄金交易平台排名进场时&#xff0c;需要留意哪些因素&#xff1f; 1、交易模式 事实上&#xf…

软件测试 -- 高阶 2 软件测试与软件开发

辅车相依&#xff0c;唇亡齿寒。-- 《左传僖公五年》 释译&#xff1a;颊骨和齿床互相依靠&#xff0c;嘴唇没有了&#xff0c;牙齿就会感到寒冷。比喻利害密要相关&#xff0c;命运紧密相关联。-- 百度百科 测试与开发是什么关系&#xff1f; 1. 软件开发流程 2. 开发和测…

AcWing 167. 木棒(DFS + 剪枝优化)

AcWing 167. 木棒&#xff08;DFS 剪枝优化&#xff09;一、问题二、分析1、整体分析2、剪枝优化&#xff08;1&#xff09;优化搜索顺序&#xff08;2&#xff09;排除等效冗余&#xff08;3&#xff09;可行性剪枝&#xff08;4&#xff09;最优性剪枝&#xff08;5&#xf…

ASEMI低压MOS管AO3401封装,AO3401图片

编辑-Z ASEMI低压MOS管AO3401参数&#xff1a; 型号&#xff1a;AO3401 封装&#xff1a;SOT-23 漏极-源极电压&#xff08;VDS&#xff09;&#xff1a;30V 栅源电压&#xff08;VGS&#xff09;&#xff1a;12V 连续漏电流&#xff08;I&#xff09;&#xff1a;4.2A …

K_A12_004 基于STM32等单片机采集人体红外感应(HC-SR501)模块串口与OLED0.96双显示

K_A12_004 基于STM32等单片机采集人体红外感应&#xff08;HC-SR501&#xff09;模块串口与OLED0.96双显示一、资源说明二、基本参数参数引脚说明三、驱动说明模块工作原理:对应程序:四、部分代码说明1、接线引脚定义1.1、STC89C52RCHC-SR501模块1.2、STM32F103C8T6HC-SR501模块…