protobuf(Protocol Buffers)是Google推出的一种结构化数据序列化格式,与JSON、XML等格式相比更加轻量、高效:序列化后的数据体积更小,编解码速度也更快。
protobuf原理
protobuf如何使用
创建xxx.proto文件
开头写上
// Select the proto2 language version. The statement must end with a semicolon.
syntax = "proto2";
// Declare the package of this file; the C++ examples reach the generated
// classes through `using namespace tutorial;`.
package tutorial;
表明使用的proto版本,并声明该文件中的消息属于tutorial包(package是声明包名而不是导入包,生成的C++代码位于tutorial命名空间中)
以充值查询响应为例,对应proto文件中代码如下
// Response to an account-records (top-up / spending history) query.
message list_account_records_response
{
required int32 code = 1; // response code
optional string desc = 2; // NOTE(review): the original comment said "verification code", which looks copy-pasted; presumably a textual description of the response - confirm
// A single account record; nested so it is scoped to this response type.
message account_record
{
required int32 type = 1; // 0: riding charge, 1: top-up, 2: refund
required int32 limit = 2; // amount spent or topped up
required uint64 timestamp = 3; // timestamp at which the record occurred
}
// Zero or more records carried by this response.
repeated account_record records = 3;
}
PS:
可在message中自定义(嵌套)message类型,如上面代码中的account_record
编译proto文件
编译语法:
protoc -I=$SRC_DIR --cpp_out=$DST_DIR bike.proto
SRC_DIR 表示proto文件所在的目录,cpp_out指定了生成的代码的路径, bike.proto指proto文件名。
protoc -I=./ --cpp_out=./ bike.proto
这样在当前目录生成了bike.pb.cc和bike.pb.h两个文件。
编译生成的c++文件
g++ -std=c++11 example.cc bike.pb.cc -lprotobuf
std=c++11表示使用c++11的标准
protobuf使用示例代码
example.cc
#include"bike.pb.h"
#include<string>
#include<iostream>
using namespace std;
using namespace tutorial;
int main(void)
{
std::string data;//存储序列化的消息
//客户端发送请求
{
mobile_request mr;
mr.set_mobile("18106050285");
mr.SerializeToString(&data);//格式化,变成序列化的消息
//客户端发送data send(sockfd,data.c_str(),data.length())
}
//服务器端接收请求
{
//receive(sockfd,data,....);
mobile_request mr;
mr.ParseFromString(data);
cout << "客户端手机号码:" << mr.mobile() << endl;
}
return 0;
}
进行编译
g++ -std=c++11 example.cc bike.pb.cc -lprotobuf
生成a.out
执行a.out:
复杂一些的使用示例:
list_account_records_response对象中的records对象为可重复的,如何在一个list_account_records_response中添加多个records呢?
example1.cc
#include"bike.pb.h"
#include<string>
#include<iostream>
using namespace std;
using namespace tutorial;
int main(void)
{
std::string data;//存储序列化的消息
//客户端发送请求
{
list_account_records_response larr;
larr.set_code(200);
larr.set_desc("ok");
for (size_t i = 0; i < 5; i++)
{
list_account_records_response_account_record* ar = larr.add_records();
ar->set_type(0);
ar->set_limit(i*100);
ar->set_timestamp(NULL);
}
cout << "records size :" << larr.records_size() << endl;
larr.SerializeToString(&data);
//客户端发送data send(sockfd,data.c_str(),data.length())
}
//服务器端接收请求
{
//receive(sockfd,data,....);
list_account_records_response larr;
larr.ParseFromString(data);
cout << "records size :" << larr.records_size() << endl;
printf("code : %d \n",larr.code());
for (int i = 0; i < larr.records_size(); i++)
{
const list_account_records_response_account_record& ar = larr.records(i);
printf("limit: %d \n",ar.limit());
}
}
return 0;
}
Libevent与Protobuf一起协作
客户端代码
client.cc
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<errno.h>
#include<unistd.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<event.h>
#include<event2/bufferevent.h>
#include<event2/buffer.h>
#include<event2/util.h>
#include"bike.pb.h"
using namespace std;
using namespace tutorial;
// Opens a TCP connection to server_ip:port; returns the socket fd, or -1 on failure.
int tcp_connect_server(const char* server_ip, int port);
// Event callback that main() registers on STDIN_FILENO; arg is the bufferevent.
void cmd_msg_cb(int fd,short events,void* arg);
// Event callback that main() registers on the server socket.
void server_msg_cb(int fd, short events, void* arg);
// NOTE(review): declared here, but no definition or use is visible in client.cc.
void event_cb(struct bufferevent *bev,short event,void *arg);
int main(int argc, char** argv)
{
if( argc < 3 )
{
printf("please input 2 parameters\n");
return -1;
}
//两个参数依次是服务器端的IP地址、端口号
int sockfd = tcp_connect_server(argv[1], atoi(argv[2]));
if( sockfd == -1)
{
perror("tcp_connect error ");
return -1;
}
printf("connect to server successfully\n");
struct event_base* base = event_base_new();
struct bufferevent* bev = bufferevent_socket_new(base, sockfd, BEV_OPT_CLOSE_ON_FREE);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
struct event *ev_sockfd = event_new(base, sockfd,
EV_READ | EV_PERSIST,server_msg_cb, NULL);
event_add(ev_sockfd, NULL);
//监听终端输入事件
struct event* ev_cmd = event_new(base, STDIN_FILENO,
EV_READ | EV_PERSIST, cmd_msg_cb,
(void*)bev);
event_add(ev_cmd, NULL);
event_base_dispatch(base);
printf("finished \n");
return 0;
}
void cmd_msg_cb(int fd, short events, void* arg)
{
char msg[1024];
string proto_msg;
int ret = read(fd, msg, sizeof(msg));
if( ret <= 0 )
{
perror("read fail ");
exit(1);
}
cout <<"终端输入:" << msg << endl;
struct bufferevent* bev = (struct bufferevent*)arg;
list_account_records_request larr;
larr.set_mobile("18106050285");
larr.SerializeToString(&proto_msg);
cout << "proto_msg 内容:"<<proto_msg.c_str() << endl;
cout << "proto_msg长度为:"<<proto_msg.length() << endl;
if (bev==NULL)
cout << "bev为空" << endl;
bufferevent_write(bev,proto_msg.c_str(),proto_msg.length());
}
/*
 * Event callback for data arriving from the server.
 *
 * fd     - the connected socket that became readable
 * events - triggering event flags (unused)
 * arg    - user argument (unused)
 *
 * Reads up to sizeof(msg)-1 bytes and prints them as text. Exits the process
 * when the peer closes the connection; on a read error it reports the error
 * and returns. For simplicity, partial messages are not reassembled.
 */
void server_msg_cb(int fd, short events, void* arg)
{
    char msg[1024];
    (void)events;
    (void)arg;

    ssize_t len = read(fd, msg, sizeof(msg) - 1);
    if (len == 0)
    {
        printf("connection close. exit~\n");
        exit(1);
    }
    if (len < 0)
    {
        perror("read fail ");
        return;
    }

    // Terminate only after len is known to be positive; doing it earlier
    // would write to msg[-1] when read() fails.
    msg[len] = '\0';
    printf("recv from server<<<<< [%s] \n", msg);
}
typedef struct sockaddr SA;

/*
 * Opens a blocking TCP connection to an IPv4 server.
 *
 * server_ip - dotted-quad IPv4 address, e.g. "127.0.0.1"
 * port      - TCP port in the range 1..65535
 *
 * Returns the connected socket descriptor, or -1 on failure with errno set.
 * errno is EINVAL when server_ip is NULL or not a valid dotted-quad address,
 * or when port is out of range. The caller owns the returned descriptor.
 */
int tcp_connect_server(const char* server_ip, int port)
{
    struct sockaddr_in server_addr;
    int sockfd;

    // Reject bad arguments up front; htons() would silently truncate an
    // out-of-range port to 16 bits.
    if (server_ip == NULL || port < 1 || port > 65535)
    {
        errno = EINVAL;
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short)port);

    // inet_pton is the POSIX parser and accepts only strict dotted-quad
    // input, unlike the more lenient inet_aton.
    if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) != 1)
    {
        errno = EINVAL;
        return -1;
    }

    sockfd = socket(PF_INET, SOCK_STREAM, 0);
    if (sockfd == -1)
        return -1;

    if (connect(sockfd, (SA*)&server_addr, sizeof(server_addr)) == -1)
    {
        // close() may overwrite errno, so preserve connect()'s error code.
        int save_errno = errno;
        close(sockfd);
        errno = save_errno;
        return -1;
    }

    // The socket is intentionally left in blocking mode.
    return sockfd;
}
编译
g++ -std=c++11 client.cc bike.pb.cc -lprotobuf -levent -o client.exe
要加上-lprotobuf -levent 和 C++11标准
服务端代码
server.cc
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <event.h>
#include <event2/event.h>
#include<event2/listener.h>
#include <assert.h>
#include<string>
#include"bike.pb.h"
#define BUFLEN 1024
using namespace std;
using namespace tutorial;
// Per-connection state: allocated by stat_init() and handed to the
// bufferevent callbacks as their user argument by listener_cb().
typedef struct _ConnectStat {
//struct event* ev;
struct bufferevent* bev; // bufferevent serving this client
char buf[BUFLEN]; // receive buffer that do_echo_request() reads into
}ConnectStat;
// Declarations for the echo service implementation
// Allocates the per-connection state for a newly accepted client.
ConnectStat * stat_init(int fd, struct bufferevent *bev);
// Listener callback, invoked for each accepted client connection.
void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg);
//accept_connection(int fd, short events, void* arg);
// bufferevent read callback: parses a request and replies to the client.
void do_echo_request(struct bufferevent* bev, void* arg);
// bufferevent event callback: tears the connection down.
void event_cb(struct bufferevent *bev, short event, void *arg);
//void do_echo_response(int fd, short events, void *arg);
// NOTE(review): declared here, but no definition is visible in this listing.
int tcp_server_init(int port, int listen_num);
// Event base shared by the server; assigned in main().
struct event_base* base;
/*
 * Server entry point: listens on TCP port 9999 on all local addresses and
 * runs the libevent loop, handing each accepted client to listener_cb().
 *
 * Returns 0 when the event loop ends, 1 if the event base or the listener
 * cannot be created (for example when the port is already in use).
 */
int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;

    // Zeroed sin_addr means "any local address".
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(struct sockaddr_in));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(9999);

    base = event_base_new();
    if (base == NULL)
    {
        fprintf(stderr, "event_base_new failed\n");
        return 1;
    }

    // Creates the socket, binds, listens, and registers the accept callback.
    struct evconnlistener *listener
        = evconnlistener_new_bind(base, listener_cb, base,
                                  LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,
                                  10, (struct sockaddr*)&sin,
                                  sizeof(struct sockaddr_in));
    if (listener == NULL)
    {
        // Without this check a bind failure would reach
        // evconnlistener_free(NULL) below.
        fprintf(stderr, "evconnlistener_new_bind failed: %s\n", strerror(errno));
        event_base_free(base);
        return 1;
    }

    event_base_dispatch(base);

    evconnlistener_free(listener);
    event_base_free(base);
    return 0;
}
/*
 * Allocates and zero-initializes the per-connection state for a client.
 *
 * fd  - client socket descriptor (currently unused)
 * bev - bufferevent serving the client; stored in the new state
 *
 * Returns the new ConnectStat, or NULL if allocation fails. The caller owns
 * the result and releases it with free().
 */
ConnectStat * stat_init(int fd, struct bufferevent *bev) {
    (void)fd;

    // calloc zero-fills the struct, replacing the malloc + memset pair.
    ConnectStat *temp = (ConnectStat *)calloc(1, sizeof(ConnectStat));
    if (!temp) {
        // strerror(errno) is portable; "%m" is a glibc extension.
        fprintf(stderr, "malloc failed. reason: %s\n", strerror(errno));
        return NULL;
    }

    temp->bev = bev;
    // The original fell off the end without returning, which is undefined
    // behavior in a non-void function.
    return temp;
}
/*
一个新客户端连接上服务器此函数就会被调用,当此函数被调用时,libevent已经帮我们accept了这个客户端。
该客户端的文件描述符为fd
*/
void listener_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg)
{
printf("accept a client %d\n", fd);
struct event_base *base = (struct event_base*)arg;
//为这个客户端分配一个bufferevent
struct bufferevent *bev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
ConnectStat *stat = stat_init(fd, bev);
bufferevent_setcb(bev, do_echo_request, NULL, event_cb, stat);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}
void do_echo_request(struct bufferevent* bev, void* arg)
{
string request;
ConnectStat *stat = (ConnectStat *)arg;
char *msg = stat->buf;
printf("do echo request ...\n");
size_t len = bufferevent_read(bev, msg, BUFLEN);
msg[len] = '\0';
//解析protobuf 请求
{
list_account_records_request larr;
request = msg;
larr.ParseFromString(request);//反序列化
printf("recv from client<<<< %s\n", larr.mobile().c_str());
bufferevent_write(bev, larr.mobile().c_str(), larr.mobile().length());
}
}
/*
 * bufferevent event callback: logs why the callback fired (EOF or error, if
 * either flag is set) and then tears the connection down.
 *
 * bev   - bufferevent the event occurred on
 * event - BEV_EVENT_* flags describing what happened
 * arg   - the connection's heap-allocated ConnectStat
 */
void event_cb(struct bufferevent *bev, short event, void *arg)
{
    const char *note = NULL;

    // EOF takes precedence over ERROR when both flags are present.
    if (event & BEV_EVENT_EOF) {
        note = "connection closed\n";
    } else if (event & BEV_EVENT_ERROR) {
        note = "some other error\n";
    }
    if (note != NULL) {
        printf("%s", note);
    }

    // Freeing the bufferevent closes the socket and releases its read/write
    // buffers; the per-connection state is released afterwards.
    bufferevent_free(bev);
    free((ConnectStat *)arg);
}