golang工程——grpc一元请求与流请求

news2024/11/19 4:30:26

唉,难顶,像块砖一样到处搬。又跑去golang技术栈的项目俩月
本篇博客多有参考,记录一下近期学习

rpc、grpc原理

grpc 是一项进程间通信技术,可以用来连接、调用、操作和调试构建分布式程序,调用过程如同调用函数一样,整个过程操作起来很简单,就像调用本地方法一样。与许多rpc 系统一样,grpc 是定义服务的思想,服务器需要实现此接口并运行grpc 来处理客户端调用。

在这里插入图片描述

grpc 默认使用 Protocol Buffers 作为 IDL 定义服务接口。 Protocol Buffers 是中立的、与平台无关、实现结构化数据序列化的可扩展机制,虽然也可以使用json,但是 Protocol Buffers 是二进制编解码,所以编解码性能比Json好。服务接口定义在 proto 文件中指定,也就是在扩展名为 “. proto” 的普通文本文件中。我们要按照普通的 Protocol Buffers 范式来定义grpc服务,并将方法参数和返回类型指定为 Protocol buffers 的格式,也因为服务定义是Protocol Buffers 规范的扩展,所以可以借助特殊的 gRPC 插件来根据 proto 文件生成代码。

grpc常用通信模式

一元rpc模式

类似http 协议一问一答

服务端数据流模式

这种流模式可以理解为,服务器向客户端源源不断的发送数据流,应用场景很多,比如游戏中定时任务或者其他事件造成玩家数据变化需要将数据推送给客户端。

一元rpc模式下,grpc服务器端和 grpc 客户端在通信时始终只有 一个请求和 一个响应。在服务器端流rpc 模式下,服务端接收到一个请求后发送多个响应组成的序列,在服务器发送所有响应消息完毕后,发送trailer 元数据给客户端,标识流结束。

客户端接收流数据需要循环接收,直到出现io.EOF,代表服务器发送流数据已经完毕,后面会写grpc实现这个功能的原理

客户端数据流模式

客户端可以将数据源源不断发送给服务器,跟服务端流相反,客户端会发送多条响应,服务器发送一条响应,但是服务器不必等到发送完所有消息才响应。可以发送一条或几条消息就开始响应。

下面来看一个例子:物联网硬件将本地的缓存信息上传到服务器

  • 服务器需要以流的方式去接收数据,当客户端关闭流的时候会返回io.EOF,这时候我们可以做响应。
双向数据流模式

双方都可以将数据源源不断发给对方。简单来说就是上面客户端流和服务器流的一个整合。

下面来看一个例子:玩家连续进行了多次战斗请求,服务器将操作结果响应给玩家

  • 服务器读到客户端流关闭时返回nil,标记服务器流结束。

  • 与之前客户端流模式不一样,客户端流模式是直接sendAndClose()。下面这样读到一半数据返回nil,也标识服务器流数据结束,只是可能会丢数据

  • 启动一个协程异步接收数据,官方有说明,一个goroutine 读,一个goroutine 写是不会有并发问题的。

  • stream.CloseSend()代表关闭客户端流,标记客户端流已经结束

  • 客户端需要通过定义rpc方法c.DataUpload(ctx)打开流,然后通过send 发送请求,发送完后调用CloseAndRecv关闭流等待消息响应,并处理错误,这里为了demo 演示 ,err就直接panic,实际情况可能更加复杂,对错误处理也很多种方式。

四种通信模式例子

例子

echo.proto

syntax = "proto3";
option go_package = "grpc/echo";
import  "google/protobuf/timestamp.proto";
package grpc.echo;

message EchoRequest {
  string message = 1;
  bytes bytes = 2;
  int32 length = 3;
  google.protobuf.Timestamp time = 4;

}
message EchoResponse {
  string message = 1;
  bytes bytes = 2;
  int32 length = 3;
  google.protobuf.Timestamp time = 4;
}

service Echo {
  //一元请求
  rpc UnaryEcho(EchoRequest) returns(EchoResponse) {}
  //服务端流
  rpc ServerStreamingEcho(EchoRequest) returns(stream EchoResponse){}
  //客户端流
  rpc ClientStreamingEcho(stream EchoRequest) returns( EchoResponse){}
  //双向流
  rpc BidirectionalStreamingEcho(stream EchoRequest) returns(stream EchoResponse){}
}

服务端
server.go

package server

import (
    "context"
    "fmt"
    "google.golang.org/protobuf/types/known/timestamppb"
    "grpc/echo"
    "io"
    "log"
    "os"
    "strconv"
    "sync"
    "time"
)


type EchoServer struct {
    echo.UnimplementedEchoServer
}
func (EchoServer) UnaryEcho(ctx context.Context, in *echo.EchoRequest) (*echo.EchoResponse, error) {
    fmt.Printf("server recv :%v\n", in.Message)

    return &echo.EchoResponse{
        Message: "server send message",
    }, nil
}
//服务端流,发送文件
func (EchoServer) ServerStreamingEcho(in *echo.EchoRequest, stream echo.Echo_ServerStreamingEchoServer) error {
    fmt.Printf("server recv :%v\n", in.Message)
    filePath := "echo-server-practice/files/server.jpg"

    file, err := os.Open(filePath)
    if err != nil {
        log.Fatal(err.Error())
    }
    defer file.Close()

    buf := make([]byte, 1024)

    for {
        n, err := file.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }

        stream.Send(&echo.EchoResponse{
            Message: "server sending files",
            Bytes: buf[:n],
            Time: timestamppb.New(time.Now()),
            Length: int32(n),
        })
    }
    //服务端流结束 return nil
    return nil
}
//客户端流,接收文件
func (EchoServer) ClientStreamingEcho(stream echo.Echo_ClientStreamingEchoServer) error {

    filePath := "echo-server-practice/files/" + strconv.FormatInt(time.Now().UnixMilli(), 10) + ".jpg"

    file, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)

    if err != nil {
        log.Fatal(err)
        return err
    }
    defer file.Close()

    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }

        if err != nil {
            log.Println(err)
            return err
        }
        file.Write(res.Bytes[:res.Length])
        fmt.Printf("server recv :%v\n", res.Message)

    }
    err = stream.SendAndClose(
        &echo.EchoResponse{
            Message: "server send complete",
        })
    return err
}
//双向流,互发文件
func (EchoServer) BidirectionalStreamingEcho(stream echo.Echo_BidirectionalStreamingEchoServer) error {

    wg := &sync.WaitGroup{}

    wg.Add(1)
    go func () {
        defer wg.Done()
        //接收客户端文件

        filePath := "echo-server-practice/files/" + strconv.FormatInt(time.Now().UnixMilli(), 10) + ".jpg"

        file, err := os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)

        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()

        for {
            res, err := stream.Recv()
            if err == io.EOF {
                break
            }

            if err != nil {
                log.Println(err)
                return
            }
            file.Write(res.Bytes[:res.Length])
            fmt.Printf("server recv :%v\n", res.Message)

        }
    }()

    wg.Add(1)

    go func() {
        defer wg.Done()
        // 发文件
        filePath := "echo-server-practice/files/server.jpg"

        file, err := os.Open(filePath)
        if err != nil {
            log.Fatal(err.Error())
        }
        defer file.Close()

        buf := make([]byte, 1024)

        for {
            n, err := file.Read(buf)
            if err == io.EOF {
                break
            }
            if err != nil {
                log.Fatal(err)
                return
            }

            stream.Send(&echo.EchoResponse{
                Message: "server sending files",
                Bytes: buf[:n],
                Time: timestamppb.New(time.Now()),
                Length: int32(n),
            })
        }
    } ()
    wg.Wait()
    // 服务端流关闭
    return nil
}

main.go

package main

import (
    "flag"
    "fmt"
    "google.golang.org/grpc"
    "grpc/echo"
    "grpc/echo-server-practice/server"
    "log"
    "net"
)

var (
    port = flag.Int("port", 50053, "port")
)

func main() {
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))

    if err != nil {
        log.Fatal(err)
    }

    // grpc server
    s := grpc.NewServer()
    echo.RegisterEchoServer(s, &server.EchoServer{})

    log.Printf("server listening at : %v\n", lis.Addr())

    if err := s.Serve(lis); err !=nil {
        log.Fatal(err)
    }

}

客户端
client.go

package client

import (
    "fmt"
    "golang.org/x/net/context"
    "google.golang.org/protobuf/types/known/timestamppb"
    "grpc/echo"
    "io"
    "log"
    "os"
    "strconv"
    "sync"
    "time"
)
//一元请求
func CallUnary(client echo.EchoClient) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()
    in := &echo.EchoRequest{
        Message: "client send message",
        Time: timestamppb.New(time.Now()),
    }

    res, err := client.UnaryEcho(ctx, in)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("client recv: %v", res.Message)

}
//服务端流
func CallServerStream(client echo.EchoClient) {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()
    in := &echo.EchoRequest{
        Message: "client send message",
        Time: timestamppb.New(time.Now()),
    }

    stream, err := client.ServerStreamingEcho(ctx, in)

    if err != nil {
        log.Fatal(err)
    }

    filename := "echo-client-practice/files/" + strconv.FormatInt(time.Now().UnixMilli(), 10) + ".jpg"

    file, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)

    if err != nil {
        log.Fatal(err)
        return
    }
    defer file.Close()

    for {
        res, err := stream.Recv()
        if err == io.EOF {
            break
        }

        if err != nil {
            log.Println(err)
            break
        }

        file.Write(res.Bytes[:res.Length])
        fmt.Printf("client recv %v\n", res.Message)
    }
    stream.CloseSend()
}
//客户端流
func CallClientSteam(client echo.EchoClient) {
    // 客户端流
    filePath := "echo-client-practice/files/client.jpg"

    file, err := os.Open(filePath)
    if err != nil {
        log.Fatal(err)
    }

    defer file.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)

    defer cancel()

    stream, err := client.ClientStreamingEcho(ctx)

    if err != nil {
        log.Fatal(err)
    }

    buf := make([]byte, 1024)

    for {
        n, err := file.Read(buf)

        if err == io.EOF {break}
        if err != nil {
            log.Fatal(err)
        }

        stream.Send(&echo.EchoRequest{
            Message: "client sending file",
            Bytes: buf,
            Length: int32(n),
            Time: timestamppb.New(time.Now()),
        })

    }

    res, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("client recv : %v\n", res.Message)
}
// 双向流发文件
func CallBidirectional(client echo.EchoClient) {

    // 发送文件
    ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)

    defer cancel()

    stream, err := client.BidirectionalStreamingEcho(ctx)
    if err != nil {
        log.Fatal(err)
    }

    wg := &sync.WaitGroup{}
    wg.Add(1)

    go func() {
        defer wg.Done()
        filePath := "echo-client-practice/files/client.jpg"

        file, err := os.Open(filePath)
        if err != nil {
            log.Fatal(err)
        }

        defer file.Close()

        buf := make([]byte, 1024)

        for {
            n, err := file.Read(buf)

            if err == io.EOF {break}
            if err != nil {
                log.Fatal(err)
            }

            stream.Send(&echo.EchoRequest{
                Message: "client sending file",
                Bytes: buf,
                Length: int32(n),
                Time: timestamppb.New(time.Now()),
            })
        }
        stream.CloseSend()
    }()

    wg.Add(1)

    go func() {
        //接收文件
        defer wg.Done()
        filename := "echo-client-practice/files/" + strconv.FormatInt(time.Now().UnixMilli(), 10) + ".jpg"

        file, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)

        if err != nil {
            log.Fatal(err)
            return
        }
        defer file.Close()

        for {
            res, err := stream.Recv()
            if err == io.EOF {
                break
            }

            if err != nil {
                log.Println(err)
                break
            }

            file.Write(res.Bytes[:res.Length])
            fmt.Printf("client recv %v\n", res.Message)
        }
    }()
    wg.Wait()
}

main.go

package main

import (
    "flag"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "grpc/echo"
    "grpc/echo-client-practice/client"
    "log"
)

var (
    addr = flag.String("host", "localhost:50053", "")
)




func main() {
    flag.Parse()

    conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))

    if err != nil {
        log.Fatal(err)
    }

    defer conn.Close()
    c := echo.NewEchoClient(conn)

    //client.CallUnary(c)

   // client.CallServerStream(c)
  //client.CallClientSteam(c)
  client.CallBidirectional(c)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1046757.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【剑指Offer】77.按之字形顺序打印二叉树

题目 给定一个二叉树&#xff0c;返回该二叉树的之字形层序遍历&#xff0c;&#xff08;第一层从左向右&#xff0c;下一层从右向左&#xff0c;一直这样交替&#xff09; 数据范围&#xff1a;0≤n≤1500,树上每个节点的val满足 ∣val∣<1500 要求&#xff1a;空间复杂度…

深入浅出DAX:SELECTEDVALUE()

深入浅出DAX&#xff1a;SELECTEDVALUE() SELECTEDVALUE()&#xff0c;如果筛选 columnName 的上下文后仅剩下一个非重复值&#xff0c;则返回该值。否则返回alternateResult&#xff0c;语法如下&#xff1a; SELECTEDVALUE(<columnName>[, <alternateResult>] …

CorelDRAW Graphics Suite2023绿色中文版本下载教程

CorelDRAW Graphics Suite2023版是领先的一体化软件包&#xff0c;它包括多个程序&#xff0c;如CorelDRAW、Corel PHOTO-PAINT、Corel CAPTURE、Corel Font Manager、Duplexing Wizard等&#xff0c;可全部安装&#xff0c;也可根据实际需要选择进行安装&#xff0c;都是最新版…

敏捷发布列车初探2 ---- Agile Release Train

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 敏捷发布列车二、ART的特性2.敏捷团队为列车提供动力3.与共同节奏保持一致4.关键角色启用 三、ART的责任总结 敏捷发布列车 敏捷发布列车&#xff08;ART&#xff…

PHP生成二维码带图标代码实例

PHP生成二维码带图标代码实例&#xff08;PHP QR Code二维码生成类库&#xff09; public static function png($text, $outfilefalse, $levelQR_ECLEVEL_L, $size3, $margin4, $saveandprintfalse) { $enc QRencode::factory($level, $size, $margin); return $enc->…

IP地址证书:保护网站安全的重要措施

首先&#xff0c;我们需要了解什么是IP地址证书。IP地址证书是一种专门用于公网IP地址验证的数字证书&#xff0c;它主要用于解决IP地址明文传输的安全隐患和加密传输的问题。那么IP地址证书是如何保护网站的&#xff1f; 1&#xff0c;身份验证 IP地址证书用于对网站进行身份…

mysql workbench常用操作

1、No database selected Select the default DB to be used by double-clicking its name in the SCHEMAS list in the sidebar 方法一&#xff1a;双击你要使用的库 方法二&#xff1a;USE 数据库名 2、复制表名&#xff0c;字段名 3、保存链接

Swift data范围截取问题

文章目录 一、截取字符串的几种方法1. 截取前几位2. 截取后几位3. subData4. 下标截取 二、subData(in:) 报错 EXC_BREAKPOINT 一、截取字符串的几种方法 1. 截取前几位 mobileID.prefix(32)2. 截取后几位 mobileID.suffix(3)3. subData data.subdata(in: 0..<4)4. 下标…

港科夜闻|香港科技大学颁授荣誉大学院士予六位杰出人士

关注并星标 每周阅读港科夜闻 建立新视野 开启新思维 1、香港科技大学颁授荣誉大学院士予六位杰出人士。他们分别为陈晓峰先生、郑志刚博士、冯英伟先生、李慧诗女士、姚珏女士及叶毓强教授(按姓氏英文字母排序)。荣誉大学院士颁授典礼由香港科大校董会主席沈向洋教授和香港科大…

开发APP的费用是多少

开发一款APP的费用可以因多种因素而异&#xff0c;包括项目的规模、功能、复杂性、技术选择、地理位置等。北京是中国的大城市&#xff0c;APP开发的费用也会受到北京的物价水平和市场竞争的影响。以下是一些可以影响APP开发费用的因素&#xff0c;希望对大家有所帮助。北京木奇…

Codeforces Round 889 (Div. 2)A~C1题解

A. Dalton the Teacher 题目分析&#xff1a; 模拟&#xff0c;写出两个就能找出规律&#xff1a;坐在自己位子上的小孩会不高兴&#xff0c;所以统计下来cnt 发现cnt为奇数是(cnt1)/2次就能换成要求&#xff0c;偶数就是cnt/2 #include<bits/stdc.h> #pragma GCC op…

Silicon labs BG22蓝牙开发记录(一)

本篇内容适用初学者使用&#xff0c;主要介绍如何快速了解Siliconlabs EFR32BG22蓝牙芯片相关的软件资源和硬件资源&#xff0c;结合了我作为FAE 支持的一些个人看法&#xff0c;便于使用者快速入门&#xff0c;加速你们的产品实施。 本系列分为&#xff1a; Siliconlabs BG22…

世界前沿技术发展报告2023《世界信息技术发展报告》(一)世界信息技术及产业发展重要动向

&#xff08;一&#xff09;世界信息技术及产业发展重要动向 1. 概述2. 半导体技术水平持续进步&#xff0c;行业内部开始新一轮调整2.1 全球主要经济体加强半导体技术能力建设&#xff0c;推动厂商扩产2.2 先进制程技术持续发展&#xff0c;先进封装技术崭露头角2.3 消费电子半…

幸存者偏差

幸存者偏差&#xff08;英语&#xff1a;survivorship bias&#xff09;&#xff0c;另译为“生存者偏差”&#xff0c;是一种认知偏差。其逻辑谬误表现为过分关注于目前人或物“幸存了某些经历”然而往往忽略了不在视界内或无法幸存这些事件的人或物。 幸存者偏差最早来源于第…

品牌出海的关键步骤:市场定位与创新营销

现如今越来越多的企业开始将目光投向海外市场&#xff0c;寻求品牌出海的机会。品牌出海是指企业将自身品牌拓展至海外市场&#xff0c;通过进军国际市场来实现增长和发展。然而&#xff0c;面对不同的文化、市场环境和消费者需求&#xff0c;成功的品牌出海需要制定有效的策略…

apifox怎么测试API,你学会了吗?

背景 由其他的team做的项目&#xff0c;配置到一个新的环境下。由于项目需要与别的公司的项目接连&#xff0c;所以需要创建公开的API接口&#xff0c;利用apifox来进行测试&#xff08;postman&#xff0c;jmeter都可以&#xff09;。此次利用apifox来创建测试API接口的测试 …

小程序添加隐私保护指引弹框(包含配置隐私保护指引方法)

实现效果&#xff1a; 目录 前言一、 涉及到使用了隐私接口的小程序必须在「小程序管理后台」设置《小程序用户隐私保护指引》&#xff0c;微信一共提供了 4 个接口给开发者使用&#xff0c;分别是:二、 配置隐私协议弹框组件三、在页面中使用协议弹窗组件四、uniapp开发&#…

自己开发一个VSCode插件,快速生成Flex布局代码

插件CSS Flex 安装地址&#xff1a;VSCode插件安装 GitHub&#xff1a;https://github.com/xutao-o/css-flex-code &#x1f4a1;介绍 这是一个快捷生成CSS Flex布局代码的VS Code插件&#xff0c;类似于Google开发者控制台里的Flex布局工具&#xff0c;就是看谷歌的布局工具…

stm32之软件模拟IIC

在之前的文章中分析过在52上的IIC时序&#xff0c;也测试过stm32的自带IIC功能&#xff0c;这里大致写下如何模拟stm32上的IIC。实验硬件基于stm32f103c8t6 废话不多说&#xff0c;先直接上代码。 一、源码 头文件 #include "stdint.h" #include "gpio.h&quo…

编译u-boot的过程中遇到的问题

我使用的是uboot-2016.tar.bz2这个压缩包&#xff0c;将其解压之后进入这个文件夹&#xff0c;里面所包含的文件如下图所示。 这个文件夹下包含着三个Shell脚本文件&#xff0c;都是可执行的&#xff0c;打开其中一个其内容如下。 根据自己的开发板类型决定执行哪一个Shell脚…