不知不觉入职已经一个月了,近期提交了考核2,要求如下:
1、编写一个管理用户信息的服务,通过thrift的远程过程调用实现用户信息管理功能
2、用户信息至少包括 唯一ID、用户名、性别、年龄、手机号、邮箱地址、个人描述
3、提供创建用户、查询用户信息、修改用户信息接口,其中修改用户信息要求除用户ID外其它信息可单独修改
4、数据存储要求使用mysql落地数据(使用存储过程)、使用redis做数据缓存、数据发生变化时通过kafka发送变化消息(使用json数据格式)
5、实现用户信息操作客户端(通过命令行操作)
6、实现用户信息变化处理服务,用户信息变化时输出变化的内容
一、主要思路
首先是用Thrift框架搭建服务端和客户端,其中包括各模块功能,然后服务端将数据同MySQL服务器和redis缓存服务器进行连接。并在用户信息变化时通过生产者传递信息给kafka中的消费者。
查询模块流程
查询(先查询redis 再从查询mysql)
1客户端选择想要进行的操作 ->
2调用服务端函数 ->
3服务端查询redis是否存在数据 若没有->4 若有->6
4服务端查询mysql 若没有->7 若有->5
5写回redis ->
6返回数据给客户端 ->
7返回错误代码 ->
创建模块流程
插入 (先插入mysql 再插入redis 再传递kafka)
1客户端选择想要进行的操作 ->
2调用服务端函数 ->
3服务端查询mysql是否存在数据 若没有->4 若有->10
4插入mysql ->
5插入redis ->
6传递数据给producer->
7producer序列化并传递数据给consumer->
8consumer反序化并输出->
9返回数据给客户端 ->
10返回错误代码 ->
修改模块流程
修改 (先修改mysql 再修改redis 再传递kafka)
1客户端选择想要进行的操作 ->
2调用服务端函数 ->
3服务端查询mysql是否存在数据 若没有->4 若有->10
4修改mysql ->
5修改redis ->
6传递数据给producer->
7producer序列化并传递数据给consumer->
8consumer反序化并输出->
9返回数据给客户端 ->
10返回错误代码 ->
函数返回值错误代码:
return -2没有该用户id
return -1id不合法
return -101 服务端getuser失败
return 1数据插入数据库失败
return 2新数据修改数据库vec失败!
return 3客户端输出失败
return 301 redisContext链接失败
return 302 reply 获取失败
return 303 reply->type 类型不对
return 304 reply 数值不对
return 401 mysql语句返回错误
return 402 l_result返回错误
return 403 判断存储过程中的return是否出错
return 404 数据库没有该id的数据
return 405 row[0]为null
return 406 初始化mysql失败
return 407 mysql连接失败
return 501 反序列化失败
return 502 rd_kafka_conf_set失败
return 503 rd_kafka_new失败
二、数据格式
MYSQL数据表中的数据格式
数据库 数据表格式:
create table test(id int primary key ,name varchar(20) not null DEFAULT 'unknow',sex bool, age varchar(20) not null DEFAULT 'unknow', tel varchar(20) not null DEFAULT 'unknow', email varchar(30) not null DEFAULT 'unknow', pers_desc varchar(150) not null DEFAULT 'unknow')
MYSQL中的存储过程
存储过程:
queryuser:
mysql> create procedure queryuser (in in_id int,out out_ret int,out out_name varchar(20),out out_sex bool,out out_age varchar(20),out out_tel varchar(20),out out_email varchar(30),out out_pers_desc varchar(150))
-> label_a:begin
-> declare v_name varchar(20) ;
-> declare v_sex bool ;
-> declare v_age varchar(20) ;
-> declare v_tel varchar(20) ;
-> declare v_email varchar(30) ;
-> declare v_pers_desc varchar(150) ;
-> if (in_id<=0) then
-> set out_ret=-1; #id error
-> SELECT out_ret;
-> leave label_a;
-> end if;
-> SELECT name,sex,age,tel,email,pers_desc into v_name,v_sex,v_age,v_tel,v_email,v_pers_desc from test where id=in_id limit 1;
-> if v_name is NULL then
-> set out_ret=-2; #id null
-> SELECT out_ret;
-> leave label_a;
-> end if;
-> set out_ret=0;
-> set out_name=v_name;
-> set out_sex=v_sex;
-> set out_age=v_age;
-> set out_tel=v_tel;
-> set out_email=v_email;
-> set out_pers_desc=v_pers_desc;
-> SELECT out_ret,out_name,out_sex,out_age,out_tel,out_email,out_pers_desc;
-> end;
//call queryuser (1, @ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_pers_desc);
insertuser:
mysql> create procedure insertuser (in in_id int,in in_name varchar(20),in in_sex bool,in in_age varchar(20),in in_tel varchar(20),in in_email varchar(30),in in_pers_desc varchar(150),out out_ret int)
-> label_a:begin
-> if (in_id<=0) then
-> set out_ret=-1; #id error
-> SELECT out_ret;
-> leave label_a;
-> end if;
-> insert into test (id,name,sex,age,tel,email,pers_desc) values (in_id,in_name,in_sex,in_age,in_tel,in_email,in_pers_desc);
-> set out_ret=0;
-> SELECT out_ret;
-> end;
//call insertuser (2,'renjianxu',0,22,123,123,cool man,@ret);
updatauser:
mysql> create procedure updatauser (in in_id int,in in_name varchar(20),in in_sex bool,in in_age varchar(20),in in_tel varchar(20),in in_email varchar(30),in in_pers_desc varchar(150),out out_ret int)
-> label_a:begin
-> if (in_id<=0) then
-> set out_ret=-1; #id error
-> SELECT out_ret;
-> leave label_a;
-> end if;
-> update test set name=in_name,sex=in_sex,age=in_age,tel=in_tel,email=in_email,pers_desc=in_pers_desc where id=in_id;
-> set out_ret=0;
-> SELECT out_ret;
-> end;
//call updatauser (2,'renjianxu',0,23,123,123,cool man,@ret);
Redis数据格式
Redis数据格式:
127.0.0.1:6379> hgetall user1
1) "name"
2) "zed"
3) "sex"
4) "0"
5) "age"
6) "22"
7) "tel"
8) "1756915"
9) "email"
10) "1208086"
11) "pers_desc"
12) "handsome man"
Json数据格式
Json数据格式:
root["ID"]=msg.seqId;
root["name"]=msg.name;
root["sex"]=msg.sex;
root["age"]=msg.age;
root["tel"]=msg.tel;
root["email"]=msg.email;
root["pers_desc"]=msg.pers_desc;
举例:
ID:1
name:"zed"
sex:0
age:"22"
tel:"17569150186"
email:"1208086"
pers_desc:"handsome man"
三、代码
代码结构
Datainfo.thrift
client.cpp
consumer.cpp
gen-cpp/Datainfo_types.cpp
gen-cpp/Datainfo_types.h
gen-cpp/Makefile
gen-cpp/Producer.cpp
gen-cpp/Producer.h
gen-cpp/serDemo.cpp
gen-cpp/serDemo.h
gen-cpp/serDemo_server.skeleton.cpp
readme
编译方式
对serDemo_server.skeleton.cpp client.cpp consumer.cpp 分别编译
编译serDemo_server.skeleton.cpp和client.cpp时 分别将cpp放在gen-cpp文件夹里 然后修改Makefile文件中的SRCS和TARGER
编译consumer.cpp时不需要串联其他cpp 只需要编译自身即可
Makefile部分
#
SRCS := serDemo_server.skeleton.cpp Datainfo_types.cpp serDemo.cpp Producer.cpp
OBJS := $(patsubst %.cpp,%.o,$(SRCS))#得到所有生成终极目标的依赖:(.o文件)
FLAG1 :=-lmysqlclient -lthrift -lhiredis -lrdkafka -ljsoncpp
FLAG2 :=-L/root/thrift-0.18.1/lib/cpp/.libs -L/usr/lib/mysql
FLAG3 :=-I/root/thrift-0.18.1/lib/cpp/src -I/usr/include/mysql8
FLAG4 :=-std=c++11
TARGER := server
$(TARGER):$(OBJS) #终极目标
g++ -o $@ $^ $(FLAG1) $(FLAG2) $(FLAG3)
#DEPS := $(patsubst %.o, %.d, $(OBJS))
DEPS := $(patsubst %.cpp, %.d, $(SRCS))#将所有 .cpp文件 对应命名.d文件赋值给deps
ifneq ($(MAKECMDGOALS), clean)
-include $(DEPS)
endif
%.d: %.cpp
g++ -MM $< > $@ $(FLAG4)
%.o: %.cpp
g++ -c $< $(FLAG4)
.PHONY: clean
#清除规则 加入PHONY防止同名clean文件干扰
clean:
rm -f $(DEPS)
rm -f $(OBJS)
rm -f TARGER
服务器部分
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "serDemo.h"
#include "Producer.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <hiredis.h>
// 读: 读redis->没有,读mysql->把mysql数据写回redis,有的话直接从redis中取;
// 写: 写mysql->成功,再写redis;
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
//l_msg_temp.id =-1 getuser 返回了一个空值
// return -2没有该用户id
// return -1id不合法
// return -101 服务端getuser失败
// return 1数据插入数据库失败
// return 2新数据修改数据库vec失败!
// return 3客户端输出失败
// return 301 redisContext链接失败
// return 302 reply 获取失败
// return 303 reply->type 类型不对
// return 304 reply 数值不对
// return 401 mysql语句返回错误
// return 402 l_result返回错误
// return 403 判断存储过程中的return是否出错
// return 404 数据库没有该id的数据
// return 405 row[0]为null
// return 406 初始化mysql失败
// return 407 mysql连接失败
// return 501 反序列化失败
// return 502 rd_kafka_conf_set失败
// return 503 rd_kafka_new失败
class serDemoHandler : virtual public serDemoIf {
public:
const char *hostname = "127.0.0.1";
const int l_redis_size=12;
const int l_redis_port=6379;
serDemoHandler() {
// Your initialization goes here
}
//查询用户
int32_t QueryUser(const int32_t seqId)
{
printf("\nQueryMysql...\n");
if(seqId<1||seqId>=0x7fffffff)
{
printf("用户id输入长度或数值不合法,请重新键入数字...\n");
return -1;
}
//先找redis
//-------------------操作redis部分--------------------↓
//
//1链接测试部分
printf("进行redis链接...先查询redis是否存在数据\n");
redisContext *connect=NULL;
redisReply *reply=NULL;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
if (connect == NULL || connect->err)
{
if (connect)
{
printf("connect error: %s\n", connect->errstr);
redisFree(connect); //释放redisConnect()所产生的连接。
}
else
{
printf("connect error: can't allocate redis context\n");
}
return 301;
}
//1链接测试部分
//2从redis中找数据部分
reply = (redisReply*)redisCommand(connect,"hgetall user%d",seqId);
if(reply==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
redisFree(connect); //释放redisConnect()所产生的连接。
return 302;
}
else if(reply->type!=REDIS_REPLY_ARRAY)
{
std::cout<<"reply->type 类型错误"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 2;
}
else if(reply->elements==l_redis_size)//在redis中找到了
{
std::cout<<"queryuser函数中在redis中找到了"<<std::endl;
for(int i=0;i<reply->elements;i++)
{
if(reply->element[i]==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 302;
}
else if(reply->element[i]->type!=REDIS_REPLY_STRING)
{
std::cout<<"数据展示出错2"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 303;
}
//1//输出一下
std::string l_strs_temp(reply->element[i]->str,reply->element[i]->len);
printf("index %d:%s\n",i,l_strs_temp.c_str());
if(i%2==1)//数据部分
{
//对于数据部分一些操作...
}
}
freeReplyObject(reply);
printf("查询redis完毕 找到数据结束redis链接...\n");
return 0;
}
printf("查询redis完毕 没找到数据结束redis链接...\n");
freeReplyObject(reply);
redisFree(connect);
//-------------------操作redis部分--------------------↑
//再找数据库
//-------------------操作数据库部分--------------------
printf("开始查找数据库...\n");
MYSQL* mysql = mysql_init(NULL);
//MYSQL* mysql;
if(mysql==NULL)
{
printf("初始化环境失败\n");
return 406;//
}
// 2. 连接数据库服务器
mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101",
"exam2", 3306, NULL, 0);
if(mysql == NULL)
{
printf("连接数据库服务器失败\n");
return 407;//
}
//4调用过程//
char l_call_cstr[100]={};
//std::string l_call_str="";
sprintf(l_call_cstr, "call queryuser (%d,@ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc);",seqId);
//l_call_str=l_call_cstr;
int l_ret1=mysql_query(mysql,l_call_cstr);
if(l_ret1!=0)
{
std::cout<<"error"<<std::endl;
return 401;
}
//返回结果测试
MYSQL_RES* l_result=mysql_store_result(mysql);
if(l_result==NULL)
{
printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
//mysql_free_result(l_result);
return 402;
}
//6展示
MYSQL_ROW row;
row = mysql_fetch_row(l_result);
int num=mysql_num_fields(l_result);
mysql_free_result(l_result);
//std::string s_temp="404";
if(strncmp(row[0],"404",3)==0)//strncmp(row[0],"0",1)!=0
{
printf("没有当前查询id\n");
return 404;//没有该id
}
else if(strncmp(row[0],"0",1)!=0)
{
printf("存储过程中的return出错%s\n",row[0]);
return 403;//判断存储过程中的return是否出错
}
else if(num!=7||row[0]==NULL||row[1]==NULL||row[2]==NULL||row[3]==NULL||row[4]==NULL||row[5]==NULL||row[6]==NULL)
{
std::cout<<num<<std::endl;
printf("row返回出错或row->num出错\n");
return 405;//判断存储过程中的return是否出错
}
else printf("存储过程返回值为[%s]\n数据为:%s\n%s\n%s\n%s\n%s\n%s\n%s\n",
row[0], row[1],row[2],row[3],row[4],row[5],row[6],row[7]);
//释放mysql资源
mysql_close(mysql);
printf("queryuser 函数在数据库中找到相关数据...\n");
//-------------------操作数据库部分--------------------↑
//-------------------操作redis部分--------------------↓
printf("插入mysql完毕...将数据插入到redis...\n");
connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
//1链接测试部分
if (connect == NULL || connect->err)
{
if (connect)
{
printf("connect error: %s\n", connect->errstr);
redisFree(connect); //释放redisConnect()所产生的连接。
}
else
{
printf("connect error: can't allocate redis context\n");
}
return 301;
}
//2写入redis中数据部分
reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,row[1],(*(row[2])=='1'),row[3],row[4],row[5],row[6]);
if(reply==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
redisFree(connect); //释放redisConnect()所产生的连接。
return 302;
}
else if(reply->type!=REDIS_REPLY_STATUS)
{
std::cout<<"reply->type 类型错误"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 303;
}
else if(strncmp("OK",reply->str,2)!=0)
{
std::cout<<"数据插入失败"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 304;
}
printf("query将数据插入redis完毕 ...\n");
//-------------------操作redis部分--------------------上
//3释放redis资源
freeReplyObject(reply);
redisFree(connect);
printf("QueryUser函数结束...\n\n");
return 0;//该id存在
}
int32_t InsertUser(const int32_t seqId,const message& msg)
{
printf("\nInsertUser...\n");
message l_temp_msg=msg;
int32_t l_Query_return=0;
//1 判断一下该id是否存在
l_Query_return=QueryUser(seqId);
if(l_Query_return==0)
{
printf("该id数据已经存在,插入失败!\n");
return l_Query_return;
}
else if(l_Query_return!=404)
{
printf("QueryMysql出错 l_Query_return=%d\n",l_Query_return);
return l_Query_return;
}
//先插入mysql
//-------------------操作数据库部分--------------------
//mysql指针初始化
MYSQL* mysql = mysql_init(NULL);
if(mysql==NULL)
{
printf("初始化环境失败\n");
return 1;//
}
// 2. 连接数据库服务器
mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101",
"exam2", 3306, NULL, 0);
if(mysql == NULL)
{
printf("连接数据库服务器失败\n");
return 2;//
}
//4调用过程
char l_call_cstr[100]={};
//std::string l_call_str="";
sprintf(l_call_cstr, "call insertuser (%d,'%s',%d,'%s','%s','%s','%s',@ret);",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
//l_call_str=l_call_cstr;
//printf("%s\n",l_call_cstr);
int32_t l_ret1=mysql_query(mysql, l_call_cstr);
if(l_ret1!=0)
{
std::cout<<"call_insertstudent_error"<<std::endl;
return 401;
}
//返回结果测试
// MYSQL_RES* l_result=mysql_store_result(mysql);
// if(l_result==NULL)
// {
// printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
// mysql_free_result(l_result);
// return 402;
// }
// mysql_free_result(l_result);
//返回结果测试
l_Query_return=QueryUser(seqId);
if(l_Query_return!=0)
{
printf("并没有插入进数据库..插入失败!\n");
return l_Query_return;
}
mysql_close(mysql);
printf("成功插入进数据库..插入成功!\n");
//-------------------操作数据库部分--------------------↑
//然后再插入redis
//-------------------操作redis部分--------------------↓
redisContext *connect=NULL;
redisReply *reply=NULL;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
if (connect == NULL || connect->err)
{
if (connect)
{
printf("connect error: %s\n", connect->errstr);
redisFree(connect); //释放redisConnect()所产生的连接。
}
else
{
printf("connect error: can't allocate redis context\n");
}
return 301;
}
//1链接测试部分
//2写入redis中数据部分
reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
if(reply==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
redisFree(connect); //释放redisConnect()所产生的连接。
return 302;
}
else if(reply->type!=REDIS_REPLY_STATUS)
{
std::cout<<"reply->type 类型错误"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 303;
}
else if(strncmp("OK",reply->str,2)!=0)
{
std::cout<<"数据插入失败"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 304;
}
printf("成功插入进redis..插入成功!\n");
freeReplyObject(reply);
redisFree(connect);
//-------------------操作redis部分--------------------↑
int l_return_pro=Prod2Cons(msg);
//传递给生产者
if(l_return_pro!=0)
{
printf("\n传递给生产者出错...\n");
}
printf("\nInsertUser函数结束...\n\n");
return 0;
}
int32_t UpdataUser(const int32_t seqId,const message& msg)
{
printf("\nUpdataUser...\n");
message l_temp_msg=msg;
int32_t l_Query_return=0;
//1 判断一下该id是否存在
l_Query_return=QueryUser(seqId);
if(l_Query_return==404)
{
printf("该id数据不存在,修改失败!\n");
return l_Query_return;
}
else if(l_Query_return!=0)
{
printf("QueryMysql出错 l_Query_return=%d\n",l_Query_return);
return l_Query_return;
}
//-------------------操作数据库部分--------------------
//mysql指针初始化
MYSQL* mysql = mysql_init(NULL);
if(mysql==NULL)
{
printf("初始化环境失败\n");
return 1;//
}
// 2. 连接数据库服务器
mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101",
"exam2", 3306, NULL, 0);
if(mysql == NULL)
{
printf("连接数据库服务器失败\n");
return 2;//
}
//4调用过程
char l_call_cstr[100]={0};
//std::string l_call_str="";
sprintf(l_call_cstr, "call updatauser (%d,'%s',%d,'%s','%s','%s','%s',@ret);",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
//l_call_str=l_call_cstr;
int32_t l_ret1=mysql_query(mysql, l_call_cstr);
if(l_ret1!=0)
{
std::cout<<"call_updatastudent_error"<<std::endl;
return 401;
}
// //返回结果测试
// MYSQL_RES* l_result=mysql_store_result(mysql);
// if(l_result==NULL)
// {
// printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
// mysql_free_result(l_result);
// return 402;
// }
// mysql_free_result(l_result);
l_Query_return=QueryUser(seqId);
if(l_Query_return!=0)
{
printf("并没有修改进数据库..插入失败!\n");
return l_Query_return;
}
mysql_close(mysql);
printf("成功修改进数据库..插入成功!\n");
//-------------------操作数据库部分--------------------
//-------------------操作redis部分--------------------
redisContext *connect=NULL;
redisReply *reply=NULL;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
if (connect == NULL || connect->err)
{
if (connect)
{
printf("connect error: %s\n", connect->errstr);
redisFree(connect); //释放redisConnect()所产生的连接。
}
else
{
printf("connect error: can't allocate redis context\n");
}
return 301;
}
//1链接测试部分
//2写入redis中数据部分
reply = (redisReply*)redisCommand(connect,"hmset user%d name %s sex %d age %s tel %s email %s pers_desc %s",seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
if(reply==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
redisFree(connect); //释放redisConnect()所产生的连接。
return 302;
}
else if(reply->type!=REDIS_REPLY_STATUS)
{
std::cout<<"reply->type 类型错误"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 303;
}
else if(strncmp("OK",reply->str,2)!=0)
{
std::cout<<"数据插入失败"<<std::endl;
freeReplyObject(reply);
redisFree(connect); //释放redisConnect()所产生的连接。
return 304;
}
printf("成功修改进redis..修改成功!\n");
freeReplyObject(reply);
redisFree(connect);
//-------------------操作redis部分--------------------
int l_return_pro=Prod2Cons(msg);
//传递给生产者
if(l_return_pro!=0)
{
printf("\n传递给生产者出错...\n");
}
printf("\nUpdata函数结束...\n\n");
return 0;
}
void GetUser(message& _return, const int32_t seqId)
{
printf("\nGetUser...\n");
//检查id//
message l_msg_temp;
l_msg_temp.seqId=-101;
int32_t l_Query_return=0;
//1检查id
l_Query_return=QueryUser(seqId);
if(l_Query_return==404)
{
printf("该id数据查找不存在\n");
l_msg_temp.seqId=-404;
_return=l_msg_temp;
return ;
}
else if(l_Query_return!=0)
{
printf("QueryUser出错 l_Query_return=%d\n",l_Query_return);
l_msg_temp.seqId=-403;
_return=l_msg_temp;
return ;
}
//id存在...此时redis中一定存在该数据 因为getuser前面是query
//-------------------操作redis部分--------------------↓
redisContext *connect=NULL;
redisReply *reply=NULL;
struct timeval timeout = { 1, 500000 }; // 1.5 seconds
connect = redisConnectWithTimeout(hostname, l_redis_port, timeout);
if (connect == NULL || connect->err)
{
if (connect)
{
printf("connect error: %s\n", connect->errstr);
redisFree(connect); //释放redisConnect()所产生的连接。
}
else
{
printf("connect error: can't allocate redis context\n");
}
l_msg_temp.seqId=-301;
_return=l_msg_temp;
return ;
}
//1链接测试部分
//2得到redis中数据部分
reply = (redisReply*)redisCommand(connect,"hgetall user%d",seqId);
if(reply==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
l_msg_temp.seqId=-302;
_return=l_msg_temp;
redisFree(connect);
return ;
}
else if(reply->type!=REDIS_REPLY_ARRAY)
{
std::cout<<"reply->type 类型错误"<<std::endl;
freeReplyObject(reply);
l_msg_temp.seqId=-303;
_return=l_msg_temp;
redisFree(connect);
return ;
}
else if(reply->elements!=l_redis_size)//
{
std::cout<<"reply->elements数值错误"<<std::endl;
freeReplyObject(reply);
l_msg_temp.seqId=-304;
_return=l_msg_temp;
redisFree(connect);
return ;
}
//3找到了
std::cout<<"在redis中找到了"<<std::endl;
for(int i=0;i<reply->elements;i++)
{
if(reply->element[i]==NULL)
{
std::cout<<"reply获取失败"<<std::endl;
freeReplyObject(reply);
l_msg_temp.seqId=-302;
_return=l_msg_temp;
redisFree(connect);
return ;
}
else if(reply->element[i]->type!=REDIS_REPLY_STRING)
{
std::cout<<"数据展示出错2"<<std::endl;
freeReplyObject(reply);
redisFree(connect);
l_msg_temp.seqId=-303;
_return=l_msg_temp;
return ;
}
//1//输出一下
std::string l_strs_temp(reply->element[i]->str,reply->element[i]->len);
printf("index %d:%s\n",i,l_strs_temp.c_str());
switch (i)
{
case 1:
l_msg_temp.name=l_strs_temp;
break;
case 3:
l_msg_temp.sex=l_strs_temp[0]-'0';
break;
case 5:
l_msg_temp.age=l_strs_temp;
break;
case 7:
l_msg_temp.tel=l_strs_temp;
break;
case 9:
l_msg_temp.email=l_strs_temp;
break;
case 11:
l_msg_temp.pers_desc=l_strs_temp;
break;
default:
break;
}
}
l_msg_temp.seqId=seqId;
_return=l_msg_temp;
freeReplyObject(reply);
redisFree(connect);
printf("\nGetUser函数结束...\n");
return ;
//-------------------操作redis部分--------------------上
//query之后不需要在查询数据库
// //-------------------操作数据库部分--------------------
// //2 mysql指针初始化
// MYSQL* mysql = mysql_init(NULL);
// if(mysql==NULL)
// {
// printf("初始化环境失败\n");
// _return=l_msg_temp;
// return ;//
// }
// mysql=mysql_real_connect(mysql, "192.168.100.160", "root", "200101",
// "exam2", 3306, NULL, 0);
// if(mysql == NULL)
// {
// printf("连接数据库服务器失败\n");
// _return=l_msg_temp;
// return ;//
// }
// //3 调用过程插入一个insert string吧
// char l_call_cstr[100]={};
// //std::string l_call_str="";
// sprintf(l_call_cstr, "call queryuser (%d,@ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc);",seqId);
// //l_call_str=l_call_cstr;
// int l_ret1=mysql_query(mysql,l_call_cstr);
// if(l_ret1!=0)
// {
// std::cout<<"call_error"<<std::endl;
// _return=l_msg_temp;
// return ;
// }
// // //4调用out过程
// // int l_ret2=mysql_query(mysql, " select @ret,@out_name,@out_sex,@out_age,@out_tel,@out_email,@out_desc;");
// // if(l_ret2!=0)
// // {
// // std::cout<<"error"<<std::endl;
// // _return=l_msg_temp;
// // return ;
// // }
// //5 返回结果测试
// MYSQL_RES* l_result=mysql_store_result(mysql);
// if(l_result==NULL)
// {
// printf("mysql_store_result() 失败了, 原因: %s\n", mysql_error(mysql));
// mysql_freel_result_result(l_result);
// _return=l_msg_temp;
// return ;
// }
// //if()
// //6展示
// MYSQL_ROW row;
// while((row = mysql_fetch_row(l_result))!=NULL)
// {
// std::string s_temp="404";
// if(row[0]==s_temp)
// {
// printf("没有当前查询\n");
// l_msg_temp.seqId=-404;
// _return=l_msg_temp;
// return ;//没有该id
// }
// else if(row[0]!=0)
// {
// printf("存储过程中的return出错\n");
// l_msg_temp.seqId=-403;
// _return=l_msg_temp;
// return ;//判断存储过程中的return是否出错
// }
// else printf("[%s]-[%s]\n", row[0], row[1]);
// l_msg_temp.seqId=seqId;
// l_msg_temp.name=row[1];
// l_msg_temp.sex=row[2];
// l_msg_temp.age=row[3];//
// l_msg_temp.tel=row[4];
// l_msg_temp.email=row[5];
// l_msg_temp.pers_desc=row[6];
// _return=l_msg_temp;
// }
// mysql_free_result(l_result);
// mysql_close(mysql);
// return ;
}
};
int main() {
int l_thrift_port = 9090;
::std::shared_ptr<serDemoHandler> handler(new serDemoHandler());
::std::shared_ptr<TProcessor> processor(new serDemoProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(l_thrift_port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
// MYSQL* mysql = mysql_init(NULL);
// cout<<1<<endl;
server.serve();
return 0;
}
客户端部分
// 在同级目录下创建 client.cpp 文件
// ----------替换成自己的头文件----------
#include "serDemo.h"
// --------------------------------------
#include <thrift/transport/TSocket.h>
#include <iostream>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <hiredis.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
// using boost::shared_ptr;
//return -2没有该用户id
//return -1id不合法
//return -101 服务端getuser失败
//return 1数据插入数据库失败
//return 2新数据修改数据库vec失败!
//return 3客户端输出失败
//return 401 mysql语句返回错误
//return 402 l_result返回错误
//return 403 判断存储过程中的return是否出错
//return 404 数据库没有该id的数据
int32_t CheckId(serDemoClient client,int32_t l_tmp_seqId)
{
int32_t l_id_temp=l_tmp_seqId;
int32_t l_cnt_now=client.QueryUser(l_id_temp);
if(l_cnt_now==-1)
{
printf("用户id输入长度或数值不合法\n");
return -1;
}
else if(l_cnt_now==404)
{
printf("没有该id的用户\n");
return 404;//没有该用户
}
else if(l_cnt_now!=0)
{
printf("查询有其他错误\n");
return l_cnt_now;
}
printf("该用户id存在...\n");
return 0;
}
int32_t OutPut(const message& msg) {
printf("id:%d\nname:%s\nsex:%d\nage:%s\ntel:%s\nemail:%s\npers_desc:%s\n",
msg.seqId,msg.name.c_str(),msg.sex,msg.age.c_str(),msg.tel.c_str(),msg.email.c_str(),msg.pers_desc.c_str());
return 0;
}
int32_t FindUser(serDemoClient client)
{
int32_t l_tmp_seqId=0;
// Your implementation goes here
printf("正在查询用户...\n");
printf("请输入你想查询的用户id,请键入数字...\n");
while (1)
{
std::cin>>l_tmp_seqId;
int32_t l_CheckId_return=CheckId(client,l_tmp_seqId);
if(l_CheckId_return==0)//这个id既合法也存在
{
break;
}
printf("用户id输入不合法,请重新键入数字...\n");
}
//得到该用户
message l_msg_temp;
client.GetUser(l_msg_temp,l_tmp_seqId);
if(l_msg_temp.seqId<=0)
{
std::cout<<"GetUser失败"<<std::endl;
return -101;
}
if(OutPut(l_msg_temp))
{
std::cout<<"客户端输出失败"<<std::endl;
return 3;
}
std::cout<<"查找用户成功!"<<std::endl;
return 0;
}
int32_t CreateUser(serDemoClient client)//
{
printf("正在创建用户...\n");
message l_msg_temp;
int32_t l_id_temp=0,l_sex_temp;
std::string l_name_temp,l_age_temp,l_tel_temp,l_email_temp,l_desc_temp;
printf("请输入你想创建的用户id,请键入数字...\n");
while (1)
{
std::cin>>l_id_temp;
int32_t l_CheckId_return=CheckId(client,l_id_temp);
if(l_CheckId_return==404)//用户id不存在
{
printf("用户id不存在 可以插入!\n");
l_msg_temp.seqId=l_id_temp;
break;
}
else if(l_CheckId_return==0)
{
printf("该id已被人占用,请重新输入!\n");
continue;
}
else
{
printf("数据输入不合理,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的name\n");
while (1)
{
std::cin>>l_name_temp;
if(l_name_temp.size()<=20)
{
printf("用户name输入成功!\n");
l_msg_temp.name=l_name_temp;
break;
}
else
{
printf("name输入太长,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的sex:0代表男,1代表女\n");
while (1)
{
std::cin>>l_sex_temp;
if(l_sex_temp==0||l_sex_temp==1)
{
printf("用户sex输入成功!\n");
l_msg_temp.sex=l_sex_temp;
break;
}
else
{
printf("sex输入错误,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的age\n");
while (1)
{
std::cin>>l_age_temp;
int l_flag_error=0;
for(int i=0;i<l_age_temp.size();i++)
{
if(l_age_temp[i]>'9'||l_age_temp[i]<'0')
{
l_flag_error=1;
break;
}
}
if(l_flag_error==1)
{
printf("age输入错误,请重新输入!\n");
continue;
}
else if(l_age_temp.size()<=3&&l_age_temp.size()>=1)
{
printf("用户age输入成功!\n");
l_msg_temp.age=l_age_temp;
break;
}
else
{
printf("age输入有其他错误,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的tel\n");
while (1)
{
std::cin>>l_tel_temp;
int32_t l_check_flag=0;
for(int i=0;i<l_tel_temp.size();i++)
{
if(l_tel_temp[i]<'0'||l_tel_temp[i]>'9')
{
printf("tel输入非法字符,请重新输入!\n");
l_check_flag=1;
break;
}
}
if(l_check_flag)continue;
if(l_tel_temp.size()<=20)
{
printf("用户tel输入成功!\n");
l_msg_temp.tel=l_tel_temp;
break;
}
else
{
printf("tel输入过长错误,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的email\n");
while (1)
{
std::cin>>l_email_temp;
if(l_email_temp.size()<=30)
{
printf("用户email输入成功!\n");
l_msg_temp.email=l_email_temp;
break;
}
else
{
printf("email输入过长错误,请重新输入!\n");
continue;
}
}
printf("请输入你创建用户的personal_describe\n");
while (1)
{
getchar();
getline(std::cin,l_desc_temp);
if(l_desc_temp.size()<=150)
{
printf("用户personal_describe输入成功!\n");
l_msg_temp.pers_desc=l_desc_temp;
break;
}
else
{
printf("personal_describe输入过长错误,请重新输入!\n");
continue;
}
}
int32_t l_CreateUser_return=client.InsertUser(l_id_temp,l_msg_temp);
//将数据插入数据库
if(l_CreateUser_return!=0)
{
//插入数据失败!
std::cout<<"数据插入数据库失败!"<<std::endl;
return l_CreateUser_return;
}
std::cout<<"创建用户成功!"<<std::endl;
return 0;
}
int32_t UpdataUser(serDemoClient client)
{
printf("正在修改用户...\n");
//预处理一些数据供使用
message l_msg_temp,l_msg_temp2;
int32_t l_id_temp=0,l_sex_temp=0;
std::string l_name_temp,l_age_temp,l_tel_temp,l_email_temp,l_desc_temp;
//找到该用户在vec的下标
printf("请输入你想修改的用户id,请键入数字...\n");
while (1)
{
std::cin>>l_id_temp;
int l_CheckId_return=CheckId(client,l_id_temp);
if(l_CheckId_return==0)
{
printf("用户id输入成功!\n");
l_msg_temp.seqId=l_id_temp;
break;
}
else if(l_CheckId_return==404)
{
printf("该id不存在,请重新输入!\n");
continue;
}
else
{
printf("数据输入不合理,请重新输入!\n");
continue;
}
}
client.GetUser(l_msg_temp,l_id_temp);
if(l_msg_temp.seqId<0)
{
std::cout<<"GetUser失败"<<std::endl;
return l_msg_temp.seqId;
}
printf("你正在修改的数据内容如下:\n");
if(OutPut(l_msg_temp))//得到服务端的该数据 拿到客户端
{
std::cout<<"客户端输出失败"<<std::endl;
return 3;
}
//循环修改数据 客户端
while (1)
{
printf("请键入数字以选择你想要修改的数据内容:\n");
printf("1、name 2、sex 3、age 4、tel 5、email 6、personal describe 7、exit\n");
int l_updata_type=0;
std::cin>>l_updata_type;
if(l_updata_type>7||l_updata_type<0)
{
//键入不合理的数字或其他字符 请重新输入
printf("键入不合理的数字或其他字符 请重新输入!\n");
continue;
}
if(l_updata_type==7)
{
break;
}
switch (l_updata_type)
{
case 1:
printf("修改name中!\n");
printf("请输入你想修改用户的新name\n");
while (1)
{
std::cin>>l_name_temp;
if(l_name_temp.size()<=20)
{
printf("用户name输入成功!\n");
l_msg_temp.name=l_name_temp;
break;
}
else
{
printf("name输入太长,请重新输入!\n");
continue;
}
}
break;
case 2:
printf("修改sex中!\n");
printf("请输入你修改用户的新sex:0代表男,1代表女\n");
while (1)
{
std::cin>>l_sex_temp;
if(l_sex_temp==0||l_sex_temp==1)
{
printf("用户sex输入成功!\n");
l_msg_temp.sex=l_sex_temp;
break;
}
else
{
printf("sex输入错误,请重新输入!\n");
continue;
}
}
break;
case 3:
printf("修改age中!\n");
printf("请输入你修改用户的新age\n");
while (1)
{
std::cin>>l_age_temp;
int l_flag_error=0;
for(int i=0;i<l_age_temp.size();i++)
{
if(l_age_temp[i]>'9'||l_age_temp[i]<'0')
{
l_flag_error=1;
break;
}
}
if(l_flag_error==1)
{
printf("age输入错误,请重新输入!\n");
continue;
}
else if(l_age_temp.size()<=3&&l_age_temp.size()>=1)
{
printf("用户age输入成功!\n");
l_msg_temp.age=l_age_temp;
break;
}
else
{
printf("age输入有其他错误,请重新输入!\n");
continue;
}
}
break;
case 4:
printf("修改tel中!\n");
printf("请输入你修改用户的新tel\n");
while (1)
{
std::cin>>l_tel_temp;
int l_check_flag=0;
for(int i=0;i<l_tel_temp.size();i++)
{
if(l_tel_temp[i]<'0'||l_tel_temp[i]>'9')
{
printf("tel输入非法字符,请重新输入!\n");
l_check_flag=1;
break;
}
}
if(l_check_flag)continue;
if(l_tel_temp.size()<=20)
{
printf("用户tel输入成功!\n");
l_msg_temp.tel=l_tel_temp;
break;
}
else
{
printf("tel输入过长错误,请重新输入!\n");
continue;
}
}
break;
case 5:here
printf("修改email中!\n");
printf("请输入你修改用户的新email\n");
while (1)
{
std::cin>>l_email_temp;
if(l_email_temp.size()<=30)
{
printf("用户email输入成功!\n");
l_msg_temp.email=l_email_temp;
break;
}
else
{
printf("email输入过长错误,请重新输入!\n");
continue;
}
}
break;
case 6:here
printf("修改personal_describe中!\n");
printf("请输入你修改用户的新personal_describe\n");
while (1)
{
getchar();
getline(std::cin,l_desc_temp);
if(l_desc_temp.size()<=150)
{
printf("用户personal_describe输入成功!\n");
l_msg_temp.pers_desc=l_desc_temp;
break;
}
else
{
printf("personal_describe输入过长错误,请重新输入!\n");
continue;
}
}
break;
default:
break;
}
printf("数据输入成功...可以继续输入你想修改的部分...想退出修改请键入7\n");
}
//将修改好的数据传入到服务端
int l_UpdataUser_return=client.UpdataUser(l_id_temp,l_msg_temp);
if(l_UpdataUser_return!=0)
{
printf("修改后数据写入数据库失败!\n");
return l_UpdataUser_return;
}
client.GetUser(l_msg_temp,l_id_temp);
printf("修改完毕,修改后数据如下...\n");
if(OutPut(l_msg_temp))//得到服务端的该数据 拿到客户端
{
std::cout<<"客户端输出失败2"<<std::endl;
return 3;
}
return 0;
}
int main()
{
std::cout<<1<<std::endl;
std::shared_ptr<TSocket> socket(new TSocket("localhost", 9090));
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
serDemoClient client(protocol);
transport->open();
// 代码
// message msg;
// msg.seqId = 123456;
// msg.name = "Zed222";
// msg.sex =0;
// msg.age=22;
// msg.tel="1756915";
// msg.email="1208086@qq.com";
// msg.pers_desc="He is a handsome man";
// CreateUser
// std::cout<<client.OutPut(123456)<<std::endl;
while(1)
{
printf("请输入你想进行的操作 1、查询用户 2、创建用户 3、更新用户 0、退出\n");
int flag=0;
std::cin>>flag;
if(flag>3||flag<0)
{
printf("键入数字错误 请重新输入\n");
continue;
}
if(flag==0)break;
switch (flag)
{
case 1:
FindUser(client);
break;
case 2:
CreateUser(client);
break;
case 3:
UpdataUser(client);
break;
default:
break;
}
}
printf("退出系统\n");
transport->close();
return 0;
}
生产者部分
#include "serDemo.h"
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include "rdkafka.h"
#include <fstream>
#include <sstream>
#include <map>
#include <string>
#include <pthread.h>
#include <vector>
#include <cstring>
#include <iostream>
#include <algorithm>
#include <mysql8/mysql.h>
#include <cstdio>
#include <memory>
#include <jsoncpp/json/json.h>
#include <hiredis.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
volatile sig_atomic_t run = 1;
//return 501 反序列化失败
//return 502 rd_kafka_conf_set失败
//return 503 rd_kafka_new失败
void stop(int sig)
{
run=0;
fclose(stdin); /* abort fgets() */
}
static void dr_msg_cb(rd_kafka_t *l_rk, const rd_kafka_message_t *l_rkmessage, void *l_opaque)
{
if (l_rkmessage->err)
{
fprintf(stderr,"%% Message delivery failed: %s\n",rd_kafka_err2str(l_rkmessage->err));
}
else
{
fprintf(stderr,"%% Message delivered (%zd bytes, partition %" PRId32 ")\n",
l_rkmessage->len, l_rkmessage->partition);
}
// l_rkmessage 能自动销毁by librdkafka
}
int Prod2Cons(message msg)
{
//-----------------------msg to json---------------
Json::Value root,root2;
root["ID"]=msg.seqId;
root["name"]=msg.name;
root["sex"]=msg.sex;
root["age"]=msg.age;
root["tel"]=msg.tel;
root["email"]=msg.email;
root["pers_desc"]=msg.pers_desc;
//-----------------------msg to json---------------
//-----------------------json to str---------------
Json::StreamWriterBuilder swb;
std::shared_ptr <Json::StreamWriter > sw(swb.newStreamWriter() ) ; //sw指向swb
//通过sw的write将root序列化成Json字符串,并存储到str中
std::stringstream str;
sw->write(root,&str);
std::string str_serialization=str.str();
char * p=(char*)str_serialization.data();
//char *str_char=str_serialization.c_str();
if(p==NULL)
{
printf("反序列化失败\n");
return 501;
}
//-----------------------json to str---------------
//-----------------------producer模板---------------
char l_brokers[100]="192.168.100.160:9092";
char l_topic[100]="TestTopicForDocker";
rd_kafka_t *l_rk=NULL; // 生产者实例句柄
rd_kafka_conf_t *l_conf=NULL; // 临时配置对象
char l_errstr[512]={}; // API错误报告缓冲区
char l_buf[512]={}; // 消息值临时缓冲区
l_conf=rd_kafka_conf_new();//创建kafka 客户端配置
//配置各项参数 其中kafka port默认为9022号端口
if(rd_kafka_conf_set(l_conf,"bootstrap.servers", l_brokers, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK)
{//若返回结果不等于这个enum类型 错误结果打印
fprintf(stderr, "%s\n", l_errstr);
rd_kafka_conf_destroy(l_conf);
//
return 502;
}
//设置发送回调函数 用以反馈信息发送的成败
rd_kafka_conf_set_dr_msg_cb(l_conf, dr_msg_cb); //设置发送回调函数 一旦生产的每条消息被rd_kafka_produce函数接收,投递报告回调函数会被立即调用
//初始化一个顶层对象 的基础容器 用于全局配置和共享状态
//conf在rd_kafka_new调之后不能被再次使用·
l_rk=rd_kafka_new(RD_KAFKA_PRODUCER, l_conf, l_errstr, sizeof(l_errstr));
if(!l_rk)
{
fprintf(stderr, "%% Failed to create new producer: %s\n",l_errstr);
rd_kafka_conf_destroy(l_conf);//
return 503;
}
//如果分配成功 临时配置对象会在rd_kafka_new中被释放 此时无需再rd_kafka_conf_destroy
l_conf=NULL;
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
fprintf(stderr,//打印...
"%% Type some text and hit enter to produce message\n"
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");
int continue_times=0;
if(run)
{
rd_kafka_resp_err_t l_err;//错误代码提示
// SendProduce message.
// 这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,
// 对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)
// 用于在消息传递成功或失败时向应用程序发回信号
while(1)
{
l_err=rd_kafka_producev(
l_rk,//顶层对象 的基础容器 用于全局配置和共享状态,之前通过rd_kafka_topic_new()生成
RD_KAFKA_V_TOPIC(l_topic),//topic
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),//表示librdkafka 在信息发送前立即从 payload 做一份拷贝
RD_KAFKA_V_VALUE(p, str_serialization.size()),//消息和长度//传信息
RD_KAFKA_V_OPAQUE(NULL),//可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。
RD_KAFKA_V_END); /* End sentinel */
if(l_err)
{//若该值不是0则为一个错误码
fprintf(stderr,"%% Failed to produce to topic %s: %s\n", l_topic, rd_kafka_err2str(l_err));
if(l_err==RD_KAFKA_RESP_ERR__QUEUE_FULL)
{
//如果内部队列已满,请等待消息传递,然后重试。
//内部队列表示要发送的消息和已经发送或失败的消息,等待调用它们的传递报告回调。
rd_kafka_poll(l_rk,1000 );//阻塞等待消息1000ms至发送完成或其它操作
continue_times++;
continue;//再传!
}
}
else
{//若该值为0 传送成功 打印出相应成果
fprintf(stderr,"%% Enqueued message (%zd bytes) for topic %s\n",str_serialization.size(), l_topic);
break;
}
}
// producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为
// 传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其
// 发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()
// 仍然被调用
rd_kafka_poll(l_rk, 0);
}
// rd_kafka_flush是rd_kafka_poll()的抽象化,
// 等待所有未完成的produce请求完成,通常在销毁producer实例前完成
// 以确保所有排列中和正在传输的produce请求在销毁前完成
fprintf(stderr, "%% Flushing final messages..\n");
rd_kafka_flush(l_rk,10*1000);
//如果输出队列仍然不为空,则存在向集群生成消息的问题。
if(rd_kafka_outq_len(l_rk)>0)
{
fprintf(stderr,"%% %d message(s) were not delivered\n",rd_kafka_outq_len(l_rk));
}
rd_kafka_destroy(l_rk);//万事俱备 销毁实例
//-----------------------producer模板---------------
return 0;
}
消费者部分
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <iostream>
#include <cstdio>
#include <ctype.h>
#include "rdkafka.h"
#include<jsoncpp/json/json.h>
volatile sig_atomic_t g_run = 1;
void stop(int sig) {
g_run = 0;
}
int is_printable(void *buf1, size_t size)
{
size_t i;
char *l_buf=(char *)buf1;//
for(i=0;i<size;i++)
if(!isprint((int)l_buf[i]))
return 0;
return 1;
}
int main(int argc, char **argv) {
rd_kafka_t *l_rk=NULL; //生产者实例句柄
rd_kafka_conf_t *l_conf=NULL; //临时配置对象
rd_kafka_resp_err_t l_err; // librdkafka API 错误代码
char l_errstr[512]={0}; // librdkafka API 错误缓冲区内容
char *l_brokers=NULL; // broker list
char *l_groupid=NULL; //组id
char **l_topics=NULL; //要订阅的 topic
int l_topic_cnt=0; //要订阅的topic数量
rd_kafka_topic_partition_list_t *l_subscription=NULL;
int i=0;
if (argc<4)
{
fprintf(stderr,"%% Usage: ""%s <broker> <group.id> <topic1> <topic2>..\n",argv[0]);
return 1;
}
l_brokers=argv[1];
l_groupid=argv[2];
l_topics=&argv[3];
l_topic_cnt=argc-3;
l_conf = rd_kafka_conf_new();
//配置设置ip,端口 默认9092
if(rd_kafka_conf_set(l_conf, "bootstrap.servers", l_brokers, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", l_errstr);
//销毁l_conf
rd_kafka_conf_destroy(l_conf);
return 2;
}
//配置消费者组id
//共享相同组id的所有消费者将加入同一个组,订阅的topic'partitions将根据partition.assignment.strategy(消费者配置属性)分配给组中的消费者。
if(rd_kafka_conf_set(l_conf, "group.id", l_groupid, l_errstr,sizeof(l_errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", l_errstr);
rd_kafka_conf_destroy(l_conf);
return 3;
}
// 如果一个分区之前没有提交偏移量,则auto.offset.reset策略将用于决定在分区的哪个位置开始获取消息。
// 通过将此设置为最早,如果之前没有提交偏移量,消费者将读取分区中的所有消息。
if(rd_kafka_conf_set(l_conf, "auto.offset.reset", "earliest", l_errstr,
sizeof(l_errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", l_errstr);
rd_kafka_conf_destroy(l_conf);//
return 4;
}
// 创建使用者实例。实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态
// 注意:rd_kafka_new()拥有conf对象的所有权,在调用后应用程序不能再次引用它。
l_rk = rd_kafka_new(RD_KAFKA_CONSUMER, l_conf, l_errstr, sizeof(l_errstr));
//在分配成功的情况下 这里l_conf的配置临时配置对象其实已经被rd_kafka_new释放
if(!l_rk)
{
fprintf(stderr, "%% Failed to create new consumer: %s\n",l_errstr);
//没分配成功的需要 再手动释放
rd_kafka_conf_destroy(l_conf);
return 5;
}
std::cout<<223<<std::endl;//-----------------
//rd_kafka_conf_destroy(l_conf);这里不需要再销毁配置对象
l_conf=NULL; //配置对象现在由rd_kafka_t实例拥有和释放
//重定向 rd_kafka_poll()队列到consumer_poll()队列
rd_kafka_poll_set_consumer(l_rk);
//将所有topic list转化为 适合kafka的格式
l_subscription=rd_kafka_topic_partition_list_new(l_topic_cnt);
for (i=0;i<l_topic_cnt;i++)
rd_kafka_topic_partition_list_add(l_subscription, l_topics[i],
/* the partition is ignored
* by subscribe() */RD_KAFKA_PARTITION_UA);
// 订阅 l_topics list
l_err=rd_kafka_subscribe(l_rk, l_subscription);
if (l_err)
{//订阅出错
fprintf(stderr, "%% Failed to subscribe to %d l_topics: %s\n",l_subscription->cnt,rd_kafka_err2str(l_err));
rd_kafka_topic_partition_list_destroy(l_subscription);
rd_kafka_destroy(l_rk);
return 6;
}
fprintf(stderr,"%% Subscribed to %d topic(s),waiting for rebalance and messages...\n",l_subscription->cnt);
rd_kafka_topic_partition_list_destroy(l_subscription);//释放l_topics list使用的所有资源和它自己
/* Signal handler for clean shutdown */
signal(SIGINT, stop);
std::cout<<112<<std::endl;//-----------------
while (g_run)
{
rd_kafka_message_t *l_rkm;
//std::cout<<112<<std::endl;//-----------------
l_rkm = rd_kafka_consumer_poll(l_rk, 100);//从consumer_poll读取信息
if (!l_rkm)
continue; //超时就重新尝试
//返回错误的信息
if (l_rkm->err)
{
fprintf(stderr, "%% Consumer error: %s\n",
rd_kafka_message_errstr(l_rkm));
rd_kafka_message_destroy(l_rkm);
continue;
}
//正确就如下
//打印 字段 分区 offset等信息
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",
rd_kafka_topic_name(l_rkm->rkt), l_rkm->partition,
l_rkm->offset);
// /* Print the message key. */
// if (l_rkm->key && is_printable(l_rkm->key, l_rkm->key_len))
// printf(" Key: %.*s\n", (int)l_rkm->key_len,
// (const char *)l_rkm->key);
// else if (l_rkm->key)
// printf(" Key: (%d bytes)\n", (int)l_rkm->key_len);
//打印信息字段
//--------------进行反序列化操作 ---------------
Json::Reader reader(Json::Features::strictMode());
Json::Value root;
if(reader.parse((const char *)l_rkm->payload,root)==0)
{
std::cout<<"reader.parse赋值错误!"<<std::endl;
return 7;
}
std::cout<<"ID:"<<root["ID"]<<"新得到的数据如下"<<std::endl;
std::cout<<"ID:"<<root["ID"]<<std::endl;
std::cout<<"name:"<<root["name"]<<std::endl;
std::cout<<"sex:"<<(root["sex"].asInt())<<std::endl;
std::cout<<"age:"<<root["age"]<<std::endl;
std::cout<<"tel:"<<root["tel"]<<std::endl;
std::cout<<"email:"<<root["email"]<<std::endl;
std::cout<<"pers_desc:"<<root["pers_desc"]<<std::endl;
// printf("ID:%d\nID:%d\n")
//--------------进行反序列化操作 ---------------
if (l_rkm->payload && is_printable(l_rkm->payload, l_rkm->len))
printf(" Value: %.*s\n", (int)l_rkm->len,(const char *)l_rkm->payload);
else if (l_rkm->payload)
printf(" Value: (%d bytes)\n", (int)l_rkm->len);
rd_kafka_message_destroy(l_rkm);
}
//关闭消费者 离开组
fprintf(stderr, "%% Closing consumer\n");
rd_kafka_consumer_close(l_rk);
//销毁消费者
rd_kafka_destroy(l_rk);
return 0;
}
四、运行展示
运行方式
1启动服务器 默认 redis端口6379 mysql端口3306 thirft 端口9090
./server
2启动客户端
./client
3启动消费者 接受变化信息 默认 kafka端口9092
./consumer 192.168.100.160:9092 0 TestTopicForDocker