Golang——gRPC认证和拦截器

news2024/11/27 21:44:12

一. OpenSSL

        1.1 介绍

        OpenSSL是一个开放源代码的软件库包,用于支持网络通讯过程中的加密。这个库提供的功能包含了SSL和TLS协议的实现,并可用于生成密钥、证书、进行密码运算等。

        其组成主要包括一下三个组件:

  1. openssl:多用途的命令行工具

  2. libcrypto:加密算法库

  3. libssl:加密模块应用库,实现了ssl及tls

openssl可以实现秘钥证书管理、对称加密和非对称加密 。

        官网:[ Downloads ] - /source/index.html

        1.2 Windows安装方法

        OpenSSL官网没有提供windows版本的安装包,可以选择其它开源平台提供的工具。

Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions

        以该工具为例:

        进入下载界面,选择下载的版本,下载完,之后安装即可。

        1.3 生成公钥和私钥

        openssl命令详解-CSDN博客

生成私钥:openssl genrsa -out rsa_private_key.pem 1024
生成公钥:openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem

二. gRPC认证

        gRPC默认内置了两种认证方式:

  • SSL/TLS认证
  • 基于Token的认证

        同时,gRPC提供了接口用于扩展自定义认证方式。

        2.1 TLS认证

        2.1.1 什么是TLS认证

        TLS(Transport Layer Security,安全传输层),TLS是建立在传输层TCP协议之上的协议,服务于应用层,它的前身是SSL(Secure Socket Layer,安全套接字层),它实现了将应用层的报文进行加密后再交由TCP进行传输的功能。

        2.1.2 TLS的作用

TLS协议主要解决如下三个网络安全问题。

  • 保密(message privacy),保密通过加密encryption实现,所有信息都加密传输,第三方无法嗅探;
  • 完整性(message integrity),通过MAC校验机制,一旦被篡改,通信双方会立刻发现;
  • 认证(mutual authentication),双方认证,双方都可以配备证书,防止身份被冒充;

        2.1.3 TLS认证实例

  • 证书制作

        制作公钥:自签名公钥(x509),   制作私钥。

#生成一个名为server_private.key的RSA私钥,使用SHA256算法和4096位密钥长度。然后使用该私钥生成一个有效期为36500天的自签名证书,并将其保存为名为server.pem的文件。同时在证书中添加subjectAltName扩展,指定DNS名称为www.wy.com。
openssl req -newkey rsa:4096 -nodes -sha256 -keyout server_private.key -x509 -days 36500 -out server.pem -addext "subjectAltName =DNS:www.wy.com"
  •  openssl req:生成自签名证书
  • -newkey rsa:4096 :生成新的4096位rsa密钥对
  • -sha256:使用sha256加密
  • -keyout:指定生成的私钥文件
  • -x509:指输出证书
  • -days 36500:有效期 36500
  • -out:输出证书的文件名
  • -addext:添加扩展

        注意需要在证书中添加subjectAltName扩展,指定DNS名称。不然在客户端连接服务器时会报错,报错信息为:

        rpc error: code = Unavailable desc = connection error: desc = "transport: authentication handshake failed: tls: failed to verify certificate: x509: certificate relies on legacy Common Name field, use SANs instead"

        因为go1.15 版本开始废弃CommonName,因此推荐使用SAN证书。如果想兼容之前的方式,需要设置环境变量 GODEBUG为 x509ignoreCN=0。(创建 SSL/TLS 证书时,证书依赖于传统的 Common Name (CN) 字段,而没有使用现代标准所推荐的 Subject Alternative Names (SANs) 字段。现代的 TLS 客户端(比如最新版本的浏览器和安全工具)要求证书使用 SANs 字段来指定有效的主机名。)

        自定义信息: 

You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:CN #国家
State or Province Name (full name) [Some-State]:SHANGHAI #省份
Locality Name (eg, city) []:SHANGHAI #城市
Organization Name (eg, company) [Internet Widgits Pty Ltd]:BF #公司
Organizational Unit Name (eg, section) []:Dev #部门
Common Name (e.g. server FQDN or YOUR name) []:www.wy.com #服务器名称
Email Address []:xxx@xxx.com #邮箱地址
  • 目录结构

  • 示例代码 

         服务端代码:

package main

import (
	"context"
	"fmt"
	"net"
	hello "sample-app/grpc/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials" //引入gRPC认证包
)

const (
    //服务器地址
    Addr = "127.0.0.1:8080"
)

type helloService struct{}

//定义hello 服务
var HelloService = helloService{}

//实现proto hello service方法
func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
	resp := new(hello.HelloResponse)
	resp.Message = fmt.Sprintf("Hello %s", req.Name)
	return resp, nil
}

func main() {
	ls, err := net.Listen("tcp", Addr)
	if err != nil {
		fmt.Println(err)
		return
	}
	//TLS认证
	cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
	if err != nil {
		fmt.Println(err)
		return
	}
	//新建一个grpc服务器,并开启TLS认证
    //上面监听并没有进行连接客户端
	server := grpc.NewServer(grpc.Creds(cert))
	//注册HelloService
	hello.RegisterHelloServer(server, HelloService)
	fmt.Println("Listen on" + Addr + "with TLS")
    //这里面才会连接客户端,需要进行认证
	server.Serve(ls)
}
  • credentials.NewServerTLSFromFile:从输入证书文件和密钥文件为服务端构造TLS凭证

  • grpc.Creds:返回一个ServerOption,用于设置服务器连接的凭证。

        客户端代码:

package main

import (
	"context"
	"fmt"
	hello "sample-app/grpc/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (、
    //gRPC服务器地址
	Addr = "127.0.0.1:8080"
)

func main() {
	//TLS连接
	cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
	if err != nil {
		fmt.Println("credentials fail ", err)
		return
	}
    //请求连接的时候 需要认证
	conn, err := grpc.Dial(Addr, grpc.WithTransportCredentials(cert))
	if err != nil {
		fmt.Println("Dial fail", err)
		return
	}
	defer conn.Close()

	c := hello.NewHelloClient(conn)
	req := new(hello.HelloRequest)
	req.Name = "gRPC"

	resp, err := c.SayHello(context.Background(), req)
	if err != nil {
		fmt.Println("say hello fail", err)
		return
	}

	fmt.Println(resp.Message)
}
  • credentials.NewClientTLSFromFile:从输入的证书文件中为客户端构造TLS凭证。

  • grpc.WithTransportCredentials:配置连接级别的安全凭证(例如,TLS/SSL),返回一个DialOption,用于连接服务器。

        proto文件:

syntax="proto3";
package hello;
option go_package="hello";

service Hello
{
    rpc SayHello(HelloRequest)returns(HelloResponse){}; 
}

message HelloRequest
{
    string name = 1;
}

message HelloResponse
{
    string message = 1;
}

使用下面命令生成pb.go文件:

protoc --go_out=plugins=grpc:"生成pb.go文件地址" -I="proto文件地址" "proto文件地址\文件" 

演示:

        实际TLS认证不是这样,客户端和服务器时分离的。客户端有证书(包含公钥),服务端有证书和私钥。

        客户端发送请求给服务器请求连接,服务器将证书通过私钥加密后发送给客户端。客户端有证书,里面包含服务器私钥对应的公钥。使用公钥对数据进行解密,获得证书数据,与本地证书数据进行比较。

         2.2 Token认证

        继续扩展上面的代码,实现TLS+Token认证机制。

        2.2.1 什么是Token认证

        Token认证是一种基于Token的身份验证方法,用于在客户端和服务器之间进行身份验证。以下是Token认证的主要概念、流程以及优缺点:

  • 主要概念

    • Token的含义:Token(令牌)是服务端生成的一串字符串,作为客户端进行请求的一个标识。
    • Token的组成:一般包括用户身份标识(uid)、时间戳(time)和签名(sign)等元素。
    • Token的作用:Token主要用于身份验证、授权、会话管理和跨域资源共享(CORS)等方面。
  • 认证流程

    • 用户登录并获取Token:用户使用用户名和密码登录,成功后服务端生成Token并发送给客户端。
    • 客户端存储和使用Token:客户端将Token保存在本地(如cookie或localStorage),并在后续请求中携带该Token。
    • 服务端验证Token:服务端收到请求后,验证Token的合法性,若合法则处理请求并返回数据。

        2.2.2 示例代码

        根据上面的代码,实现TLS+Token认证机制。

  • 认证原理

        客户端发送请求,会将Token放到context.Context上下文中,服务器收到请求,从上下文中获取Token验证,然后进行下一步操作。

  • 目录结构

  • 客户端代码

        grpc/credential包内默认定义了PerRPCCredentials接口,是提供用于自定义接口,他的作用是将所需安全认证信息添加到每个RPC上下文中。其包含两个方法。

type PerRPCCredentials interface {
    //获取当前请求认证所需的元数据
	GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
    //是否需要基于TLS认证进行安全传输
	RequireTransportSecurity() bool
}
package main

import (
	"context"
	"fmt"
	hello "sample-app/grpc/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	Addr = "127.0.0.1:8080"
	//是否使用TLS
	OpenTLS = true
)

// 自定义认证
type Token struct {
	Appid  string
	Appkey string
}

// 实现自定义认证方法
func (t Token) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"appid":  t.Appid,
		"appkey": t.Appkey,
	}, nil
}

// 实现自定义认证是否开启TLS
func (t Token) RequireTransportSecurity() bool {
	return OpenTLS
}

func main() {
	//TLS连接
	var opt []grpc.DialOption
	if OpenTLS {
		cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
		if err != nil {
			fmt.Println("credentials fail ", err)
			return
		}
		opt = append(opt, grpc.WithTransportCredentials(cert))
	} else {
		opt = append(opt, grpc.WithInsecure())
	}
	//使用自定义认证
	tk := Token{
		Appid:  "101010",
		Appkey: "i am a key",
	}
	opt = append(opt, grpc.WithPerRPCCredentials(&tk))

	conn, err := grpc.Dial(Addr, opt...)
	if err != nil {
		fmt.Println("Dial fail", err)
		return
	}
	defer conn.Close()
	//初始化服务器
	c := hello.NewHelloClient(conn)

	req := new(hello.HelloRequest)
	req.Name = "gRPC"

	resp, err := c.SayHello(context.Background(), req)
	if err != nil {
		fmt.Println("say hello fail", err)
		return
	}

	fmt.Println(resp.Message)
}

        定义一个结构Token,包含Token所需属性字段。实现PerRPCCredentials接口的两个方法。每次调用token信息会通过请求metadata传输到服务端。

        下面查看服务端如何获取metadata中信息。 

  • 服务端代码

        使用metadata.FromIncomingContext:从上下文中获取元数据。

package main

import (
	"context"
	"fmt"
	"net"
	hello "sample-app/grpc/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials" //引入gRPC认证包
	"google.golang.org/grpc/metadata"
)

var Addr = "127.0.0.1:8080"

type helloService struct{}

var HelloService = helloService{}

func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
	//认证
	md, ok := metadata.FromIncomingContext(c)
	if !ok {
		return nil, grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
	}

	var appid string
	var appkey string
	vals := md.Get("appid")
	if len(vals) != 0 {
		appid = vals[0]
	}

	val_key := md.Get("appkey")
	if len(val_key) != 0 {
		appkey = val_key[0]
	}
	//认证token
	if appid != "101010" || appkey != "i am a key" {
		return nil, grpc.Errorf(codes.Unauthenticated, "Token认证信息错误: Appid:%s, Appkey:%s", appid, appkey)
	}
	//fmt.Println("authenticated succ " + appid + "-" + appkey)

	resp := new(hello.HelloResponse)
	resp.Message = fmt.Sprintf("Hello %s \nToken info: Appid=%s, AppKey=%s", req.Name, appid, appkey)

	return resp, nil
}

func main() {
	ls, err := net.Listen("tcp", Addr)
	if err != nil {
		fmt.Println(err)
		return
	}
	//TLS认证
	cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
	if err != nil {
		fmt.Println(err)
		return
	}
	//新建一个grpc服务器,并开启TLS认证
	server := grpc.NewServer(grpc.Creds(cert))
	//注册HelloService
	hello.RegisterHelloServer(server, HelloService)
	fmt.Println("Listen on " + Addr + " with TLS")

	server.Serve(ls)
}
  • 演示 

        成功: 

         失败:

        补充:

        google.golang.org/grpc/credentials/oauth 包已实现了用于 Google API oauth jwt 验证
的方法,使用方法可以参考 官方文档 。在实际应用中,我们可以根据自己的业务需求实现合适的验证方 式。

三. 拦截器

        grpc服务端和客户端都提供了interceptor功能,可以在请求前后处理一些通用逻辑,比如:记录日志,tracing,身份认证等。

        在上面自定义Token认证的示例中,认证信息是由每个服务中的方法处理并认证,如果有大量的接口,这种姿势就不优雅了,每一个接口实现都需要先处理认证信息。这个时候interceptor就可以用来解决这个问题,在请求被转到具体接口之前处理认证信息,一处认证到处无忧。在客户端,我们增加一个请求日志,记录请求相关的参数和耗时等。

        3.1 grpc的interceptor

        gRPC服务端和客户端均可实现各自的拦截器,根据rpc的两种请求方式可分为两种:

  • Unary Interceptor(一元拦截器)
  • Stream Interceptor(流式拦截器)

        rpc的两种请求方式:

  • 一元请求(Unary):客户端发送一个请求给服务端,然后立即得到一个响应,但是服务器端并不能主动向客户端发送消息。
  • 流式请求:当数据量比较大,可能会对响应时间产生影响,可以使用流式来分批次来传输数据,不需要一次性就处理完数据。
    • 服务端流式请求(server streaming):客户端发送一个请求给服务端,然后服务端就会向客户端逐渐返回一系列的消息。
    • 客户端流式请求(client streaming):客户端就像一个流,连续发送多个消息给服务端,然后等待服务端的一个响应。
    • 双向流式请求(Bidirectional streaming):客户端和服务端可以连续互发消息。

对应Protobuf文件service定义:

        下面是一个例子:

        一元请求:request和response不需要stream。

        服务端流式请求:request不需要stream,response需要stream

        客户端流式请求:request需要stream,response不需要stream。

        双向流式请求:request和response都需要stream。

syntax = "proto3";

option go_package = "github.com/lixd/grpc-go-example/features/proto/echo";

package echo;


// Echo 服务,包含了4种类型API
service Echo {
  // UnaryAPI
  rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
  // SServerStreaming
  rpc ServerStreamingEcho(EchoRequest) returns (stream EchoResponse) {}
  // ClientStreamingE
  rpc ClientStreamingEcho(stream EchoRequest) returns (EchoResponse) {}
  // BidirectionalStreaming
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns (stream EchoResponse) {}
}

message EchoRequest {
  string message = 1;
}

message EchoResponse {
  string message = 1;
}

        3.2 一元拦截器

        对于一元服务器拦截器,只需要定义UnaryServerInterceptor方法即可。其中,handler(ctx, req)即调用rpc方法。

type UnaryServerInterceptor func(
    ctx context.Context,   //rpc上下文
    req interface{},       //rpc请求参数
    info *UnaryServerInfo, //rpc方法信息
    handler UnaryHandler,  //rpc方法本身,真正执行逻辑
)(interface{}, error){
    return handler(ctx, req)
}    

        对于一元客户端拦截器,一样需要定义一个方法UnaryClientInterceptor,其中invoker()才真正请求rpc。

type UnaryClientInterceptor func(
    ctx context.Context,  //rpc上下文
    method string,        //调用方法名
    req,                  //rpc请求参数
    reply interface{},    //rpc响应结果
    cc *ClientConn,       //连接句柄
    invoker UnaryInvoker, //调用rpc本身方法   
    opts ...CallOption    //调用配置
) error {
    return invoker(ctx, method, req, reply, cc, opts...)
}

        一元拦截器的实现,根据handle和invoker的前后:调用前预处理,调用rpc方法,调用后处理。

  • 服务端代码
package main

import (
	"context"
	"fmt"
	"net"
	hello "sample-app/grpc/proto"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials" //引入gRPC认证包
	"google.golang.org/grpc/metadata"
)

var Addr = "127.0.0.1:8080"

type helloService struct{}

var HelloService = helloService{}

func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
	//fmt.Println("authenticated succ " + appid + "-" + appkey)
	resp := new(hello.HelloResponse)
	resp.Message = fmt.Sprintf("Hello %s", req.Name)

	return resp, nil
}

func main() {
	ls, err := net.Listen("tcp", Addr)
	if err != nil {
		fmt.Println(err)
		return
	}
	//TLS认证
	cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
	if err != nil {
		fmt.Println(err)
		return
	}

	var opts []grpc.ServerOption
	//开启TLS认证
	opts = append(opts, grpc.Creds(cert))
	//注册拦截器
	opts = append(opts, grpc.UnaryInterceptor(UnaryServerInterceptor))

	//server := grpc.NewServer(grpc.Creds(cert))
	server := grpc.NewServer(opts...)
	//注册HelloService
	hello.RegisterHelloServer(server, HelloService)
	fmt.Println("Listen on " + Addr + " with TLS " + "Interceptor")

	server.Serve(ls)
}

func auth(ctx context.Context) error {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return grpc.Errorf(codes.Unauthenticated, "无Token认证信息")
	}

	var appid string
	var appkey string
	vals := md.Get("appid")
	if len(vals) != 0 {
		appid = vals[0]
	}

	val_key := md.Get("appkey")
	if len(val_key) != 0 {
		appkey = val_key[0]
	}
	//认证token
	if appid != "101010" || appkey != "i am a key" {
		return grpc.Errorf(codes.Unauthenticated, "Token认证信息错误: Appid:%s, Appkey:%s", appid, appkey)
	}
	return nil
}

func UnaryServerInterceptor(ctx context.Context, req interface{}, into *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	err = auth(ctx)
	if err != nil {
		return nil, err
	}

	return handler(ctx, req)
}
  • 客户端代码
package main

import (
	"context"
	"fmt"
	"log"
	hello "sample-app/grpc/proto"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	Addr = "127.0.0.1:8080"
	//是否使用TLS
	OpenTLS = true
)

// 自定义认证
type Token struct {
	Appid  string
	Appkey string
}

// 实现自定义认证方法
func (t Token) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
	return map[string]string{
		"appid":  t.Appid,
		"appkey": t.Appkey,
	}, nil
}

// 实现自定义认证是否开启TLS
func (t Token) RequireTransportSecurity() bool {
	return OpenTLS
}

func main() {
	//TLS连接
	var opt []grpc.DialOption
	if OpenTLS {
		cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
		if err != nil {
			fmt.Println("credentials fail ", err)
			return
		}
		opt = append(opt, grpc.WithTransportCredentials(cert))
	} else {
		opt = append(opt, grpc.WithInsecure())
	}
	//使用自定义认证
	tk := Token{
		Appid:  "101010",
		Appkey: "i am not a key",
	}
	opt = append(opt, grpc.WithPerRPCCredentials(&tk))
	//加入拦截器
	opt = append(opt, grpc.WithUnaryInterceptor(UnaryClientInterceptor))


	conn, err := grpc.Dial(Addr, opt...)
	if err != nil {
		fmt.Println("Dial fail", err)
		return
	}
	defer conn.Close()
	//初始化服务器
	c := hello.NewHelloClient(conn)

	req := new(hello.HelloRequest)
	req.Name = "gRPC"

	resp, err := c.SayHello(context.Background(), req)
	if err != nil {
		fmt.Println("say hello fail", err)
		return
	}

	fmt.Println(resp.Message)
}

func UnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	stat_time := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.Printf("method=%s, req=%v, reply=%v, duration=%s, error=%v\n", method, req, reply, time.Since(stat_time), err)
	return err
}
  • 运行结果

认证成功:

认证失败:

         3.3 流式拦截器

        流式拦截器的实现与一元拦截器一致,实现提供的方法即可,方法参数含义如下:

type StreamServerInterceptor func(
    srv interface{},        //rpc请求参数
    ss ServerStream,        //服务端stream对象
    info *StreamServerInfo, //rpc方法信息
    handler StreamHandler   //rpc方法本身,真正执行逻辑
) (err error) {
    return handler(src, ss)
}
type StreamClientInterceptor func(
    ctx context.Context,//rpc上下文
    desc *StreamDesc,   //流信息
    cc *ClientConn,     //连接句柄
    method string,      //调用方法名
    streamer Streamer,  //调用rpc方法本身
    opts ...CallOption  //调用配置	    
) (ClientStream, error) {
    //流操作预处理
    clientStream, err := streamer(ctx, desc, cc, method, opts...)
    //根据某些条件,通过clientStream拦截流操作
    return clientStream, err
}

        与其他拦截器不同,客户端流式拦截器的实现分为两个部分,流操作预处理和流操作拦截,其不能在事后进行rpc方法调用,只能通过ClientStream对象进行操作拦截。即需要进行rpc方法调用后,才能进行操作拦截。例如根据特定的metadata,调用ClientStream.CloseSend()终止流操作。

        下面实现了一个打印请求和响应日志的拦截器,只是函数签名变成了grpc.StreamServerInterceptor 和 grpc.StreamClientInterceptor 。

  • proto文件
syntax="proto3";
package hello;
option go_package="hello";

service Hello
{
    rpc SayHello(HelloRequest)returns(HelloResponse){}; 
    //双向流式请求
    rpc Streaming(stream StreamRequest) returns (s StreamResponse) {}
}

message HelloRequest
{
    string name = 1;
}

message HelloResponse
{
    string message = 1;
}

message StreamRequest
{
    string input = 1;
}

message StreamResponse
{
    string output = 1;
}

使用下面命令生成go语言文件:

protoc --go_out=plugins=grpc:.\grpc\proto -I=.\grpc\proto .\grpc\proto\hello.proto
  • 服务端代码

        服务端定义结构体,实现了proto文件HelloServer接口,即实现了SayHello和Streaming方法。

        实现了拦截器方法StreamServerInterceptor(函数名随便定义)。

        服务端的实现其实和一元拦截器的使用方式没什么太大区别,但是流的特性在于请求和响应不是一次性完成的,而是多次发送和接收数据。所以我们可能需要在发送和接收数据的过程中处理一些公共逻辑。这才是拦截器特别的地方。

        我们注意到handler方法调用的第二个参数grpc.ServerStream接口类型,这个类型包含了SendMsg和RecvMsg方法,所以我们可以使用一个自定义类型实现这个接口,并重写这两个方法,我们就可以实现打印日志的目的。

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	hello "sample-app/grpc/proto"
	"strconv"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials" //引入gRPC认证包
	"google.golang.org/grpc/metadata"
)

var Addr = "127.0.0.1:8080"

type helloService struct{}

var HelloService = helloService{}

func (h helloService) SayHello(c context.Context, req *hello.HelloRequest) (*hello.HelloResponse, error) {
	resp := new(hello.HelloResponse)
	resp.Message = fmt.Sprintf("Hello %s", req.Name)

	return resp, nil
}

func (h helloService) Streaming(stream hello.Hello_StreamingServer) error {
	for n := 0; ; {
		res, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		v, _ := strconv.Atoi(res.Input)
		log.Printf("[server streaming] recv : %d\n", v)
		n += v
		log.Printf("[server streaming] send : %d\n", n)
		stream.Send(&hello.StreamResponse{Output: strconv.Itoa(n)})
	}
}

func main() {
	ls, err := net.Listen("tcp", Addr)
	if err != nil {
		fmt.Println(err)
		return
	}
	//TLS认证
	cert, err := credentials.NewServerTLSFromFile("..\\..\\key\\server.pem", "..\\..\\key\\server_private.key")
	if err != nil {
		fmt.Println(err)
		return
	}

	var opts []grpc.ServerOption
	//开启TLS认证
	opts = append(opts, grpc.Creds(cert))
	//注册拦截器
	opts = append(opts, grpc.StreamInterceptor(StreamServerInterceptor))

	//server := grpc.NewServer(grpc.Creds(cert))
	server := grpc.NewServer(opts...)
	//注册HelloService
	hello.RegisterHelloServer(server, HelloService)
	fmt.Println("Listen on " + Addr + " with TLS " + "with StreamInterceptor")

	server.Serve(ls)
}

// 里面定义grpc.ServerStream接口类型的属性
// 是为了重写SendMsg和RecvMsg方法
type serverStream struct {
	//需要使用匿名字段
	//内嵌方法重写
	grpc.ServerStream
}

// 重写ServerStream的SendMsg方法
func (s serverStream) SendMsg(m interface{}) error {
	//发送数据前处理
	log.Printf("[server SendMsg]: send : %T\n", m)
	return s.ServerStream.SendMsg(m)
}

// 重写ServerStream的RecvMsg方法
func (s serverStream) RecvMsg(m interface{}) error {
	//接收数据前处理
	log.Printf("[server Recv Stream]: recv : %T\n", m)
	return s.ServerStream.RecvMsg(m)
}

// 流式拦截器
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	//前置逻辑
	log.Printf("[StreamServerInterceptor] accept request : %s\n", info.FullMethod)
	arg := serverStream{ss}
	err := handler(srv, arg)
	return err
}
  • 客户端代码

        客户端代码和服务端代码类似,只是对应数据处理的接口变成了grpc.ClientStream。

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	hello "sample-app/grpc/proto"
	"strconv"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

const (
	Addr = "127.0.0.1:8080"
	//是否使用TLS
	OpenTLS = true
)

func main() {
	//TLS连接
	var opt []grpc.DialOption
	if OpenTLS {
		cert, err := credentials.NewClientTLSFromFile("..\\..\\key\\server.pem", "www.wy.com")
		if err != nil {
			fmt.Println("credentials fail ", err)
			return
		}
		opt = append(opt, grpc.WithTransportCredentials(cert))
	} else {
		opt = append(opt, grpc.WithInsecure())
	}
	//加入流式拦截器
	opt = append(opt, grpc.WithStreamInterceptor(StreamClientInterceptor))

	conn, err := grpc.Dial(Addr, opt...)
	if err != nil {
		fmt.Println("Dial fail", err)
		return
	}
	defer conn.Close()
	//初始化服务器
	c := hello.NewHelloClient(conn)
	//单项请求
	req := new(hello.HelloRequest)
	req.Name = "gRPC"
	resp, err := c.SayHello(context.Background(), req)
	if err != nil {
		fmt.Println("say hello fail", err)
		return
	}
	fmt.Println(resp.Message)
	//流式发送数据
	Streaming(c)
}

type clientStream struct {
	grpc.ClientStream
}

func (c clientStream) SendMsg(m interface{}) error {
	log.Printf("[client SendMsg] send : %T\n", m)
	return c.ClientStream.SendMsg(m)
}

func (c clientStream) RecvMsg(m interface{}) error {
	log.Printf("[client RecvMsg] recv : %T\n", m)
	return c.ClientStream.RecvMsg(m)
}

// 拦截器方法
func StreamClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	//前置逻辑
	log.Printf("[StreamClientInterceptor] send req : %s\n", method)
	//请求
	s, err := streamer(ctx, desc, cc, method, opts...)
	if err != nil {
		return nil, err
	}
	return clientStream{s}, nil
}

// 流式发送
func Streaming(pb hello.HelloClient) error {
	stream, err := pb.Streaming(context.Background())
	if err != nil {
		return err
	}

	for n := 0; n < 5; n++ {
		log.Printf("[client Streaming] send : %d\n", n)
		err = stream.Send(&hello.StreamRequest{Input: strconv.Itoa(n)})
		if err != nil {
			return err
		}

		resp, err := stream.Recv()
		//发送完毕,退出
		if err == io.EOF {
			return nil
		}

		if err != nil {
			return err
		}

		log.Printf("[client Streaming] recv : %s\n", resp.Output)
	}
	//停止发送
	stream.CloseSend()

	return nil
}
  • 演示

        注意点:

  • server和client的recv和send互成一对,最后一次输出recv是结束消息,err==io.EOF
  • 在自定义的RecvMsg方法中,前置位置只能读取消息的类型,而无法读取消息的实际数据,因为这个时候收到的消息还没有解析,如果需要接收消息的实际数据,需要把自定义的处理逻辑放后面。
func (s serverStream) RecvMsg(m interface{}) error {
	err := s.ServerStream.RecvMsg(m)
	log.Printf("[server Recv Stream]: recv : %T\n", m)
	return err
}
        项目推荐:  go-grpc-middleware
        这个项目对interceptor 进行了封装,支持多个拦截器的链式组装,对于需要多种处理的地方使用起 来会更方便些。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1825572.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

智能化状态管理:自动状态流转处理模块

目录 基本背景介绍 具体实现 基本数据准备 基本数据表 状态转换常量 状态转换注解 任务处理模版 各任务实现逻辑 开启比对任务进行处理 降噪字段处理任务处理 开启业务数据比对处理 业务数据比对处理 开始核对数据生成最终报告处理 核对数据生成最终报告处理 状…

[渗透测试学习] SolarLab-HackTheBox

SolarLab-HackTheBox 信息搜集 nmap扫描端口 nmap -sV -v 10.10.11.16扫描结果如下 PORT STATE SERVICE VERSION 80/tcp open http nginx 1.24.0 135/tcp open msrpc Microsoft Windows RPC 139/tcp open netbios-ssn Microsoft Windows n…

观光车司机N2精选考试题库(附答案)

一、判断题 1、在使用手电钻、电砂轮等手持电动工具时,为保证安全,应该装设漏电保护器。(√) 2、碳弧气刨的方法设备工具简单.操作使用安全。(√) 3、事故调查组有权向有关单位和个人了解与事故有关的情况。()(√) 4、发射药(动力药)是能产生发射和推进效应的烟火药,有粒状、粉…

SAP BOM项目类别N非库存项目简介

在BOM的项目类别中用的最多的就是L类型的库存管理,还有T类型的文本类型,但是在实际业务中也会存在物料不做库存管理,但是物料需要进行成本的管控,进入对应的工单成本中,比如在电子行业中需要烧录的正版软件,或者是电脑制造行业中需要预装的正版的Windows系统,购买的软件…

【SpringBoot】SpringBoot:简化数据库操作与API开发

文章目录 引言SpringBoot概述数据库操作简化传统数据库操作的挑战使用Spring Data JPA示例&#xff1a;定义Repository接口实现服务层 使用MyBatis示例&#xff1a;配置MyBatis定义Mapper接口 API开发简化RESTful API概述创建RESTful API示例&#xff1a;定义控制器 高级特性与…

【二】【动态规划NEW】91. 解码方法,62. 不同路径,63. 不同路径 II

91. 解码方法 一条包含字母 A-Z 的消息通过以下映射进行了 编码 &#xff1a; ‘A’ -> “1” ‘B’ -> “2” … ‘Z’ -> “26” 要 解码 已编码的消息&#xff0c;所有数字必须基于上述映射的方法&#xff0c;反向映射回字母&#xff08;可能有多种方法&#xff…

小知识点快速总结:Batch Normalization Layer(BN层)的作用

本系列文章只做简要总结&#xff0c;不详细说明原理和公式。 目录 1. 参考文章2. 主要作用3. 具体分析3.1 正则化&#xff0c;降低过拟合3.2 提高模型收敛速度&#xff0c;加速训练3.3 减少梯度爆炸或者梯度消失的情况 4. 补充4.1 BN层做的是标准化不是归一化4.2 BN层的公式4.…

洗地机提升渗透率,降价不是唯一解

作者 | 辰纹 来源 | 洞见新研社 添可2019年开创洗地机赛道时&#xff0c;看好的人不多&#xff0c;在扫地机器人正被风口吹在天上翻滚的那个年代&#xff0c;洗地机被扣上了“智商税”的标签。 洗地机到底有没有用&#xff0c;市场用脚投票。 奥维云网数据显示&#xff0c…

PS通过GTX实现SFP网络通信2

PS 程序设计 LWIP 库修改 修改原因 SDK 2017.4 自带的 LWIP 1.4.1 库的版本为 2.0 &#xff0c;直接使用该库将无法通过 SFP 实现网络通信。 因此需要进行修改。 修改的原因有 2 个&#xff0c;第 1 个原因是由于 2017.4 版本产生的新 bug 。在 2015.4 版本…

Java数据结构之ArrayList(如果想知道Java中有关ArrayList的知识点,那么只看这一篇就足够了!)

前言&#xff1a;ArrayList是Java中最常用的动态数组实现之一&#xff0c;它提供了便捷的操作接口和灵活的扩展能力&#xff0c;使得在处理动态数据集合时非常方便。本文将深入探讨Java中ArrayList的实现原理、常用操作以及一些使用场景。 ✨✨✨这里是秋刀鱼不做梦的BLOG ✨✨…

Kotlin 语言基础学习

什么是Kotlin ? Kotiln翻译为中文是:靠他灵。它是由JetBrains 这家公司开发的,JetBrains 是一家编译器软件起家的,例如常用的WebStorm、IntelliJ IDEA等软件。 Kotlin官网 JetBrains 官网 Kotlin 语言目前的现状: 目前Android 已将Kotlin 作为官方开发语言。 Spring 框…

Java—读取properties配置文件

编写配置文件 usernameroot password123456 urljdbc:mysql://localhost:3306/myDatabase driverClassNamecom.mysql.cj.jdbc.Driver 编写测试类 import java.io.FileInputStream; import java.io.IOException; import java.util.Enumeration; import java.util.Properties;/*…

vagrant putty错误的解决

使用Vagrant projects for Oracle products and other examples 新创建的虚机&#xff0c;例如vagrant-projects/OracleLinux/8。 用vagrant ssh可以登录&#xff1a; $ vagrant ssh > vagrant: Getting Proxy Configuration from Host...Welcome to Oracle Linux Server …

专业学习|博弈论-博弈论概述

&#xff08;一&#xff09;认识博弈论&#xff1a;解析复杂决策与策略 &#xff08;1&#xff09;认识博弈 博弈论广泛应用于分析个体间因利益冲突而产生的决策问题。通过构建不同模型来探讨如经贸关系、军事威胁等问题&#xff0c;旨在寻找均衡解并提供新知&#xff0c;相较…

C语言概述与历史

引言 C语言是一门历史悠久且影响深远的编程语言。它不仅为后继的许多编程语言奠定了基础&#xff0c;同时因其高效性和灵活性在系统编程和嵌入式开发领域得到了广泛应用。本篇文章将全面介绍C语言的起源与发展、设计目标与理念&#xff0c;以及C语言的标准演化历程&#xff0c;…

字符数组基础知识及题目

死识。。。 字符该如何存储呢&#xff1f;这一点我们在以前就接触过了。用char来存储。 如何输入一个单词呢&#xff1f; char a[10002]; scanf("%s",a); 就不用地址符了。 如何输入句子呢&#xff1f; char a[100002]; gets(a); gets是读入句子的&#xff0c…

利用智能交流控制设计方法实现更好的家电安全

从机电到数字控制的转变首先是通过现成的电子设备完成的——系统架构是围绕 MCU、分立晶体管和高压双向可控硅构建的。 家用电器的这场小型革命部分是由于减少能源和水的浪费以及提高易用性的需求日益增长而推动的。 随着市场及其标准的化&#xff0c;性能和成本效率一直是家…

用MATLAB绘制地球围绕太远运动而月球围绕地球运动

绘制 MATLAB代码: clc;close all;clear all;warning off;%清除变量 rand(seed, 100); randn(seed, 100); format long g;% 初始化参数 num_frames 1000; % 动画帧数 G200; dt 0.01; % 时间步长% 设置太阳、地球和月球的初始位置和半径 sun_position [0, 0]; earth_radius …

Docker MySQL Shutting down mysqld

6月初至6月15日发现MySQL无故停机多次&#xff0c;导致系统无法使用。接下来各种日志查看&#xff0c;排查原因。先附上一份Docker种MySQL的日志的截图。 一、根据Docker的日志初步估计是数据库内存飙升&#xff0c;从而被系统杀掉进程 查询Linux系统日志&#xff0c;在宿主机…

海康威视-按时间下载录像文件

目录 1、流程图 1.1、录像查找 1.2、录下下载 2、按时间下载 2.1、开启下载 2.2、后台下载 2.2.1、方式一 2.2.2、方式二 3、问题整理 3.1、错误码34 3.2、错误码10 3.3、下载的文件大小为0kb 4、错误码 由于没有在官方文档中找到通过ISAPI协议透传实现按时间下…