C++用户信息管理服务 Thrift框架 Mysql数据落地 Redis数据缓存 Kafka 消息队列 总结 附主要源码

news2024/12/26 21:40:59

不知不觉入职已经一个月了,近期提交了考核2,要求如下:

1、编写一个管理用户信息的服务,通过thrift的远程过程调用实现用户信息管理功能
2、用户信息至少包括 唯一ID、用户名、性别、年龄、手机号、邮箱地址、个人描述
3、提供创建用户查询用户信息修改用户信息接口,其中修改用户信息要求除用户ID外其它信息可单独修改
4、数据存储要求使用mysql落地数据(使用存储过程)、使用redis做数据缓存、数据发生变化时通过kafka发送变化消息(使用json数据格式)
5、实现用户信息操作客户端(通过命令行操作)
6、实现用户信息变化处理服务,用户信息变化时输出变化的内容

一、主要思路

首先是用Thrift框架搭建服务端和客户端,其中包括各模块功能,然后服务端将数据同MySQL服务器和redis缓存服务器进行连接。并在用户信息变化时通过生产者传递信息给kafka中的消费者。

查询模块流程

查询(先查询redis 再从查询mysql)
    1客户端选择想要进行的操作 -> 
    2调用服务端函数 -> 
    3服务端查询redis是否存在数据 若没有->4 若有->6
    4服务端查询mysql 若没有->7 若有->5
    5写回redis ->
    6返回数据给客户端 ->
    7返回错误代码 ->

    

创建模块流程

插入 (先插入mysql 再插入redis 再传递kafka)
    1客户端选择想要进行的操作 -> 
    2调用服务端函数 -> 
    3服务端查询mysql是否存在数据 若没有->4 若有->10
    4插入mysql ->
    5插入redis ->
    6传递数据给producer->
    7producer序列化并传递数据给consumer->
    8consumer反序化并输出->
    9返回数据给客户端 ->
    10返回错误代码 ->

修改模块流程

修改 (先修改mysql 再修改redis 再传递kafka)
    1客户端选择想要进行的操作 -> 
    2调用服务端函数 -> 
    3服务端查询mysql是否存在数据 若没有->4 若有->10
    4修改mysql ->
    5修改redis ->
    6传递数据给producer->
    7producer序列化并传递数据给consumer->
    8consumer反序化并输出->
    9返回数据给客户端 ->
    10返回错误代码 ->

 函数返回值错误代码:


    return -2没有该用户id 
    return -1id不合法
    return -101 服务端getuser失败
    return 1数据插入数据库失败
    return 2新数据修改数据库vec失败!
    return 3客户端输出失败

    return 301 redisContext链接失败
    return 302 reply 获取失败
    return 303 reply->type 类型不对
    return 304 reply 数值不对
    return 401 mysql语句返回错误
    return 402 l_result返回错误
    return 403 判断存储过程中的return是否出错
    return 404 数据库没有该id的数据
    return 405 row[0]为null
    return 406 初始化mysql失败
    return 407 mysql连接失败
    return 501 反序列化失败
    return 502 rd_kafka_conf_set失败
    return 503 rd_kafka_new失败

二、数据格式

    MYSQL数据表中的数据格式

数据库 数据表格式:
    create table test(id int primary key ,name varchar(20) not null DEFAULT 'unknow',sex bool, age varchar(20) not null DEFAULT 'unknow',  tel varchar(20) not null DEFAULT 'unknow', email varchar(30) not null DEFAULT 'unknow', pers_desc varchar(150) not null DEFAULT 'unknow')

 MYSQL中的存储过程

存储过程:
    queryuser:
    mysql> create procedure queryuser (in in_id int,out out_ret int,out out_name varchar(20),out out_sex bool,out out_age varchar(20),out out_tel varchar(20),out out_email varchar(30),out out_pers_desc varchar(150))
        ->  label_a:begin
        -> declare v_name varchar(20) ;
        -> declare v_sex bool ;
        -> declare v_age varchar(20) ;
        -> declare v_tel varchar(20) ;
        -> declare v_email varchar(30) ;
        -> declare v_pers_desc varchar(150) ;
        ->  if (in_id<=0) then
        ->  set out_ret=-1; #id error
        -> SELECT out_ret; 
        ->  leave label_a;
        -> end if;
        -> SELECT name,sex,age,tel,email,pers_desc into v_name,v_sex,v_age,v_tel,v_email,v_pers_desc from test where id=in_id limit 1;
        ->  if v_name is NULL then
        ->  set out_ret=-2; #id null
        -> SELECT out_ret; 
        ->  leave label_a;
        -> end if;
        -> set out_ret=0;
        -> set out_name=v_name;
        -> set out_sex=v_sex;
        -> set out_age=v_age;
        -> set out_tel=v_tel;
        -> set out_email=v_email;
        -> set out_pers_desc=v_pers_desc;
        -> SELECT out_ret,out_name,out_sex,out_age,out_tel,out_email,out_pers_desc;
        -> end;
    //call queryuser (1, @ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_pers_desc);


    insertuser:
        mysql> create procedure insertuser (in in_id int,in in_name varchar(20),in in_sex bool,in in_age varchar(20),in in_tel varchar(20),in in_email varchar(30),in in_pers_desc varchar(150),out out_ret int)
        ->  label_a:begin
        ->  if (in_id<=0) then
        ->  set out_ret=-1; #id error
        -> SELECT out_ret; 
        ->  leave label_a;
        -> end if;
        -> insert into test (id,name,sex,age,tel,email,pers_desc) values (in_id,in_name,in_sex,in_age,in_tel,in_email,in_pers_desc);
        -> set out_ret=0;
        -> SELECT out_ret;
        -> end;
    //call insertuser (2,'renjianxu',0,22,123,123,cool man,@ret);

    updatauser:
        mysql> create procedure updatauser (in in_id int,in in_name varchar(20),in in_sex bool,in in_age varchar(20),in in_tel varchar(20),in in_email varchar(30),in in_pers_desc varchar(150),out out_ret int)
        ->  label_a:begin
        ->  if (in_id<=0) then
        ->  set out_ret=-1; #id error
        -> SELECT out_ret; 
        ->  leave label_a;
        -> end if;
        -> update test set name=in_name,sex=in_sex,age=in_age,tel=in_tel,email=in_email,pers_desc=in_pers_desc where id=in_id;
        -> set out_ret=0;
        -> SELECT out_ret;
        -> end;
    //call updatauser (2,'renjianxu',0,23,123,123,cool man,@ret);

 Redis数据格式

Redis数据格式:
    127.0.0.1:6379> hgetall user1
    1) "name"
    2) "zed"
    3) "sex"
    4) "0"
    5) "age"
    6) "22"
    7) "tel"
    8) "1756915"
    9) "email"
    10) "1208086"
    11) "pers_desc"
    12) "handsome man"

Json数据格式

Json数据格式:
    root["ID"]=msg.seqId;
    root["name"]=msg.name;
    root["sex"]=msg.sex;
    root["age"]=msg.age;
    root["tel"]=msg.tel;
    root["email"]=msg.email;
    root["pers_desc"]=msg.pers_desc;
    举例:
    ID:1
    name:"zed"
    sex:0
    age:"22"
    tel:"17569150186"
    email:"1208086"
    pers_desc:"handsome man"

三、代码

代码结构

Datainfo.thrift
client.cpp
consumer.cpp
gen-cpp/Datainfo_types.cpp
gen-cpp/Datainfo_types.h
gen-cpp/Makefile
gen-cpp/Producer.cpp
gen-cpp/Producer.h
gen-cpp/serDemo.cpp
gen-cpp/serDemo.h
gen-cpp/serDemo_server.skeleton.cpp
readme

编译方式

对serDemo_server.skeleton.cpp client.cpp consumer.cpp 分别编译

编译serDemo_server.skeleton.cpp和client.cpp时 分别将cpp放在gen-cpp文件夹里 然后修改Makefile文件中的SRCS和TARGER

编译consumer.cpp时不需要串联其他cpp 只需要编译自身即可

Makefile部分

#
SRCS := serDemo_server.skeleton.cpp Datainfo_types.cpp serDemo.cpp Producer.cpp
OBJS := $(patsubst %.cpp,%.o,$(SRCS))#得到所有生成终极目标的依赖:(.o文件)
FLAG1 :=-lmysqlclient -lthrift -lhiredis  -lrdkafka  -ljsoncpp
FLAG2 :=-L/root/thrift-0.18.1/lib/cpp/.libs -L/usr/lib/mysql 
FLAG3 :=-I/root/thrift-0.18.1/lib/cpp/src -I/usr/include/mysql8 
FLAG4 :=-std=c++11
TARGER := server

$(TARGER):$(OBJS) #终极目标
	g++ -o $@ $^ $(FLAG1)  $(FLAG2) $(FLAG3)

#DEPS := $(patsubst %.o, %.d, $(OBJS)) 
DEPS := $(patsubst %.cpp, %.d, $(SRCS))#将所有 .cpp文件 对应命名.d文件赋值给deps
ifneq ($(MAKECMDGOALS), clean)
	-include $(DEPS) 
endif


%.d: %.cpp
	g++ -MM $< > $@  $(FLAG4)  

%.o: %.cpp 
	g++ -c $< $(FLAG4) 


.PHONY: clean
#清除规则 加入PHONY防止同名clean文件干扰
clean:
	rm -f $(DEPS)
	rm -f $(OBJS)
	rm -f TARGER

服务器部分

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "serDemo.h"
#include "Producer.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <hiredis.h>

// 读: 读redis->没有,读mysql->把mysql数据写回redis,有的话直接从redis中取;
// 写: 写mysql->成功,再写redis;
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
//l_msg_temp.id =-1 getuser 返回了一个空值
// return -2没有该用户id 
// return -1id不合法
// return -101 服务端getuser失败
// return 1数据插入数据库失败
// return 2新数据修改数据库vec失败!
// return 3客户端输出失败

// return 301 redisContext链接失败
// return 302 reply 获取失败
// return 303 reply->type 类型不对
// return 304 reply 数值不对
// return 401 mysql语句返回错误
// return 402 l_result返回错误
// return 403 判断存储过程中的return是否出错
// return 404 数据库没有该id的数据
// return 405 row[0]为null
// return 406 初始化mysql失败
// return 407 mysql连接失败
// return 501 反序列化失败
// return 502 rd_kafka_conf_set失败
// return 503 rd_kafka_new失败
class serDemoHandler : virtual public serDemoIf {
 public:

  const char *hostname = "127.0.0.1";
  const int l_redis_size=12;
  const int l_redis_port=6379;
  serDemoHandler() {
    // Your initialization goes here
  }

  //查询用户
  int32_t QueryUser(const int32_t seqId)
  {
    printf("\nQueryMysql...\n");

    
    if(seqId<1||seqId>=0x7fffffff)
    {
      printf("用户id输入长度或数值不合法,请重新键入数字...\n");
      return -1;
    }
    //先找redis
    //-------------------操作redis部分--------------------↓
    //
    //1链接测试部分
    printf("进行redis链接...先查询redis是否存在数据\n");
    redisContext *connect=NULL;
    redisReply *reply=NULL;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
    if (connect == NULL || connect->err) 
    {
        if (connect) 
        {
            printf("connect error: %s\n", connect->errstr);	
            redisFree(connect);	//释放redisConnect()所产生的连接。
        } 
        else 
        {
            printf("connect error: can't allocate redis context\n");
        }
        return 301;
    }
    //1链接测试部分

    //2从redis中找数据部分
    reply = (redisReply*)redisCommand(connect,"hgetall user%d",seqId);
    if(reply==NULL)
    {
        std::cout<<"reply获取失败"<<std::endl;
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 302;
    }
    else if(reply->type!=REDIS_REPLY_ARRAY)
    {
        
        std::cout<<"reply->type 类型错误"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 2;
    }
    else if(reply->elements==l_redis_size)//在redis中找到了
    {
        
        std::cout<<"queryuser函数中在redis中找到了"<<std::endl;

        for(int i=0;i<reply->elements;i++)
        {
            if(reply->element[i]==NULL)
            {
                std::cout<<"reply获取失败"<<std::endl;
                freeReplyObject(reply);
                redisFree(connect);	//释放redisConnect()所产生的连接。
                return 302;
            }
            else if(reply->element[i]->type!=REDIS_REPLY_STRING)
            {   
                std::cout<<"数据展示出错2"<<std::endl;
                freeReplyObject(reply);
                redisFree(connect);	//释放redisConnect()所产生的连接。
                return 303;
            }
            //1//输出一下
            std::string l_strs_temp(reply->element[i]->str,reply->element[i]->len);
            printf("index %d:%s\n",i,l_strs_temp.c_str());
            if(i%2==1)//数据部分
            {
              //对于数据部分一些操作...
            }

        }

        freeReplyObject(reply);
        printf("查询redis完毕 找到数据结束redis链接...\n");
        return 0;
    }
    printf("查询redis完毕 没找到数据结束redis链接...\n");
    freeReplyObject(reply);
    redisFree(connect);
    //-------------------操作redis部分--------------------↑
    
    //再找数据库
    //-------------------操作数据库部分--------------------
    printf("开始查找数据库...\n");
    
    MYSQL* mysql = mysql_init(NULL);  
    //MYSQL* mysql;
    
    if(mysql==NULL)
    {
        printf("初始化环境失败\n");
        return 406;//
    }
    
    // 2. 连接数据库服务器
    mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101", 
                                "exam2", 3306, NULL, 0);
                           
    if(mysql == NULL)
    {
        printf("连接数据库服务器失败\n");
        return 407;//
    }
    
    //4调用过程//
    char l_call_cstr[100]={};
    //std::string l_call_str="";
    sprintf(l_call_cstr, "call queryuser (%d,@ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc);",seqId);
    //l_call_str=l_call_cstr;
    int l_ret1=mysql_query(mysql,l_call_cstr);
    if(l_ret1!=0)
    {
        std::cout<<"error"<<std::endl;
        return 401;
    }

    
    //返回结果测试  
    MYSQL_RES* l_result=mysql_store_result(mysql);
    if(l_result==NULL)
    {
        printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
        //mysql_free_result(l_result);
        return 402;
    }

    
    //6展示
    MYSQL_ROW row;
    row = mysql_fetch_row(l_result);
    int num=mysql_num_fields(l_result);
    mysql_free_result(l_result);
    
    //std::string s_temp="404";
    
    if(strncmp(row[0],"404",3)==0)//strncmp(row[0],"0",1)!=0
    {
      printf("没有当前查询id\n");
      return 404;//没有该id
    }
    else if(strncmp(row[0],"0",1)!=0)
    {

      printf("存储过程中的return出错%s\n",row[0]);
      return 403;//判断存储过程中的return是否出错
    }
    else if(num!=7||row[0]==NULL||row[1]==NULL||row[2]==NULL||row[3]==NULL||row[4]==NULL||row[5]==NULL||row[6]==NULL)
    {
      std::cout<<num<<std::endl;
      printf("row返回出错或row->num出错\n");
      return 405;//判断存储过程中的return是否出错
    }
    else printf("存储过程返回值为[%s]\n数据为:%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
                row[0], row[1],row[2],row[3],row[4],row[5],row[6],row[7]);
      //释放mysql资源
    
    mysql_close(mysql); 
    printf("queryuser 函数在数据库中找到相关数据...\n");
    //-------------------操作数据库部分--------------------↑



    //-------------------操作redis部分--------------------↓
    printf("插入mysql完毕...将数据插入到redis...\n");
    connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);


    //1链接测试部分
    if (connect == NULL || connect->err) 
    {
        if (connect) 
        {
            printf("connect error: %s\n", connect->errstr);	
            redisFree(connect);	//释放redisConnect()所产生的连接。
        } 
        else 
        {
            printf("connect error: can't allocate redis context\n");
        }
        return 301;
    }
    


    //2写入redis中数据部分
    reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,row[1],(*(row[2])=='1'),row[3],row[4],row[5],row[6]);
    if(reply==NULL)
    {
        std::cout<<"reply获取失败"<<std::endl;
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 302;
    }
    else if(reply->type!=REDIS_REPLY_STATUS)
    {
        
        std::cout<<"reply->type 类型错误"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 303;
    }
    else if(strncmp("OK",reply->str,2)!=0)
    {
        std::cout<<"数据插入失败"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 304;
    }
    printf("query将数据插入redis完毕 ...\n");
    //-------------------操作redis部分--------------------上


    //3释放redis资源
    freeReplyObject(reply);
    redisFree(connect);

    printf("QueryUser函数结束...\n\n");
    return 0;//该id存在
  }

  int32_t InsertUser(const int32_t seqId,const message& msg)
  {
    printf("\nInsertUser...\n");
    message l_temp_msg=msg;
    int32_t l_Query_return=0;

    //1 判断一下该id是否存在 
    l_Query_return=QueryUser(seqId);
    if(l_Query_return==0)
    {
      printf("该id数据已经存在,插入失败!\n");
      return l_Query_return;
    }
    else if(l_Query_return!=404)
    {
      printf("QueryMysql出错 l_Query_return=%d\n",l_Query_return);
      return l_Query_return;
    }




    //先插入mysql
    //-------------------操作数据库部分--------------------
    //mysql指针初始化
    MYSQL* mysql = mysql_init(NULL);    
    if(mysql==NULL)
    {
        printf("初始化环境失败\n");
        return 1;//
    }

    // 2. 连接数据库服务器
    mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101", 
                                "exam2", 3306, NULL, 0);
    if(mysql == NULL)
    {
        printf("连接数据库服务器失败\n");
        return 2;//
    }

    //4调用过程

    char l_call_cstr[100]={};
    //std::string l_call_str="";
    sprintf(l_call_cstr, "call insertuser (%d,'%s',%d,'%s','%s','%s','%s',@ret);",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
    //l_call_str=l_call_cstr;
    //printf("%s\n",l_call_cstr);
    int32_t l_ret1=mysql_query(mysql, l_call_cstr);
    if(l_ret1!=0)
    {
        std::cout<<"call_insertstudent_error"<<std::endl;
        return 401;
    }


    //返回结果测试  
    // MYSQL_RES* l_result=mysql_store_result(mysql);
    // if(l_result==NULL)
    // {
    //     printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
    //     mysql_free_result(l_result);
    //     return 402;
    // }
    // mysql_free_result(l_result);

    //返回结果测试
    l_Query_return=QueryUser(seqId);
    if(l_Query_return!=0)
    {
      printf("并没有插入进数据库..插入失败!\n");
      return l_Query_return;
    }
    

    mysql_close(mysql); 
    printf("成功插入进数据库..插入成功!\n");
    //-------------------操作数据库部分--------------------↑

    //然后再插入redis
    //-------------------操作redis部分--------------------↓

    redisContext *connect=NULL;
    redisReply *reply=NULL;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);

    if (connect == NULL || connect->err) 
    {
        if (connect) 
        {
          printf("connect error: %s\n", connect->errstr);	
          redisFree(connect);	//释放redisConnect()所产生的连接。
        } 
        else 
        {

          printf("connect error: can't allocate redis context\n");
        
        }
        return 301;
    }
    //1链接测试部分


    //2写入redis中数据部分
    reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
    if(reply==NULL)
    {
        std::cout<<"reply获取失败"<<std::endl;
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 302;
    }
    else if(reply->type!=REDIS_REPLY_STATUS)
    {
        
        std::cout<<"reply->type 类型错误"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 303;
    }
    else if(strncmp("OK",reply->str,2)!=0)
    {
        std::cout<<"数据插入失败"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 304;
    }
    printf("成功插入进redis..插入成功!\n");
    freeReplyObject(reply);
    redisFree(connect);
    //-------------------操作redis部分--------------------↑

    int l_return_pro=Prod2Cons(msg);
    //传递给生产者
    if(l_return_pro!=0)
    {
      printf("\n传递给生产者出错...\n");
    }
    printf("\nInsertUser函数结束...\n\n");
    return 0;


  }


  int32_t UpdataUser(const int32_t seqId,const message& msg)
  {
    printf("\nUpdataUser...\n");
    message l_temp_msg=msg;
    int32_t l_Query_return=0;

    //1 判断一下该id是否存在 
    l_Query_return=QueryUser(seqId);
    if(l_Query_return==404)
    {
      printf("该id数据不存在,修改失败!\n");
      return l_Query_return;
    }
    else if(l_Query_return!=0)
    {
      printf("QueryMysql出错 l_Query_return=%d\n",l_Query_return);
      return l_Query_return;
    }



    //-------------------操作数据库部分--------------------
    //mysql指针初始化
    MYSQL* mysql = mysql_init(NULL);    
    if(mysql==NULL)
    {
        printf("初始化环境失败\n");
        return 1;//
    }

    // 2. 连接数据库服务器
    mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101", 
                                "exam2", 3306, NULL, 0);
    if(mysql == NULL)
    {
        printf("连接数据库服务器失败\n");
        return 2;//
    }

    //4调用过程

    char l_call_cstr[100]={0};
    //std::string l_call_str="";
    sprintf(l_call_cstr, "call updatauser (%d,'%s',%d,'%s','%s','%s','%s',@ret);",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
    //l_call_str=l_call_cstr;
    int32_t l_ret1=mysql_query(mysql, l_call_cstr);
    if(l_ret1!=0)
    {
        std::cout<<"call_updatastudent_error"<<std::endl;
        return 401;
    }


    // //返回结果测试  
    // MYSQL_RES* l_result=mysql_store_result(mysql);
    // if(l_result==NULL)
    // {
    //     printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
    //     mysql_free_result(l_result);
    //     return 402;
    // }
    // mysql_free_result(l_result);

    l_Query_return=QueryUser(seqId);
    if(l_Query_return!=0)
    {
      printf("并没有修改进数据库..插入失败!\n");
      return l_Query_return;
    }

    mysql_close(mysql); 
    printf("成功修改进数据库..插入成功!\n");
    //-------------------操作数据库部分--------------------



    //-------------------操作redis部分--------------------
    redisContext *connect=NULL;
    redisReply *reply=NULL;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);

    if (connect == NULL || connect->err) 
    {
        if (connect) 
        {
          printf("connect error: %s\n", connect->errstr);	
          redisFree(connect);	//释放redisConnect()所产生的连接。
        } 
        else 
        {

          printf("connect error: can't allocate redis context\n");
        
        }
        return 301;
    }
    //1链接测试部分


    //2写入redis中数据部分
    reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
    if(reply==NULL)
    {
        std::cout<<"reply获取失败"<<std::endl;
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 302;
    }
    else if(reply->type!=REDIS_REPLY_STATUS)
    {
        
        std::cout<<"reply->type 类型错误"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 303;
    }
    else if(strncmp("OK",reply->str,2)!=0)
    {
        std::cout<<"数据插入失败"<<std::endl;
        freeReplyObject(reply);
        redisFree(connect);	//释放redisConnect()所产生的连接。
        return 304;
    }
    printf("成功修改进redis..修改成功!\n");
    freeReplyObject(reply);
    redisFree(connect);
    //-------------------操作redis部分--------------------
    int l_return_pro=Prod2Cons(msg);
    //传递给生产者
    if(l_return_pro!=0)
    {
      printf("\n传递给生产者出错...\n");
    }

    printf("\nUpdata函数结束...\n\n");
    return 0;

  }



  void GetUser(message& _return, const int32_t seqId) 
  {
    printf("\nGetUser...\n");
    //检查id//
    message l_msg_temp;
    l_msg_temp.seqId=-101;
    int32_t l_Query_return=0;
    //1检查id
    l_Query_return=QueryUser(seqId);
    if(l_Query_return==404)
    {
      printf("该id数据查找不存在\n");
      l_msg_temp.seqId=-404;
      _return=l_msg_temp;
      return ;
    }
    else if(l_Query_return!=0)
    {
      printf("QueryUser出错 l_Query_return=%d\n",l_Query_return);
      l_msg_temp.seqId=-403;
      _return=l_msg_temp;
      return ;
    }

    //id存在...此时redis中一定存在该数据 因为getuser前面是query
    //-------------------操作redis部分--------------------↓
    redisContext *connect=NULL;
    redisReply *reply=NULL;
    struct timeval timeout = { 1, 500000 }; // 1.5 seconds
    connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);

    if (connect == NULL || connect->err) 
    {
        if (connect) 
        {
            printf("connect error: %s\n", connect->errstr);	
            redisFree(connect);	//释放redisConnect()所产生的连接。
        } 
        else 
        {
            printf("connect error: can't allocate redis context\n");
        }
        l_msg_temp.seqId=-301;
        _return=l_msg_temp;
        
        return ;
    }
    //1链接测试部分


    //2得到redis中数据部分
    reply = (redisReply*)redisCommand(connect,"hgetall user%d",seqId);
    if(reply==NULL)
    {
        std::cout<<"reply获取失败"<<std::endl;
        l_msg_temp.seqId=-302;
      _return=l_msg_temp;
      redisFree(connect);
      return ;
    }
    else if(reply->type!=REDIS_REPLY_ARRAY)
    {
        
        std::cout<<"reply->type 类型错误"<<std::endl;
        freeReplyObject(reply);
        l_msg_temp.seqId=-303;
        _return=l_msg_temp;
        redisFree(connect);
        return ;
    }
    else if(reply->elements!=l_redis_size)//
    {
        std::cout<<"reply->elements数值错误"<<std::endl;
        freeReplyObject(reply);
        l_msg_temp.seqId=-304;
        _return=l_msg_temp;
        redisFree(connect);
        return ;
    }

    //3找到了
    std::cout<<"在redis中找到了"<<std::endl;
    for(int i=0;i<reply->elements;i++)
    {
        if(reply->element[i]==NULL)
        {
            std::cout<<"reply获取失败"<<std::endl;
            freeReplyObject(reply);
            l_msg_temp.seqId=-302;
            _return=l_msg_temp;
            redisFree(connect);
            return ;
        }
        else if(reply->element[i]->type!=REDIS_REPLY_STRING)
        {   
            std::cout<<"数据展示出错2"<<std::endl;
            freeReplyObject(reply);
            redisFree(connect);
            l_msg_temp.seqId=-303;
            _return=l_msg_temp;
            return ;
        }
        //1//输出一下
        std::string l_strs_temp(reply->element[i]->str,reply->element[i]->len);
        printf("index %d:%s\n",i,l_strs_temp.c_str());
        
        switch (i)
        {
          case 1:
            l_msg_temp.name=l_strs_temp;
            break;
          case 3:
            l_msg_temp.sex=l_strs_temp[0]-'0';
            break;
          case 5:
            l_msg_temp.age=l_strs_temp;
          break;
          case 7:
            l_msg_temp.tel=l_strs_temp;
          break;
          case 9:
            l_msg_temp.email=l_strs_temp;
          break;
          case 11:
            l_msg_temp.pers_desc=l_strs_temp;
          break;
          default:
            break;
        }
    }
    l_msg_temp.seqId=seqId;
    _return=l_msg_temp;
    freeReplyObject(reply);
    redisFree(connect);
    printf("\nGetUser函数结束...\n");
    return ;
    //-------------------操作redis部分--------------------上



    //query之后不需要在查询数据库

    // //-------------------操作数据库部分--------------------
    // //2 mysql指针初始化
    // MYSQL* mysql = mysql_init(NULL);    
    // if(mysql==NULL)
    // {
    //     printf("初始化环境失败\n");
    //     _return=l_msg_temp;
    //     return ;//
    // }

    // mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101", 
    //                             "exam2", 3306, NULL, 0);
    // if(mysql == NULL)
    // {
    //     printf("连接数据库服务器失败\n");
    //     _return=l_msg_temp;
    //     return ;//
    // }
    // //3 调用过程插入一个insert string吧
    // char l_call_cstr[100]={};
    // //std::string l_call_str="";
    // sprintf(l_call_cstr, "call queryuser (%d,@ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc);",seqId);
    // //l_call_str=l_call_cstr;
    // int l_ret1=mysql_query(mysql,l_call_cstr);
    // if(l_ret1!=0)
    // {
    //     std::cout<<"call_error"<<std::endl;
    //     _return=l_msg_temp;
    //     return ;
    // }

    // // //4调用out过程
    // // int l_ret2=mysql_query(mysql, " select @ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc;");
    // // if(l_ret2!=0)
    // // {
    // //     std::cout<<"error"<<std::endl;
    // //     _return=l_msg_temp;
    // //     return ;
    // // }

    // //5 返回结果测试  
    // MYSQL_RES* l_result=mysql_store_result(mysql);
    // if(l_result==NULL)
    // {
    //     printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
    //     mysql_freel_result_result(l_result);
    //     _return=l_msg_temp;
    //     return ;
    // }

    
    // //if()
    // //6展示
    // MYSQL_ROW row;
    // while((row = mysql_fetch_row(l_result))!=NULL) 
    // {
    //   std::string s_temp="404";
    //   if(row[0]==s_temp)
    //   {
    //     printf("没有当前查询\n");
    //     l_msg_temp.seqId=-404;
    //     _return=l_msg_temp;
    //     return ;//没有该id
    //   }
    //   else if(row[0]!=0)
    //   {
    //     printf("存储过程中的return出错\n");
    //     l_msg_temp.seqId=-403;
    //     _return=l_msg_temp;
    //     return ;//判断存储过程中的return是否出错
    //   }
    //   else printf("[%s]-[%s]\n", row[0], row[1]);
    //   l_msg_temp.seqId=seqId;
    //   l_msg_temp.name=row[1];
    //   l_msg_temp.sex=row[2];

    //   l_msg_temp.age=row[3];//
    //   l_msg_temp.tel=row[4];
    //   l_msg_temp.email=row[5];
    //   l_msg_temp.pers_desc=row[6];
    //   _return=l_msg_temp;
    //   } 
    // mysql_free_result(l_result);
    // mysql_close(mysql); 
    // return ;
    
  }

};

int main() {
  int l_thrift_port = 9090;
  ::std::shared_ptr<serDemoHandler> handler(new serDemoHandler());
  ::std::shared_ptr<TProcessor> processor(new serDemoProcessor(handler));
  ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(l_thrift_port));
  ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  // MYSQL* mysql = mysql_init(NULL);  
  // cout<<1<<endl;
  server.serve();
  return 0;
}

客户端部分

// 在同级目录下创建 client.cpp 文件
// ----------替换成自己的头文件----------
#include "serDemo.h"

// --------------------------------------
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <hiredis.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
// using boost::shared_ptr;   
//return -2没有该用户id 
//return -1id不合法
//return -101 服务端getuser失败
//return 1数据插入数据库失败
//return 2新数据修改数据库vec失败!
//return 3客户端输出失败
//return 401 mysql语句返回错误
//return 402 l_result返回错误
//return 403 判断存储过程中的return是否出错
//return 404 数据库没有该id的数据
int32_t CheckId(serDemoClient client,int32_t l_tmp_seqId)
{
  
  int32_t l_id_temp=l_tmp_seqId;
  int32_t l_cnt_now=client.QueryUser(l_id_temp);
  if(l_cnt_now==-1)
  {
    printf("用户id输入长度或数值不合法\n");
    return -1;
  }
  else if(l_cnt_now==404)
  {
    printf("没有该id的用户\n");
    return 404;//没有该用户
  }
  else if(l_cnt_now!=0)
  {
    printf("查询有其他错误\n");
    return l_cnt_now;
  }
  printf("该用户id存在...\n");
  return 0;
}
int32_t OutPut(const message& msg) {
  printf("id:%d\nname:%s\nsex:%d\nage:%s\ntel:%s\nemail:%s\npers_desc:%s\n",
      msg.seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
  return 0;
}
int32_t FindUser(serDemoClient client)
{
  int32_t l_tmp_seqId=0;
  // Your implementation goes here
  printf("正在查询用户...\n");
  printf("请输入你想查询的用户id,请键入数字...\n");
  while (1)
  {
    
    std::cin>>l_tmp_seqId;
    int32_t l_CheckId_return=CheckId(client,l_tmp_seqId);
    if(l_CheckId_return==0)//这个id既合法也存在
    {
      break;
    }
    
    printf("用户id输入不合法,请重新键入数字...\n");
  }
  
  //得到该用户
  message l_msg_temp;  
  client.GetUser(l_msg_temp,l_tmp_seqId);
  if(l_msg_temp.seqId<=0)
  {
    std::cout<<"GetUser失败"<<std::endl;
    return -101;
  }
  if(OutPut(l_msg_temp))
  {
    std::cout<<"客户端输出失败"<<std::endl;
    return 3;
  }
  std::cout<<"查找用户成功!"<<std::endl;

  return 0;
  
}
int32_t CreateUser(serDemoClient client)//
{
  printf("正在创建用户...\n");
  message l_msg_temp;
  int32_t l_id_temp=0,l_sex_temp;
  std::string l_name_temp,l_age_temp,l_tel_temp,l_email_temp,l_desc_temp;
  printf("请输入你想创建的用户id,请键入数字...\n");
  while (1)
  {
    std::cin>>l_id_temp;
    int32_t l_CheckId_return=CheckId(client,l_id_temp);
    if(l_CheckId_return==404)//用户id不存在
    {
      printf("用户id不存在 可以插入!\n");
        l_msg_temp.seqId=l_id_temp;
      break;
    }
    else if(l_CheckId_return==0)
    {
      printf("该id已被人占用,请重新输入!\n");
      continue;
    }
    else 
    {
      printf("数据输入不合理,请重新输入!\n");
      continue;
    }
  }

  printf("请输入你创建用户的name\n");
  while (1)
  {
    std::cin>>l_name_temp;
    if(l_name_temp.size()<=20)
    {
      printf("用户name输入成功!\n");
      l_msg_temp.name=l_name_temp;
      break;
    }
    else 
    {
      printf("name输入太长,请重新输入!\n");
      continue;
    }
  }
    
  printf("请输入你创建用户的sex:0代表男,1代表女\n");
  while (1)
  {
    std::cin>>l_sex_temp;
    if(l_sex_temp==0||l_sex_temp==1)
    {
      printf("用户sex输入成功!\n");
      l_msg_temp.sex=l_sex_temp;
      break;
    }
    else 
    {
      printf("sex输入错误,请重新输入!\n");
      continue;
    }
  }

  printf("请输入你创建用户的age\n");
  while (1)
  {
    std::cin>>l_age_temp;
    int l_flag_error=0;
    for(int i=0;i<l_age_temp.size();i++)
    {
      if(l_age_temp[i]>'9'||l_age_temp[i]<'0')
      {
        l_flag_error=1;
        break;
      }
    }

    if(l_flag_error==1)
    {
      printf("age输入错误,请重新输入!\n");
      continue;
    }
    else if(l_age_temp.size()<=3&&l_age_temp.size()>=1)
    {
      printf("用户age输入成功!\n");
      l_msg_temp.age=l_age_temp;
      break;
    }
    else 
    {
      printf("age输入有其他错误,请重新输入!\n");
      continue;
    }
  }

  printf("请输入你创建用户的tel\n");
  while (1)
  {
    std::cin>>l_tel_temp;
    int32_t l_check_flag=0;
    for(int i=0;i<l_tel_temp.size();i++)
    {
      if(l_tel_temp[i]<'0'||l_tel_temp[i]>'9')
      {
        printf("tel输入非法字符,请重新输入!\n");
        l_check_flag=1;
        break;
      }
    }
    if(l_check_flag)continue;
    if(l_tel_temp.size()<=20)
    {
      printf("用户tel输入成功!\n");
      l_msg_temp.tel=l_tel_temp;
      break;
    }
    else 
    {
      printf("tel输入过长错误,请重新输入!\n");
      continue;
    }
  }

  printf("请输入你创建用户的email\n");
  while (1)
  {
    std::cin>>l_email_temp;
    if(l_email_temp.size()<=30)
    {
      printf("用户email输入成功!\n");
      l_msg_temp.email=l_email_temp;
      break;
    }
    else 
    {
      printf("email输入过长错误,请重新输入!\n");
      continue;
    }
  }

  printf("请输入你创建用户的personal_describe\n");
  while (1)
  { 
    getchar();
    getline(std::cin,l_desc_temp);
    if(l_desc_temp.size()<=150)
    {
      printf("用户personal_describe输入成功!\n");
      l_msg_temp.pers_desc=l_desc_temp;
      break;
    }
    else 
    {
      printf("personal_describe输入过长错误,请重新输入!\n");
      continue;
    }
  }

  int32_t l_CreateUser_return=client.InsertUser(l_id_temp,l_msg_temp);
  //将数据插入数据库 

  if(l_CreateUser_return!=0)
  {
    //插入数据失败!
    std::cout<<"数据插入数据库失败!"<<std::endl;
    return l_CreateUser_return;
  }

  std::cout<<"创建用户成功!"<<std::endl;
  return 0;

}
int32_t UpdataUser(serDemoClient client)
{
  printf("正在修改用户...\n");
  //预处理一些数据供使用
  message l_msg_temp,l_msg_temp2;
  int32_t l_id_temp=0,l_sex_temp=0;
  std::string l_name_temp,l_age_temp,l_tel_temp,l_email_temp,l_desc_temp;
  //找到该用户在vec的下标
  printf("请输入你想修改的用户id,请键入数字...\n");
  while (1)
  {
    std::cin>>l_id_temp;
    int l_CheckId_return=CheckId(client,l_id_temp);
    if(l_CheckId_return==0)
    {
      printf("用户id输入成功!\n");
        l_msg_temp.seqId=l_id_temp;
      break;
    }
    else if(l_CheckId_return==404)
    {
      printf("该id不存在,请重新输入!\n");
      continue;
    }
    else 
    {
      printf("数据输入不合理,请重新输入!\n");
      continue;
    }
  }
  
  client.GetUser(l_msg_temp,l_id_temp);

  if(l_msg_temp.seqId<0)
  {
    std::cout<<"GetUser失败"<<std::endl;
    return l_msg_temp.seqId;
  }
  printf("你正在修改的数据内容如下:\n");
  
  if(OutPut(l_msg_temp))//得到服务端的该数据 拿到客户端
  {
    std::cout<<"客户端输出失败"<<std::endl;
    return 3;
  }

  //循环修改数据 客户端
  while (1)
  {
    
    printf("请键入数字以选择你想要修改的数据内容:\n");
    printf("1、name 2、sex 3、age 4、tel 5、email 6、personal describe 7、exit\n");
    int l_updata_type=0;
    
    std::cin>>l_updata_type;
    if(l_updata_type>7||l_updata_type<0)
    {
      //键入不合理的数字或其他字符 请重新输入
      printf("键入不合理的数字或其他字符 请重新输入!\n");
      continue;
    }
    if(l_updata_type==7)
    {
      break;
    }
    switch (l_updata_type)
    {
      case 1:
        printf("修改name中!\n");
        printf("请输入你想修改用户的新name\n");
        while (1)
        {
          std::cin>>l_name_temp;
          if(l_name_temp.size()<=20)
          {
            printf("用户name输入成功!\n");
            l_msg_temp.name=l_name_temp;
            break;
          }
          else 
          {
            printf("name输入太长,请重新输入!\n");
            continue;
          }
        }
        break;

      case 2:
        printf("修改sex中!\n");
        printf("请输入你修改用户的新sex:0代表男,1代表女\n");
        while (1)
        {
          std::cin>>l_sex_temp;
          if(l_sex_temp==0||l_sex_temp==1)
          {
            printf("用户sex输入成功!\n");
            l_msg_temp.sex=l_sex_temp;
            break;
          }
          else 
          {
            printf("sex输入错误,请重新输入!\n");
            continue;
          }
        }
        break;

      case 3:
        printf("修改age中!\n");
        printf("请输入你修改用户的新age\n");
        while (1)
        {
          std::cin>>l_age_temp;
          int l_flag_error=0;
          for(int i=0;i<l_age_temp.size();i++)
          {
            if(l_age_temp[i]>'9'||l_age_temp[i]<'0')
            {
              l_flag_error=1;
              break;
            }
          }

          if(l_flag_error==1)
          {
            printf("age输入错误,请重新输入!\n");
            continue;
          }
          else if(l_age_temp.size()<=3&&l_age_temp.size()>=1)
          {
            printf("用户age输入成功!\n");
            l_msg_temp.age=l_age_temp;
            break;
          }
          else 
          {
            printf("age输入有其他错误,请重新输入!\n");
            continue;
          }          
          
        }
        break;

      case 4:
        printf("修改tel中!\n");
        printf("请输入你修改用户的新tel\n");
        while (1)
        {
          std::cin>>l_tel_temp;

          int l_check_flag=0;
          for(int i=0;i<l_tel_temp.size();i++)
          {
            if(l_tel_temp[i]<'0'||l_tel_temp[i]>'9')
            {
              printf("tel输入非法字符,请重新输入!\n");
              l_check_flag=1;
              break;
            }
          }
          if(l_check_flag)continue;
          if(l_tel_temp.size()<=20)
          {
            printf("用户tel输入成功!\n");
            l_msg_temp.tel=l_tel_temp;
            break;
          }
          else 
          {
            printf("tel输入过长错误,请重新输入!\n");
            continue;
          }
        }
        break;

      case 5:here
        printf("修改email中!\n");
        printf("请输入你修改用户的新email\n");
        while (1)
        {
          std::cin>>l_email_temp;
          if(l_email_temp.size()<=30)
          {
            printf("用户email输入成功!\n");
            l_msg_temp.email=l_email_temp;
            break;
          }
          else 
          {
            printf("email输入过长错误,请重新输入!\n");
            continue;
          }
        }
        break;

      case 6:here
        printf("修改personal_describe中!\n");
        printf("请输入你修改用户的新personal_describe\n");
        while (1)
        {
          getchar();
          getline(std::cin,l_desc_temp);
          if(l_desc_temp.size()<=150)
          {
            printf("用户personal_describe输入成功!\n");
            l_msg_temp.pers_desc=l_desc_temp;
            break;
          }
          else 
          {
            printf("personal_describe输入过长错误,请重新输入!\n");
            continue;
          }
        }
        break;
      
      
      default:
        break;
    }
    printf("数据输入成功...可以继续输入你想修改的部分...想退出修改请键入7\n");
    
  }

  //将修改好的数据传入到服务端
  int l_UpdataUser_return=client.UpdataUser(l_id_temp,l_msg_temp);
  if(l_UpdataUser_return!=0)
  {
    printf("修改后数据写入数据库失败!\n");
    return l_UpdataUser_return;
  }
  client.GetUser(l_msg_temp,l_id_temp);
  printf("修改完毕,修改后数据如下...\n");
  if(OutPut(l_msg_temp))//得到服务端的该数据 拿到客户端
  {
    std::cout<<"客户端输出失败2"<<std::endl;
    return 3;
  }
  return 0;
}
int main() 
{   
  
  std::cout<<1<<std::endl;
  std::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));   
  std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));   
  std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));

  serDemoClient client(protocol);   
   
  transport->open();   
  
  // 代码
  // message msg;  
  // msg.seqId = 123456;  
  // msg.name = "Zed222";    
  // msg.sex =0;
  // msg.age=22;
  // msg.tel="1756915";
  // msg.email="1208086@qq.com";
  // msg.pers_desc="He is a handsome man";
  // CreateUser
  // std::cout<<client.OutPut(123456)<<std::endl;  
  while(1)
  {
    printf("请输入你想进行的操作 1、查询用户 2、创建用户 3、更新用户 0、退出\n");
    int flag=0;
    std::cin>>flag;
    if(flag>3||flag<0)
    {
      printf("键入数字错误 请重新输入\n");
      continue;
    }
    if(flag==0)break;
    switch (flag)
    {
    case 1:
      FindUser(client);
      break;
    case 2:
      CreateUser(client);
      break;
    case 3:
      UpdataUser(client);
      break;
    default:
      break;
    }
  }
  

  printf("退出系统\n");
  transport->close();   
  
  
  return 0;   
}  

生产者部分

#include "serDemo.h"
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include "rdkafka.h"
#include <fstream>    
#include <sstream> 
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <memory>   
#include <jsoncpp/json/json.h> 
#include <hiredis.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
volatile sig_atomic_t run = 1;
//return 501 反序列化失败
//return 502 rd_kafka_conf_set失败
//return 503 rd_kafka_new失败
void stop(int sig) 
{
        run=0;
        fclose(stdin); /* abort fgets() */
}

static void dr_msg_cb(rd_kafka_t *l_rk, const rd_kafka_message_t *l_rkmessage, void *l_opaque) 
{
        if (l_rkmessage->err)
        {
            fprintf(stderr,"%% Message delivery failed: %s\n",rd_kafka_err2str(l_rkmessage->err));
        }    
        else
        {
            fprintf(stderr,"%% Message delivered (%zd bytes, partition %" PRId32 ")\n",
            l_rkmessage->len, l_rkmessage->partition);
        }
        // l_rkmessage 能自动销毁by librdkafka 
}


int Prod2Cons(message msg)
{
    //-----------------------msg to json---------------
    Json::Value root,root2;
    root["ID"]=msg.seqId;
    root["name"]=msg.name;
    root["sex"]=msg.sex;
    root["age"]=msg.age;
    root["tel"]=msg.tel;
    root["email"]=msg.email;
    root["pers_desc"]=msg.pers_desc;
    
    //-----------------------msg to json---------------

    
    //-----------------------json to str---------------
   
    Json::StreamWriterBuilder swb;   
    std::shared_ptr <Json::StreamWriter > sw(swb.newStreamWriter() ) ; //sw指向swb
    
    //通过sw的write将root序列化成Json字符串,并存储到str中    
    std::stringstream str;    
    sw->write(root,&str);   
    
    std::string str_serialization=str.str(); 
    char * p=(char*)str_serialization.data();
    //char *str_char=str_serialization.c_str();
    if(p==NULL)
    {
        printf("反序列化失败\n");
        return 501;
    }
    //-----------------------json to str---------------



    //-----------------------producer模板---------------
    char l_brokers[100]="192.168.100.160:9092";
    char l_topic[100]="TestTopicForDocker";
     
    rd_kafka_t *l_rk=NULL;        // 生产者实例句柄
    rd_kafka_conf_t *l_conf=NULL; // 临时配置对象
    char l_errstr[512]={};      // API错误报告缓冲区
    char l_buf[512]={};         // 消息值临时缓冲区

    l_conf=rd_kafka_conf_new();//创建kafka 客户端配置

    //配置各项参数 其中kafka port默认为9022号端口
    if(rd_kafka_conf_set(l_conf,"bootstrap.servers", l_brokers, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK) 
    {//若返回结果不等于这个enum类型 错误结果打印
        fprintf(stderr, "%s\n", l_errstr);
        rd_kafka_conf_destroy(l_conf);
        //
        return 502;
    }

    //设置发送回调函数 用以反馈信息发送的成败
    rd_kafka_conf_set_dr_msg_cb(l_conf, dr_msg_cb); //设置发送回调函数 一旦生产的每条消息被rd_kafka_produce函数接收,投递报告回调函数会被立即调用

    //初始化一个顶层对象 的基础容器 用于全局配置和共享状态
    //conf在rd_kafka_new调之后不能被再次使用·
    l_rk=rd_kafka_new(RD_KAFKA_PRODUCER, l_conf, l_errstr, sizeof(l_errstr));
    if(!l_rk)
    {
        fprintf(stderr, "%% Failed to create new producer: %s\n",l_errstr);
        rd_kafka_conf_destroy(l_conf);//
        return 503;
    }

    //如果分配成功 临时配置对象会在rd_kafka_new中被释放 此时无需再rd_kafka_conf_destroy
    l_conf=NULL;
    /* Signal handler for clean shutdown */
    signal(SIGINT, stop);

    fprintf(stderr,//打印...
            "%% Type some text and hit enter to produce message\n"
            "%% Or just hit enter to only serve delivery reports\n"
            "%% Press Ctrl-C or Ctrl-D to exit\n");
    int continue_times=0;
    if(run)
    {
        rd_kafka_resp_err_t l_err;//错误代码提示
        
            // SendProduce message.
            // 这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,
            // 对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)
            // 用于在消息传递成功或失败时向应用程序发回信号
    
        while(1)
        {
            l_err=rd_kafka_producev(
            l_rk,//顶层对象 的基础容器 用于全局配置和共享状态,之前通过rd_kafka_topic_new()生成
            RD_KAFKA_V_TOPIC(l_topic),//topic
            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),//表示librdkafka 在信息发送前立即从 payload 做一份拷贝
            RD_KAFKA_V_VALUE(p, str_serialization.size()),//消息和长度//传信息
            RD_KAFKA_V_OPAQUE(NULL),//可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。
            RD_KAFKA_V_END); /* End sentinel */

            if(l_err)
            {//若该值不是0则为一个错误码
                fprintf(stderr,"%% Failed to produce to topic %s: %s\n", l_topic, rd_kafka_err2str(l_err));

                if(l_err==RD_KAFKA_RESP_ERR__QUEUE_FULL) 
                {

                //如果内部队列已满,请等待消息传递,然后重试。
                //内部队列表示要发送的消息和已经发送或失败的消息,等待调用它们的传递报告回调。
                
                    rd_kafka_poll(l_rk,1000 );//阻塞等待消息1000ms至发送完成或其它操作
                    continue_times++;
                    continue;//再传!
                }
            } 
            else 
            {//若该值为0 传送成功 打印出相应成果
                fprintf(stderr,"%% Enqueued message (%zd bytes) for topic %s\n",str_serialization.size(), l_topic);
                break;
            }
        }
                
        // producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为
        // 传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其
        // 发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()
        // 仍然被调用
        rd_kafka_poll(l_rk, 0);
    }


    //   rd_kafka_flush是rd_kafka_poll()的抽象化,
    //   等待所有未完成的produce请求完成,通常在销毁producer实例前完成
    //   以确保所有排列中和正在传输的produce请求在销毁前完成
    fprintf(stderr, "%% Flushing final messages..\n");
    rd_kafka_flush(l_rk,10*1000);

    //如果输出队列仍然不为空,则存在向集群生成消息的问题。
    if(rd_kafka_outq_len(l_rk)>0)
    {
        fprintf(stderr,"%% %d message(s) were not delivered\n",rd_kafka_outq_len(l_rk));
    }
    
    rd_kafka_destroy(l_rk);//万事俱备 销毁实例
    //-----------------------producer模板---------------
    return 0;
}


消费者部分

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <iostream>
#include <cstdio>
#include <ctype.h>
#include "rdkafka.h"
#include<jsoncpp/json/json.h> 

volatile sig_atomic_t g_run = 1;

void stop(int sig) {
        g_run = 0;
}


int is_printable(void *buf1, size_t size)
{
        size_t i;
        char *l_buf=(char *)buf1;//
        for(i=0;i<size;i++)
            if(!isprint((int)l_buf[i]))
                return 0;

        return 1;
}


int main(int argc, char **argv) {
        rd_kafka_t *l_rk=NULL;          //生产者实例句柄
        rd_kafka_conf_t *l_conf=NULL;   //临时配置对象
        rd_kafka_resp_err_t l_err; // librdkafka API 错误代码
        char l_errstr[512]={0};        // librdkafka API 错误缓冲区内容
        char *l_brokers=NULL;       //  broker list 
        char *l_groupid=NULL;       //组id
        char **l_topics=NULL;       //要订阅的 topic 
        int l_topic_cnt=0;          //要订阅的topic数量
        rd_kafka_topic_partition_list_t *l_subscription=NULL; 
        int i=0;

        if (argc<4) 
        {
            fprintf(stderr,"%% Usage: ""%s <broker> <group.id> <topic1> <topic2>..\n",argv[0]);
            return 1;
        }

        l_brokers=argv[1];
        l_groupid=argv[2];
        l_topics=&argv[3];
        l_topic_cnt=argc-3;


        
        l_conf = rd_kafka_conf_new();

        //配置设置ip,端口 默认9092
        if(rd_kafka_conf_set(l_conf, "bootstrap.servers", l_brokers, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK)
        {
            fprintf(stderr, "%s\n", l_errstr);
            //销毁l_conf
            rd_kafka_conf_destroy(l_conf);
            return 2;
        }

        
        //配置消费者组id
         //共享相同组id的所有消费者将加入同一个组,订阅的topic'partitions将根据partition.assignment.strategy(消费者配置属性)分配给组中的消费者。
        if(rd_kafka_conf_set(l_conf, "group.id", l_groupid, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK)
        {
            fprintf(stderr, "%s\n", l_errstr);
            rd_kafka_conf_destroy(l_conf);
            return 3;
        }
        
        // 如果一个分区之前没有提交偏移量,则auto.offset.reset策略将用于决定在分区的哪个位置开始获取消息。
        // 通过将此设置为最早,如果之前没有提交偏移量,消费者将读取分区中的所有消息。

        if(rd_kafka_conf_set(l_conf, "auto.offset.reset", "earliest", l_errstr,
                              sizeof(l_errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", l_errstr);
                rd_kafka_conf_destroy(l_conf);//
                return 4;
        }

        // 创建使用者实例。实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态
        // 注意:rd_kafka_new()拥有conf对象的所有权,在调用后应用程序不能再次引用它。
        l_rk = rd_kafka_new(RD_KAFKA_CONSUMER, l_conf, l_errstr, sizeof(l_errstr));
        
        //在分配成功的情况下 这里l_conf的配置临时配置对象其实已经被rd_kafka_new释放
        
        
        if(!l_rk)
        {
            fprintf(stderr, "%% Failed to create new consumer: %s\n",l_errstr);
            
            //没分配成功的需要 再手动释放
            rd_kafka_conf_destroy(l_conf);
            return 5;
        }
        std::cout<<223<<std::endl;//-----------------
        //rd_kafka_conf_destroy(l_conf);这里不需要再销毁配置对象 
        l_conf=NULL; //配置对象现在由rd_kafka_t实例拥有和释放

         //重定向 rd_kafka_poll()队列到consumer_poll()队列
        rd_kafka_poll_set_consumer(l_rk);

        //将所有topic list转化为 适合kafka的格式
        l_subscription=rd_kafka_topic_partition_list_new(l_topic_cnt);
        for (i=0;i<l_topic_cnt;i++)
                rd_kafka_topic_partition_list_add(l_subscription, l_topics[i],
                                                  /* the partition is ignored
                                                   * by subscribe() */RD_KAFKA_PARTITION_UA);

        // 订阅 l_topics list
        l_err=rd_kafka_subscribe(l_rk, l_subscription);
        if (l_err)
        {//订阅出错
                
            fprintf(stderr, "%% Failed to subscribe to %d l_topics: %s\n",l_subscription->cnt,rd_kafka_err2str(l_err));
            rd_kafka_topic_partition_list_destroy(l_subscription);
            rd_kafka_destroy(l_rk);
            return 6;
        }

        fprintf(stderr,"%% Subscribed to %d topic(s),waiting for rebalance and messages...\n",l_subscription->cnt);

        rd_kafka_topic_partition_list_destroy(l_subscription);//释放l_topics list使用的所有资源和它自己

        /* Signal handler for clean shutdown */
        signal(SIGINT, stop);
        std::cout<<112<<std::endl;//-----------------
        while (g_run)
        {
            rd_kafka_message_t *l_rkm;
            //std::cout<<112<<std::endl;//-----------------
            l_rkm = rd_kafka_consumer_poll(l_rk, 100);//从consumer_poll读取信息
            if (!l_rkm)
                continue; //超时就重新尝试
            //返回错误的信息
            if (l_rkm->err) 
            {
                fprintf(stderr, "%% Consumer error: %s\n",
                        rd_kafka_message_errstr(l_rkm));
                rd_kafka_message_destroy(l_rkm);
                continue;
            }


            //正确就如下
            //打印 字段 分区 offset等信息
            printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
                rd_kafka_topic_name(l_rkm->rkt), l_rkm->partition,
                l_rkm->offset);

            // /* Print the message key. */
            // if (l_rkm->key && is_printable(l_rkm->key, l_rkm->key_len))
            //         printf(" Key: %.*s\n", (int)l_rkm->key_len,
            //                (const char *)l_rkm->key);
            // else if (l_rkm->key)
            //         printf(" Key: (%d bytes)\n", (int)l_rkm->key_len);
            //打印信息字段

            
            //--------------进行反序列化操作 ---------------
            Json::Reader reader(Json::Features::strictMode());
            Json::Value root; 
            
            if(reader.parse((const char *)l_rkm->payload,root)==0)
            {   
                std::cout<<"reader.parse赋值错误!"<<std::endl;
                return 7;
            }
            std::cout<<"ID:"<<root["ID"]<<"新得到的数据如下"<<std::endl;
            std::cout<<"ID:"<<root["ID"]<<std::endl;
            std::cout<<"name:"<<root["name"]<<std::endl;
            std::cout<<"sex:"<<(root["sex"].asInt())<<std::endl;
            std::cout<<"age:"<<root["age"]<<std::endl;
            std::cout<<"tel:"<<root["tel"]<<std::endl;
            std::cout<<"email:"<<root["email"]<<std::endl;
            std::cout<<"pers_desc:"<<root["pers_desc"]<<std::endl;
            // printf("ID:%d\nID:%d\n")
            //--------------进行反序列化操作 ---------------


            if (l_rkm->payload && is_printable(l_rkm->payload, l_rkm->len))
                printf(" Value: %.*s\n", (int)l_rkm->len,(const char *)l_rkm->payload);
            else if (l_rkm->payload)
                printf(" Value: (%d bytes)\n", (int)l_rkm->len);

            rd_kafka_message_destroy(l_rkm);
        }


        //关闭消费者 离开组
        fprintf(stderr, "%% Closing consumer\n");
        rd_kafka_consumer_close(l_rk);

        //销毁消费者
        rd_kafka_destroy(l_rk);

        return 0;
}

四、运行展示

运行方式

    1启动服务器 默认 redis端口6379 mysql端口3306 thirft 端口9090 
    ./server 
    2启动客户端
    ./client 
    3启动消费者 接受变化信息 默认 kafka端口9092
    ./consumer 192.168.100.160:9092 0 TestTopicForDocker 

客户端

服务端

消费者

 五、问题总结

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/439906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP+Vue+java导师学生双选系统设计与实现springnboot+pyton

为了直观显示系统的功能&#xff0c;运用用例图这样的工具显示分析的结果。分析的管理员功能如下。管理员管理学员&#xff0c;导师&#xff0c;管理项目信息&#xff0c;管理项目提交&#xff0c;管理指导项目信息。运行环境:phpstudy/wamp/xammp等 卓越导师双选系统根据调研&…

GCC 常用命令

GCC 编译过程 一个 C/C文件要经过预处理(preprocessing)、编译(compilation)、汇编(assembly)和链接(linking) 等 4 步才能变成可执行文件 &#xff08;1&#xff09; 预处理 C/C源文件中&#xff0c;以“#”开头的命令被称为预处理命令&#xff0c;如包含命令“#include”、…

系统集成项目管理工程师案例分析考点汇总(沟通/干系人、风险、合同等)

沟通及干系人管理常见考点1. 沟通管理计划的内容2. 项目绩效报告的主要内容3. 沟通中容易出现的问题4. 如何采取有效措施改进沟通5. 如何召开有效的会议 合同管理常见考点1. 合同签订时应注意的内容及条款2. 合同管理常见的问题3. 合同管理问题的应对措施 采购管理常见考点1. 采…

IntelliJ 上 Azure Event Hubs 全新支持来了!

大家好&#xff0c;欢迎来到 Java on Azure Tooling 的3月更新。在这次更新中&#xff0c;我们将介绍 Azure Event Hubs 支持、Azure Functions 的模板增强&#xff0c;以及在 IntelliJ IDEA 中部署 Azure Spring Apps 时的日志流改进。要使用这些新功能&#xff0c;请下载并安…

使用docker搭建Milvus向量数据库

Milvus向量数据库是什么&#xff1f; 官网是这样说的&#xff1a; Milvus创建于2019年&#xff0c;目标单一&#xff1a;存储、索引和管理由深度神经网络和其他机器学习&#xff08;ML&#xff09;模型生成的大量嵌入向量。 作为一个专门用于处理输入向量查询的数据库&#…

操作系统原理 —— 什么是系统调用?(五)

什么是系统调用&#xff1f; 有什么用作用&#xff1f; 我们想想怎么是系统调用呢&#xff1f; 系统调用 是操作系统提供给应用程序(程序员)使用的接口&#xff0c;可以理解为一种可供应用程序调用的特殊函数&#xff0c;应用程序可以发出系统调用请求来获得操作系统的服务。…

编译原理复习(2023.4.25考试版本)

本次复习采用的是这本书&#xff0c;如有书写不当的地方&#xff0c;欢迎批评指正&#xff01; 第一章 第二章 符号串的运算 相等&#xff1a;两个符号串一模一样的 长度&#xff1a;数他有几个就行了 连接&#xff1a;跟在后面直接写就行了 符号传串的逆&#xff1a;在符…

详解UDP协议与实现UDP版本字典翻译客户端与服务器

文章目录 前言1. UDP协议介绍2.UDP Socket的介绍3. UDP版本字典翻译服务器4. UDP版本字典翻译客户端 前言 UDP协议也是传输层的一种协议&#xff0c;上篇文章我们介绍了TCP协议可以参考我的另一篇博客详解TCP协议以及实现TCP版本的字典翻译服务器客户端&#xff0c;以下来介绍…

【k8s】Wordpress(PHP+nginx+mysql)迁移到k8s

一、迁移思路&#xff1a; 1、制作服务镜像; 1.1 挑选合适的基础镜像; 1.2 准备代码相关的文件; 1.3 通过dockerfile构建镜像;2、制作Kubernetes服务&#xff0c;并完成调度; 2.1确定服务运行的模式&#xff08;内部运行or 对外提供); 2.2确定服务所使用的控制器; 2.3服务是否…

【邀请函】第四届宁德国际新能源电池与智造技术产业大会(4月26 宁德)| 达索系统百世慧®

未来5-10年&#xff0c;新能源电池行业将呈现“一大支柱、两大应用场景、多元化技术线路”的发展特征。动力锂电池仍将主导新能源电池产业&#xff0c;并加速乘用场景落地。随着技术的日趋成熟&#xff0c;量产后成本下降&#xff0c;优势逐步凸显。 但随着技术迭代&#xff0…

Maya 贴图链接检测重链打包插件tjh_lost_textures_finder 1.3.3

一、问题描述&#xff1a; maya在模型材质贴图及渲染制作流程中&#xff0c;经常会遇到工程文件路径更改后&#xff0c;图片链接失效的问题&#xff0c;还有就是萌新们不懂规矩&#xff0c;图片路径乱放&#xff0c;而造成的图片打包时巨大工作量&#xff0c;南无阿弥陀佛。此…

vue关于echarts后端返回格式取值方法

在vue中&#xff0c;接口返回如下数据&#xff1a; data: {充电桩: [0, 0, 78], 红外摄像头: [0, 0, 0], 火焰探测器: [0, 1, 0], 烟雾传感器: [0, 1, 1], 限流保护器: [0, 0, 1]}&#xff0c; 其中数组里第一个值应该放在data1&#xff0c;第二个放在data2&#xff0c;第三个…

LCHub:微软Power Apps成为保险行业最受欢迎低代码平台

全球领先的保险行业软件服务商Adacta发布了《保险行业低代码应用情况》报告,深度揭示了保险领域对低代码的应用、市场增长、发展趋势等。 该报告对来自德国、奥地利、瑞士、法国、英国、比利时、荷兰和卢森堡等国家的100名保险行业的高管进行了深度调查。(发送“保险低代码”…

社科院杜兰大学金融管理硕士项目——不要停止奔跑,前方更值得期待

不要停止奔跑&#xff0c;不要回顾来路&#xff0c;来路无可眷恋&#xff0c;唯有前方值得期待。——《马男波杰克》。这段话很适用在职的我们&#xff0c;当我们在职场经过打磨&#xff0c;我们被磨平了棱角&#xff0c;被磨掉了脾气&#xff0c;但我们依然相信前方会更值得我…

安装和部署elasticsearch

文章目录 一、安装elasticsearch1.1 部署单点elasticsearch1.1.1 创建网络1.1.2 安装镜像1.1.3 部署1.1.4 测试 1.2 部署kibana1.2.1 安装镜像1.2.2 部署1.2.3 测试1.2.4 DevTools工具 1.3 安装IK分词器1.3.1 安装ik插件1.3.2 查找数据卷目录1.3.3 上传至容器1.3.4 测试 一、安…

腾讯优图入选人工智能顶级会议AAAI论文--10篇

人工智能领域的国际顶级会议AAAI 2020将于2月7日-2月12日在美国纽约举办。近年来随着人工智能的兴起&#xff0c;AAAI每年举办的学术会议也变得越来越火热&#xff0c;每年都吸引了大量来自学术界、产业界的研究员、开发者投稿、参会。 以AAAI2019为例&#xff0c;论文提交数量…

工单管理系统的好处

工单管理系统是一款能够实现业务流程管理&#xff0c;工作任务管理&#xff0c;服务项目管理的工具。该系统能够帮助企业提高客户服务质量、加快企业服务响应速度、降低企业运营成本。 工单管理系统能通过制定计划和控制流程来减少客户投诉&#xff0c;提高客户满意度。可以说…

为什么安全从业者应该了解业务

不久之前&#xff0c;网络安全还被视为独立于企业其他部分的事物。但在过去的十年里&#xff0c;它终于得到了当之无愧、渴望已久的认可和关注。 越来越多的公司聘请首席信息安全官来帮助制定整体业务战略&#xff0c;使安全成为公司董事会的首要任务。 最后&#xff0c;首席…

k8s允许master节点参与调度的设置方法

默认情况下&#xff0c;master节点是不参与调度的&#xff0c;且在master节点上有一个污点NoSchedule&#xff08;表示k8s将不会将Pod调度到具有该污点的Node上&#xff09;&#xff0c;有关污点章节可参考https://www.cnblogs.com/panw/p/16343392.html 如果想让master节点参与…

云原生可观测性平台deepflow部署

deepflow是什么 DeepFlow 是云杉网络 (opens new window)开源的一款高度自动化的可观测性平台&#xff0c;是为云原生应用开发者建设可观测性能力而量身打造的全栈、全链路、高性能数据引擎。DeepFlow 使用 eBPF、WASM、OpenTelemetry 等新技术&#xff0c;创新的实现了 AutoT…