go语言 grpc 拦截器

news2025/2/22 1:45:10

文章目录

    • 拦截器
      • 服务端拦截器
        • 一元拦截器
        • 流拦截器
      • 客户端拦截器
        • 一元拦截器
        • 流拦截`
      • 多个拦截器
  • 代码仓库

拦截器

gRPC拦截器(interceptor)是一种函数,它可以在gRPC调用之前和之后执行一些逻辑,例如认证、授权、日志记录、监控和统计等。拦截器函数是gRPC中非常重要的概念,它允许我们在服务端和客户端添加自定义逻辑,以满足业务需求和运维需求。

在gRPC中,拦截器函数通常通过实现grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口来定义。UnaryServerInterceptor用于拦截一元RPC请求,而StreamServerInterceptor用于拦截流式RPC请求。在客户端中,我们可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor来拦截gRPC调用。

在gRPC中,拦截器函数可以被链接起来,形成一个拦截器链。在这个拦截器链中,每个拦截器函数都可以处理请求并将其转发给下一个拦截器函数,或者直接返回响应。因此,我们可以在拦截器函数中编写不同的逻辑,例如实现认证、授权、监控和统计等。
以下是一些常见的gRPC拦截器:

  • 认证和授权拦截器:用于对gRPC调用进行身份验证和权限控制,例如检查token、验证用户名和密码、检查访问控制列表等;
  • 日志记录拦截器:用于记录gRPC调用的日志,例如记录请求的方法、参数、响应状态等;
  • 监控和统计拦截器:用于监控gRPC调用的性能和吞吐量,例如记录调用次数、响应时间、错误率等;
  • 缓存拦截器:用于在服务端或客户端缓存一些数据,例如缓存计算结果、缓存数据库查询结果等。

服务端拦截器

在这里插入图片描述

一元拦截器
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "mygrpc/proto/hello" // 引入编译生成的包
)

const (
	defaultName = "world"
)

var (
	addr = flag.String("addr", "localhost:50051", "the address to connect to")
	name = flag.String("name", defaultName, "Name to greet")
)

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"

流拦截器

流式拦截器需要对grpc.ServerStream进行包装,重新实现RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {
	log.Printf("Recved %v", req.GetName())
	// 具体返回多少个response根据业务逻辑调整
	for i := 0; i < 10; i++ {
		// 通过 send 方法不断推送数据
		err := stream.Send(&pb.HelloReply{})
		if err != nil {
			log.Fatalf("Send error:%v", err)
			return err
		}
	}
	return nil
}

type wrappedStream struct {
	// 包装器流
	grpc.ServerStream
}
// 接受信息拦截器
func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.RecvMsg(m)
}
// 发送消息拦截器
func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ServerStream.SendMsg(m)
}

func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
	return &wrappedStream{s}
}

func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前置处理
	log.Println("====== [Server Stream Interceptor] ", info.FullMethod)

	// 包装器流调用 流RPC
	err := handler(srv, newWrappedStream(ss))
	if err != nil {
		log.Printf("RPC failed with error %v", err)
	}
	return err
}
func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// 开启rpc
	s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
	// 注册服务
	pb.RegisterGreeterServer(s, &server{})
	log.Printf("service listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

结果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 开始服务端rpc流测试
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)


客户端拦截器

在这里插入图片描述

一元拦截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// 前置处理逻辑
	log.Println("Method : " + method)

	// 调用invoker 执行远程方法
	err := invoker(ctx, method, req, reply, cc, opts...)

	// 后置处理逻辑
	log.Println(reply)
	return err
}

func main() {
	flag.Parse()
	// 与服务建立连接.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加拦截器
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	// 创建指定服务的客户端
	c := pb.NewGreeterClient(conn)

	// 连接服务器并打印出其响应。
	ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 设置超时时间为一秒
	defer cancel()
	// 调用指定方法
	r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.GetMessage())
}

结果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流拦截`
type wrappedStream struct {
	grpc.ClientStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.RecvMsg(m)
}

func (w *wrappedStream) SendMsg(m interface{}) error {
	log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))
	return w.ClientStream.SendMsg(m)
}

func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {
	return &wrappedStream{s}
}

func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	
  // 前置处理逻辑
	log.Println("======= [Client Interceptor] ", method)
  
  // 调用streamer 来获取客户端流
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return newWrappedStream(s), nil
}

func main(){
   // 注册拦截器到客户端流
 conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
                     
	c := pb.NewOrderManagementClient(conn)
  ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
  
  // 调用客户端流RPC方法
  searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})
	for {
		searchOrder, err := searchStream.Recv()
		if err == io.EOF {
			log.Print("EOF")
			break
		}
		if err == nil {
			log.Print("Search Result : ", searchOrder)
		}
	}
}

结果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客户端流传输结束

多个拦截器

在grpc中默认的拦截器不可以传多个,因为在源码中,存在一些问题

func chainUnaryClientInterceptors(cc *ClientConn) {
	interceptors := cc.dopts.chainUnaryInts
	if cc.dopts.unaryInt != nil {
		interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
	}
	var chainedInt UnaryClientInterceptor
	if len(interceptors) == 0 {
		chainedInt = nil
	} else if len(interceptors) == 1 {
		chainedInt = interceptors[0]
	} else {
		chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
			return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
		}
	}
	cc.dopts.unaryInt = chainedInt
}

当存在多个拦截器时,取的就是第一个拦截器。因此结论是允许传多个,但并没有用。

如果真的需要多个拦截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 链式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
	n := len(interceptors)
	if n > 1 {
		lastI := n - 1
		return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
			var (
				chainHandler grpc.UnaryInvoker
				curI         int
			)

			chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
				if curI == lastI {
					return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)
				}
				curI++
				err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)
				curI--
				return err
			}

			return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)
		}
	}
    ...
}

代码仓库

https://github.com/onenewcode/mygrpc.git

也可以直接下载绑定的资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1296553.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

目标检测——OverFeat算法解读

论文&#xff1a;OverFeat: Integrated Recognition, Localization and Detection using Convolutional Networks 作者&#xff1a;Pierre Sermanet, David Eigen, Xiang Zhang, Michael Mathieu, Rob Fergus, Yann LeCun 链接&#xff1a;https://arxiv.org/abs/1312.6229 文章…

actitivi自定义属性(二)

声明&#xff1a;此处activiti版本为6.0 此文章介绍后端自定义属性解析&#xff0c;前端添加自定义属性方法连接&#xff1a;activiti自定义属性&#xff08;一&#xff09;_ruoyi activiti自定义标题-CSDN博客 1、涉及到的类如下&#xff1a; 简介&#xff1a;DefaultXmlPar…

2023年江西省“振兴杯”网络信息行业(信息安全测试员)职业技能竞赛 Write UP

文章目录 一、2023csy-web1二、2023csy-web2三、2023csy-web3四、2023csy-web4五、2023csy-misc1六、2023csy-misc2七、2023csy-crypto1八、2023csy-re1 一、2023csy-web1 该题提供一个web靶场&#xff0c;《伟大的挑战者》&#xff0c;分值&#xff1a;5分 web页面一直在播放c…

nodejs+vue+微信小程序+python+PHP的智能停车系统-计算机毕业设计推荐django

随着网络技术的不断发展&#xff0c;多媒体技术应用渐渐的出现在教育领域中&#xff0c;智能停车算法研究管理已经成为社会的一个热门话题。互联网应用在全球范围内日益普及&#xff0c;在许多的网络服务中&#xff0c;Web给人耳目一新的感觉。在这其中&#xff0c;网络平台开展…

unity 2d 入门 飞翔小鸟 小鸟碰撞 及死亡(九)

1、给地面&#xff0c;柱体这种添加2d盒装碰撞器&#xff0c;小鸟移动碰到就不会动了 2、修改小鸟的脚本&#xff08;脚本命名不规范&#xff0c;不要在意&#xff09; using System.Collections; using System.Collections.Generic; using UnityEngine;public class Fly : Mo…

【hugging face】bitsandbytes中8 bit量化的理解

8 位量化使数十亿参数规模的模型能够适应更小的硬件&#xff0c;而不会降低性能。 8 位量化的工作原理如下&#xff1a; 1.从输入隐藏状态中按列提取较大值&#xff08;离群值&#xff09;。 2.对 FP16 中的离群值和 int8 中的非离群值执行矩阵乘法。 3.改变非异常值结果以将值…

SAP UI5 walkthrough step6 Modules

在SAPUI5 中&#xff0c;资源通常用作Modules&#xff0c;这个我们将用Message Toast 来实现告警功能 修改controller.js webapp/controller/App.controller.js sap.ui.define(["sap/ui/core/mvc/Controller","sap/m/MessageToast" ], (Controller, Mes…

running小程序重要技术流程文档

一、项目文件说明&#xff1a; &#xff08;注&#xff1a;getMyMoney无用已删除&#xff09; 二、重要文件介绍 1.reinfo.js&#xff1a;位于utils文件下&#xff0c;该文件封装有统一的请求URL&#xff0c;和请求API同意封装供页面调用&#xff1b;调用时候需要在页面上先…

算法Day26 数位统计

数位统计 Description 给你一个整数n&#xff0c;统计并返回各位数字都不同的数字x的个数&#xff0c;其中0 ≤ x < 10^n。 Input 输入整数n 0≤n≤13 Output 输出整数个数 Sample 代码 import java.util.Scanner;public class Main {public static void main(String[] ar…

鸿蒙方舟开发框架ArkUI简介

语雀知识库地址&#xff1a;语雀HarmonyOS知识库 飞书知识库地址&#xff1a;飞书HarmonyOS知识库 嗨&#xff0c;各位别来无恙呐&#xff0c;我是小白 众所周知&#xff0c;华为在今年推出了 HarmonyOS 4.0 版本&#xff0c;而在此之前的版本中&#xff0c;HarmonyOS 应用的 …

TCP一对一聊天

客户端 import java.awt.BorderLayout; import java.awt.Color; import java.awt.Dimension; import java.awt.Font; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.io.BufferedReader; import java.io.IOException; import java.io…

【web安全】文件读取与下载漏洞

前言 菜某整理仅供学习&#xff0c;有误请赐教。 概念 个人理解&#xff1a;就是我们下载一个文件会传入一个参数&#xff0c;但是我们可以修改参数&#xff0c;让他下载其他的文件。因为是下载文件&#xff0c;所以我们可以看到文件里面的源码&#xff0c;内容。 文件读取…

MindOpt APL:一款适合优化问题数学建模的编程语言

什么是建模语言 建模语言是一种描述信息或模型的编程语言&#xff0c;在运筹优化领域&#xff0c;一般是指代数建模语言。 比如要写一个线性规划问题的建模和求解&#xff0c;可以采用C、Python、Java等通用编程语言来实现计算机编程&#xff08;码代码&#xff09;&#xff0…

JavaScript常用技巧专题一

文章目录 一、前言二、生成随机颜色的两种方式2.1、生成RandomHexColor2.2、生成随机RGBA 三、复制内容到剪贴板的两种方式3.1、方式13.2、方式2 四、获取URL中的查询参数五、打乱数组六、深拷贝一个对象七、确保元素在可见区域内八、获取当前选中的文本九、浏览器cookie9.1、获…

深入了解数据库锁:类型、应用和最佳实践

目录 1. 引言 2. 数据库锁的基本概念 2.1 悲观锁和乐观锁 2.2 排他锁和共享锁 3. 悲观锁的应用场景 3.1 长事务和大事务 3.2 并发修改 3.3 数据库死锁 4. 悲观锁的最佳实践 4.1 精细控制锁的粒度 4.2 避免死锁 4.3 考虑乐观锁 5. 案例分析 5.1 银行系统的转账操作…

搭乘“低代码”快车,引领食品行业数字化转型全新升级

数字化技术作为重塑传统行业重要的力量&#xff0c;正以不可逆转的趋势改变着企业经营与客户消费的方式。 在近些年的企业数字化服务与交流过程中&#xff0c;织信团队切实感受到大多数企业经营者们从怀疑到犹豫再到焦虑最终转为坚定的态度转变。 在这场数字化转型的竞赛中&a…

Could not resolve all dependencies for configuration ‘:app:androidApis‘.

android studio出现Could not resolve all dependencies for configuration ‘:app:androidApis’. 试过很多种方法&#xff0c;但是都不好使&#xff0c;不管怎么样都是提示如下报错&#xff1a; Using insecure protocols with repositories, without explicit opt-in, is un…

Unity中Batching优化的GPU实例化整理总结

文章目录 前言一、GPU Instancing的支持1、硬件支持2、Shader支持3、脚本支持 二、我们来顺着理一下GPU实例化的使用步骤1、GPU实例化前的C#代码准备2、在 appdata 和 v2f 中定义GPU实例化ID3、在顶点着色 和 片元着色器 设置GPU Instance ID&#xff0c;使实例化对象顶点位置正…

RK3588平台开发系列讲解(hardware)reference-ril源码分析

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、reference-ril目录介绍二、支持的功能三、Android RIL 框架沉淀、分享、成长,让自己和他人都能有所收获!😄 一、reference-ril目录介绍 目录:3588-android12/hardware/ril/reference-ril

做数据分析为何要学统计学(5)——什么问题适合使用t检验?

t检验&#xff08;Students t test&#xff09;&#xff0c;主要依靠总体正态分布的小样本&#xff08;例如n < 30&#xff09;对总体均值水平进行差异性判断。 t检验要求样本不能超过两组&#xff0c;且每组样本总体服从正态分布&#xff08;对于三组以上样本的&#xff0…