grpc实现c++异步非阻塞stream

news2025/1/11 21:49:55

grpc实现c++异步非阻塞stream

参考文章

  1. Non-blocking single-threaded streaming C++ server
  2. gRPC C++ async api doc and sample code
  3. grpc异步stream server端demo

序言

  原来一直是用着同步阻塞的grpc stream。由于不想再创建新的线程来监听grpc stream的新消息了,所以就想着采用异步非阻塞的方式来实现grpc的stream。但是google了一下,发现关于grpc异步非阻塞的文章很少,大部分都是grpc官方的阻塞的demo,所以特此写一篇文章来介绍grpc异步非阻塞的逻辑和实现的方法。
  首先先给出一个结论,grpc stream采用的模型跟普通的异步非阻塞模型类似,所以逻辑相对复杂,如果没有较好的抽象思维以及一定的编码能力,最好还是采用同步阻塞的方式接收消息。
  如参考文章1里面所说的,grpc的异步并不是真正的异步,仍然会有后台线程:

  1. Note grpc library needs to use the thread donated via Next or AsyncNext to do some background work and thus only AsyncNext infrequently with very short deadline may not be a good idea.
    ( 注意grpc库需要使用通过Next或AsyncNext来做一些后台工作,因此使用一个到期期限很短的AsyncNext可能不是一个好主意 )
  2. Also, regarding the single-threadedness, the current grpc implementation creates internal threads to do background work such as timer handling and others. As a result, you will not have a truly single-threaded server even if you only use one thread for the server.
    ( 此外,关于单线程,当前的grpc的实现会创建了内部线程来完成后台工作,如计时器处理等。因此,即使只为服务器使用一个线程,你也不会有真正的单线程服务器。)

grpc的异步模型

  grpc的异步模型以completion queue和tag为核心,completion queue中存放的是触发的事件对应tag,通过completion queue的AsyncNext函数或者Next函数来取出tag,用户再根据tag去调用具体的事件的自定义处理函数,可以看出tag就是识别触发事件(如新链接接入,收到新消息等)的唯一标识符。tag在grpc中是由用户自己去绑定,它的类型是void*,即无类型指针,相对于某些框架来说,grpc的异步框架更自由一点。框架如下图所示:
在这里插入图片描述

  在grpc中使用双向stream流,一个链接就是一个stream,那么,每当我们在completion queue中取到一个tag,首先,我们要通过tag去判断是哪个stream触发了事件,然后我们要根据tag去判断,到底是什么事件触发了,常用的事件有新链接接入(connect),收到新信息(read),消息发送成功(send),链接断开(disconnect)。tag可以使用一个id,这个id对应一个stream,同时对应一个具体的事件(如收到新信息)。为了方便本人所写的例子中,使用的是 std::function<void(bool)>作为tag,就是绑定了对象的类成员函数指针,该类成员函数为void xxx(bool) ;由于该函数指针绑定了对象,而对象和stream是绑定的, 所以很容易从tag中知道是哪个stream,然后不同的事件绑定不同的类成员函数,通过类成员函数的不同就可以实现不同事件的处理。
在这里插入图片描述

  注意,在stream断开的时候,会触发读完成事件(read)和写完成事件(send),通过completion queue的AsyncNext函数或者Next函数返回的ok标识符就可以区分是正常的读写完成事件,还是因为stream断开触发的。

code下载地址

grpc async stream server
https://gitee.com/evilskyman/grpc-demo.git
grpc async stream client
https://gitee.com/evilskyman/grpc-async-client-stream-demo.git

server code

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <queue>
#include <unordered_set>
#include <mutex>
#include <time.h>

#include <grpc/grpc.h>
#include <grpcpp/security/server_credentials.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>

#include "hello.pb.h"
#include "hello.grpc.pb.h"

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerWriter;
using grpc::Status;

using hello::HelloService;
using hello::HelloMsg;
using TagType = std::function<void(bool)>;//本人使用的tag是函数指针,函数指针使用的是绑定了类的类对象函数.

using namespace std;

class GrpcStreamServerInstance;
const static std::string server_address("0.0.0.0:8860");
static unordered_set<GrpcStreamServerInstance *> GrpcStreamServerInstanceSet;//存放着所有的已经连接上的stream对应的GrpcStreamServerInstance
class GrpcStreamServerInterface {
public:
    virtual void connected(bool ok) = 0; //新连接接入服务器
    virtual void readDone(bool ok) = 0;  //读到一帧新消息
    virtual void writeDone(bool ok) = 0; //写入完成一帧消息到客户端
    virtual void disconnect(bool ok) = 0; //服务器被动断开,不管谁发送的断开指令
};

typedef hello::HelloService::AsyncService ServeiceType;

class GrpcStreamServerInstance : public GrpcStreamServerInterface{
private:
   
public:
    GrpcStreamServerInstance(ServeiceType* service,grpc::ServerCompletionQueue* inputCq);
    virtual ~GrpcStreamServerInstance(){};
    void connected(bool ok) override;
    void readDone(bool ok) override;
    void writeDone(bool ok) override;
    void disconnect(bool ok) override;
    bool asycSendMsg(HelloMsg& msg);
private:
    ServeiceType* service;
    grpc::ServerCompletionQueue* cq;//Completion Queue
    ServerContext serverContext;//每一个stream都有自己的serverContext
    ServerAsyncReaderWriter<HelloMsg, HelloMsg> stream;
    //函数指针
    TagType connectedFunc;//新链接接入时触发
    TagType readDoneFunc;//读到新消息时触发
    TagType writeDoneFunc;//发送一帧消息成功后触发
    TagType disconnectFunc;//stream断开时触发
    //inputMsg用来接收消息,用stream.Read()中绑定
    HelloMsg inputMsg;    
    //onWrite用来区分stream有没有在发送消息,如果stream在发送,则只需要将消息写入writeBuffer,
    //否则要使用stream.Write()触发stream的发送;
    bool onWrite;
    //写缓存
    queue<HelloMsg> writeBuffer;
};

//GrpcStreamServerInstance不需要其它线程交互,故不需要互斥锁
GrpcStreamServerInstance::GrpcStreamServerInstance(ServeiceType* inputService,grpc::ServerCompletionQueue* inputCq):\
service(inputService),cq(inputCq),stream(&serverContext)
{
    //使用std::bind绑定对象和类对象函数得到一个函数指针
    connectedFunc = std::bind(&GrpcStreamServerInstance::connected, this, std::placeholders::_1);
    readDoneFunc = std::bind(&GrpcStreamServerInstance::readDone, this, std::placeholders::_1);
    writeDoneFunc = std::bind(&GrpcStreamServerInstance::writeDone, this, std::placeholders::_1);
    disconnectFunc = std::bind(&GrpcStreamServerInstance::disconnect, this, std::placeholders::_1);
    
    //设置serverContext,stream断开时,CompletionQueue会返回一个tag,这个tag就是输入的disconnectFunc这个函数指针
    serverContext.AsyncNotifyWhenDone(&disconnectFunc);
    //设置当新新链接connect的时候,cq返回connectedFunc作为tag
    service->Requesthello(&serverContext,&stream, cq,cq,&connectedFunc);

    onWrite = false;
}
void GrpcStreamServerInstance::connected(bool ok){
    //新建一个GrpcStreamServerInstance,一个client的grpc链接就对应一个GrpcStreamServerInstance实例
    stream.Read(&inputMsg,&readDoneFunc);
    //新的GrpcStreamServerInstance,会在构造函数中调用service->Requesthello()来绑定新链接
    new GrpcStreamServerInstance(service,cq);
    //新加入的stream对应的GrpcStreamServerInstance会被加入到GrpcStreamServerInstanceSet中,用于发送消息或者统计stream链接。
    GrpcStreamServerInstanceSet.insert(this);
    cout << "当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl;  
}   

void GrpcStreamServerInstance::readDone(bool ok){
    try{
        if(!ok){
            //当ok == false,说明stream已经断开
            return;
        }
        cout << "收到消息,id为"<< inputMsg.id() <<",msg为" << inputMsg.msg() << endl;  
        stream.Read(&inputMsg,&readDoneFunc);
    }catch(const std::exception& e){     
        cout << e.what() << endl;   
    }
}
void GrpcStreamServerInstance::writeDone(bool ok){
    if(!ok){
        //当ok == false,说明stream已经断开
        return;
    }
    onWrite = false;
    if(writeBuffer.empty())return;
    //当grpc写完时,会触发writeDone,我们只需要从自定义的writeBuffer中取一帧继续写即可
    stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
    writeBuffer.pop();
    onWrite = true;
}

void GrpcStreamServerInstance::disconnect(bool ok){ 
    GrpcStreamServerInstanceSet.erase(this);
    cout << "链接断开,当前链接数量为"<< GrpcStreamServerInstanceSet.size()<< endl; 
    delete this;
}
bool GrpcStreamServerInstance::asycSendMsg(HelloMsg& msg){
    writeBuffer.push(msg);
    if(!onWrite){
        //没有任何写操作在执行
        onWrite = true;
        stream.Write(std::move(writeBuffer.front()),&writeDoneFunc);
        writeBuffer.pop();
    }
    return true;
}


class GrpcStreamServerThread {
public:
    GrpcStreamServerThread(): msgNum(0){};
    ~GrpcStreamServerThread();
    void run();
    bool sendMsg(const HelloMsg& msg);
    bool isTimeElapsed(struct timeval now,struct timeval last,int64_t ms);
private:
    queue<HelloMsg> msgQueue;
    std::atomic_int msgNum;
    timed_mutex writeLock;
};

bool GrpcStreamServerThread::isTimeElapsed(struct timeval now,struct timeval last,int64_t ms){
    int64_t sub = (now.tv_sec - last.tv_sec)*1000;
    sub  = sub + (now.tv_usec - last.tv_usec)/1000;
    return sub > ms ? true :false;
}

void GrpcStreamServerThread::run(){
    try{
        std::unique_ptr<grpc::ServerCompletionQueue> cq;
        ServeiceType service;
        ServerBuilder builder;

        // builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 10000);
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service);
        cq = builder.AddCompletionQueue();
        std::unique_ptr<Server> server_= builder.BuildAndStart();
        new GrpcStreamServerInstance(&service,cq.get());

        struct timeval lastTime = {0,0};//用来记录时间,保证下面定时消息的发送
        while (true) {
            void * tag;
            bool ok;
            //阻塞100毫秒,gpr_time_from_millis()函数的单位是毫秒,输入的是tag和ok的地址,cq->AsyncNext()会把结果写到地址对应的内存上
            grpc::ServerCompletionQueue::NextStatus status = cq->AsyncNext(&tag, &ok,\
            gpr_time_from_millis(100,GPR_TIMESPAN));

            if(status ==  grpc::ServerCompletionQueue::NextStatus::GOT_EVENT){
                //grpc服务器有新的事件,强制转换tag从void * 到 std::function<void(bool)> *,即void *(bool) 函数指针
                TagType* functionPointer = reinterpret_cast<TagType*>(tag);
                //通过函数指针functionPointer调用函数GrpcStreamServerInstance::xxx
                (*functionPointer)(ok);
            }
            //从msgQueue中取出新的消息,发送各个client
            if( msgNum != 0){
                std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
                chrono::milliseconds  tryTime(500);
                if(lock.try_lock_for(tryTime)){
                    //获取到了锁
                    while(!msgQueue.empty()){
                        HelloMsg msg = msgQueue.front();
                        for(auto temp : GrpcStreamServerInstanceSet){
                            temp->asycSendMsg(msg);
                        }
                        msgQueue.pop();
                    }
                }else{
                    cout << "500ms内没抢到锁" << endl;  
                    continue;
                }
            }
            //定时发送消息
            struct timeval now;
            gettimeofday(&now, NULL);
            if(isTimeElapsed(now,lastTime,10*1000)){//判断是否已经过了10秒
                lastTime = now;
                HelloMsg msg;
                msg.set_id(2);
                msg.set_msg("hello world");
                for(auto temp : GrpcStreamServerInstanceSet){
                    temp->asycSendMsg(msg);
                }
            }
            //此处可以加入一些自定义的处理函数,比如记录时间等等,但是不应该阻塞太久。
        }
    }catch(const std::exception& e){        
        cout << e.what() << endl;   
    }
}
GrpcStreamServerThread::~GrpcStreamServerThread(){
    
}


bool GrpcStreamServerThread::sendMsg(const  HelloMsg& msg){
    {
        std::unique_lock<timed_mutex>lock(writeLock,std::defer_lock);
        chrono::milliseconds  tryTime(500);
        //main函数与GrpcStreamServerThread::run()处于不同的线程,需要加锁保证线程安全
        if(lock.try_lock_for(tryTime)){
            //获取到了锁
            msgQueue.push(msg);
            msgNum++;
        }else{
            cout << "500ms内没抢到锁" << endl;  
            return false;
        }
    }
    return true; 
}


int main(){    
    GrpcStreamServerThread* grpcStreamServerThread = new GrpcStreamServerThread();
    thread myThread(std::bind(&GrpcStreamServerThread::run, grpcStreamServerThread));
    HelloMsg msg;
    msg.set_id(1);

    while(1){
        std::string input;
        cin >> input;
        //输入exit,跳出循环结束程序。
        if(input == "exit")break;
        msg.set_msg(input);
        grpcStreamServerThread->sendMsg(msg);
    }
    
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/143590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怎么提高程序设计能力?可以参考程序-设计原则,程序-设计模式

怎么提高程序设计能力&#xff1f; 简单说下我的方式方法&#xff1a; 【程序架构】 借鉴设计模式和设计原则 【程序业务】 多理解客户需求&#xff0c;理解后&#xff0c;做竞品逻辑分析&#xff0c;分析出其逻辑结构&#xff0c;和数据结构 &#xff1b; 再根据客户需求…

自己有工厂,怎样接外贸订单?

很多做外贸的小工厂和小型加工厂&#xff0c;除了传统的营销渠道外&#xff0c;也不知道如何做、才能接到外贸订单。小工厂想获得外贸订单&#xff0c;可通过以下7个方法&#xff1a;1、注册一些外贸B2B平台&#xff0c;发布产品&#xff0c;等待客户询盘外贸B2B平台太多了&…

商务车改装之奔驰威霆改装

今天来看看这台车的改装效果&#xff0c;首先外观改成GLS的一个包围。同时大灯换了一个三道杠的运动大灯。运动大灯加上包围&#xff0c;是不是时尚了很多。再来看看威霆内饰&#xff0c;白红相间的色彩搭配&#xff0c;仪表台换成一个大连屏的仪表台&#xff0c;带着飞机一样的…

4路DI开关检测计数器NPN/PNP输入,Modbus TCP协议,WiFi模块YL160频率测量 计数器

特点&#xff1a;● 4路开关量输入&#xff0c;支持NPN和PNP输入● DI每一路都可用作计数器或者频率测量● 支持Modbus TCP 通讯协议● 可以设置每转脉冲数用于转速测量● 内置网页功能&#xff0c;可以通过网页查询电平状态● 可以通过网页设定输出状态● 宽电源供电范围&…

OPC Expert v8.1.2211 Crack

像专业人士一样解决您的 OPC 和 DCOM 连接问题 [无需经验] 快速修复 OPC 和 DCOM 错误 使用 OPC Expert&#xff0c;您无需任何经验即可解决和修复 OPC 连接问题。OPC Expert 为您完成繁重的工作&#xff0c;以快速自动诊断 OPC 和 DCOM 问题……Ω578867473而且还不止于此。OP…

基于Java+SpringBoot+vue+element实现餐厅点餐系统平台

基于JavaSpringBootvueelement实现餐厅点餐系统平台 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 欢迎点赞 收藏 ⭐留言 文末获取源码联系方式 文章目录基于JavaSpringBo…

ATGM336H-5N定位导航模块介绍

ATGM336H-5N定位导航模块简介ATGM336H-5N系列模块是9.7X10.1尺寸的高性能BDS/GNSS全星座定位导航模块系列的总称。该系列模块产品都是基于中科微第四代低功耗GNSSSOC 单芯片—AT6558&#xff0c;支持多种卫星导航系统&#xff0c;包括中国的BDS&#xff08;北斗卫星导航系统&am…

基于Java+SpringBoot+vue+element实现火车订票平台管理系统

基于JavaSpringBootvueelement实现火车订票平台管理系统 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取…

足球视频AI(五)——球员与球的对象跟踪

一、基础概念 在之前的四节中&#xff0c;我们尝试解决&#xff1a; 1&#xff0c;球员识别、足球识别、裁判识别&#xff1b; 2&#xff0c;队伍的分类 3&#xff0c;平面坐标的换算 存在关键的问题是&#xff1a;每一帧的画面&#xff0c;每次都是重新识别&#xff0c;无…

【OpenCV】数字图像的表示 | 图像IO操作接口 | 图像混合操作

Ⅰ. 数字图像的表示 0x00 位数 计算机采用0/1编码的系统&#xff0c;数字图像也是利用0/1来记录信息。 我们平常接触的图像都是8位数图像&#xff0c;包含0&#xff5e;255灰度。 0&#xff1a;代表最黑 1&#xff1a;表示最白0x01 二值图像 一幅二值图像的二维矩阵仅由0、1…

VTK-vtkAbstractTransform及其子类

前言&#xff1a;本博文主要研究vtkAbstractTransform及其子类的实现原理&#xff0c;以及由此扩展的类。 目录 vtkAbstractTransform vtkGeneralTransform vtkGeoSphereTransform(9.0.3中没有该接口) vtkGeoTransform(9.0.3中没有该接口) vtkHomogeneousTransform vtkId…

企业微信客户联系自定义工具栏开发

一、问题说明&#xff1a;企业微信中私聊微信客户&#xff0c;在聊天输入框上面有一行快捷工具&#xff0c;这边怎么自定义开发&#xff1f;如下图&#xff0c;我们可以点击快捷发送&#xff0c;然后弹出一个页面&#xff0c;页面中有我们需要发送给当前会话的各种资源&#xf…

Maven高级-聚合-继承

多模块构建维护 作用&#xff1a;聚合用于快速构建maven工程&#xff0c;一次性构建多个项目/模块。 制作方式&#xff1a; 创建一个空模块&#xff0c;打包类型定义为pom <packaging>pom</packaging>定义当前模块进行构建操作时关联的其他模块名称 <modules&g…

2021年MathorCup高校数学建模挑战赛—大数据竞赛B题信息流智能推荐算法中的序列评估问题求解全过程文档及程序

2021年MathorCup高校数学建模挑战赛—大数据竞赛 B题 信息流智能推荐算法中的序列评估问题 原题再现&#xff1a; 随着互联网信息的蓬勃发展&#xff0c;用户在使用互联网应用时面临着信息过载的问题。推荐算法的出现&#xff0c;满足了用户个性化的内容消费需求&#xff0c;…

分布式任务调度系列 - XXL-JOB

一、前言 本内容仅用于个人学习笔记&#xff0c;如有侵扰&#xff0c;联系删除 二、传统的定时任务 1. 概念 1.1、定时任务的基本概念 程序为解决一个信息处理任务而预先编制的工作执行方案&#xff0c;这就是定时任务&#xff0c;核心组成如下&#xff1a; 执行器&#…

28个数据可视化图表的总结和介绍

数据可视化本身就是一种通用语言。我们这里通用语言的意思是&#xff1a;它能够向各行各业的人表示信息。它打破了语言和技术理解的障碍。数据是一些数字和文字的组合&#xff0c;但是可视化可以展示数据包含的信息。 “数据可视化有助于弥合数字和文字之间的差距”——Brie E…

C++入门——引用

1.概念 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空 间&#xff0c;它和它引用的变量共用同一块内存空间 类型& 引用变量名(对象名) 引用实体&#xff1b; 而引用类型必然要与引用实体的类型一致。 …

下载微信小程序中的视频

工具准备&#xff1a;Fiddler 我这里用的5.0的版本。&#xff08;这个用来抓取视频下载地址&#xff09;Internet Download Manager&#xff08;idm&#xff09;版本6.37&#xff08;这个用来下载视频&#xff09;步骤&#xff1a;打开Fiddler如下图配置后抓包2.登录PC版微信&a…

1.5日报

今天完成项目环境的搭建&#xff0c;并成功剥离TestMrl相关接口&#xff0c;并成功运行 遇到的问题&#xff1a; 缺少 <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <v…

展望2023,回首2022,让我们一起努力

回首2022 时光如白驹过隙般&#xff0c;飞逝而过。时光荏苒&#xff0c;日月如梭。不知不觉中&#xff0c;充满希望与光明的2023年已经到来了。回首2022年&#xff0c;有喜悦&#xff0c;有失落&#xff0c;有艰辛与困难&#xff0c;也有解决困难后的欣慰&#xff0c;有着无数…