文章目录
- ustream 核心数据结构
- struct ustream
- struct ustream_buf_list
- struct ustream_buf
- struct ustream_fd
- ustream 核心API
- ustream_fd_init
- ustream_uloop_cb
- ustream_fd_read_pending
- ustream_fill_read
- ustream_write_pending
- ustream_write
- ustream_fd_write
- ustream 应用示例
- 使用ustream分别管理输入或者输出流
- 参考
在C/C++中经常会提到
流
,可能有很多同学对
流
这个概念不是特别理解,实际上,流只是不同设备间传输的一系列数据的抽象,简单的说,流就是一串数据。如果是这串数据需要对外输出,那么就称这个流为输出流,反之则称为输入流。
ustream 是 libubox 提供的一个流管理工具,它可以实现自动从流中获取数据或者将数据写入流,还可以主动通知ustream的所有者什么时候可以从流读取数据。如果是将数据写入流,会有一个特别的设计——当流可写时,数据会被直接写入流,当流不可写时,数据会被缓存,并在流再次可写时自动地将缓存的数据继续写入流,直到缓存中所有的数据都被写入。
ustream 源码仓库: libubox.git
本文基于
ustream 核心数据结构
ustream 中核心的数据结构关系图如下:
struct ustream
ustream结构表示一个流管理对象,而流本质上就是一个fd。
struct ustream {
struct ustream_buf_list r, w;//流的读写缓存链表
struct uloop_timeout state_change;//流状态定时器,notify_state()是通过这个定时器回调函数调用的
struct ustream *next;
// 通知流的owner 可以从流的读缓冲区中读取数据了
void (*notify_read)(struct ustream *s, int bytes_new);
// 通知流的owner 可以将数据写入流了
void (*notify_write)(struct ustream *s, int bytes);
// 通知流的owner 当前流的状态有变化,一般是异常状态如写错误
void (*notify_state)(struct ustream *s);
// 将数据写入流具体函数
int (*write)(struct ustream *s, const char *buf, int len, bool more);
// 释放一个流
void (*free)(struct ustream *s);
// 将流设置为读阻塞状态,禁止数据写入流的读缓冲
void (*set_read_blocked)(struct ustream *s);
// 监听流状态
bool (*poll)(struct ustream *s);
/*
* ustream user should set this if the input stream is expected
* to contain string data. the core will keep all data 0-terminated.
*/
bool string_data; // 流是否是字符串数据
bool write_error; // 是否发生写错误
bool eof, eof_write_done; // 是否到达EOF
enum read_blocked_reason read_blocked;// 流不可读的原因
};
enum read_blocked_reason {
READ_BLOCKED_USER = (1 << 0), // 用户主动禁止从流读取数据
READ_BLOCKED_FULL = (1 << 1), // 流已经满了,主要指的是读缓冲区满了
};
struct ustream_buf_list
ustream_buf_list 表示一个ustream buf 链表,每个流对应读和写2个链表。此结构在创建ustream
时自动创建,不需要用户单独定义。
struct ustream_buf_list {
struct ustream_buf *head; // 指向第一个ustream_buf
struct ustream_buf *data_tail; // 指向第一个尚未使用或者未使用完的ustream_buf
struct ustream_buf *tail;// 指向最后一个ustream_buf
int (*alloc)(struct ustream *s, struct ustream_buf_list *l);// 申请新的ustream_buf函数
int data_bytes;//当前ustream_buf_list里面总的数据量
int min_buffers;//当前ustream_buf_list 最少的 ustream_buf 个数
int max_buffers;//当前ustream_buf_list 最多的 ustream_buf 个数
int buffer_len;//单个 ustream_buf 大小
int buffers;//当前 ustream_buf_list 实际的 ustream_buf 个数
};
struct ustream_buf
ustream_buf 是实际存放数据的buffer,ustream_buf 通常有多个。
struct ustream_buf {
struct ustream_buf *next; // 指向下一个ustream_buf
char *data;//指向 ustream_buf 还没有被读写的位置
char *tail;//指向 ustream_buf 尚未使用的位置
char *end;// 指向 ustream_buf 末尾
char head[]; // 用于存放实际的数据区
};
ustream_buf_list 和 ustream_buf 之间的关系如下图所示:
struct ustream_fd
stream 是流管理对象,前面有提到,流本质上就是一个fd,uloop_fd 会关联此fd。
struct ustream_fd {
struct ustream stream;
struct uloop_fd fd;// 保存和当前ustream关联的fd,后续会使用uloop_run监听这个fd
};
ustream 核心API
ustream_fd_init
初始化一个ustream,注意到此函数除了接收 ustream_fd 参数,还需要关联一个fd(也就是流),这个fd 必须是“可poll”的,也就是说它只能是设备文件或者socket文件,以及pipe、fifo这类特殊设备文件,不可以是普通文件。
初始化之后默认就会开始监听fd的可读事件,不会监听可写事件,可写事件会在满足一定条件后才会监听。
void ustream_fd_init(struct ustream_fd *sf, int fd)
{
struct ustream *s = &sf->stream;
ustream_init_defaults(s);// 初始化ustream一些基础参数,比如ustream_buf的最大个数、大小等
sf->fd.fd = fd;
sf->fd.cb = ustream_uloop_cb;// fd 可读或者可写事件回调,由uloop_run调用
s->set_read_blocked = ustream_fd_set_read_blocked;
s->write = ustream_fd_write;// 将ustream写缓冲区里面的数据写入fd
s->free = ustream_fd_free;
s->poll = ustream_fd_poll;
ustream_fd_set_uloop(s, false);// 由uloop_run监听这个fd
}
ustream_uloop_cb
ustream_uloop_cb 是前面ustream_fd->fd 的可读写事件回调函数,当fd满足可读/写事件后就会自动调用此函数进行处理。
static void ustream_uloop_cb(struct uloop_fd *fd, unsigned int events)
{
struct ustream_fd *sf = container_of(fd, struct ustream_fd, fd);
__ustream_fd_poll(sf, events);
}
static bool __ustream_fd_poll(struct ustream_fd *sf, unsigned int events)
{
struct ustream *s = &sf->stream;
bool more = false;
if (events & ULOOP_READ)
ustream_fd_read_pending(sf, &more);// 处理可读事件
if (events & ULOOP_WRITE) {
bool no_more = ustream_write_pending(s);// 处理可写事件
if (no_more)
ustream_fd_set_uloop(s, false);//如果没有更多数据需要写入,就不再关注fd的可写事件
}
if (sf->fd.error && !s->write_error) {
ustream_state_change(s);
s->write_error = true;
ustream_fd_set_uloop(s, false);
}
return more;
}
ustream_uloop_cb 的整体流程如下:
ustream_fd_read_pending
当流中有数据可读时,uloop_run()就会调用ustream_fd_read_pending()从流中获取数据,并保存到ustream_buf,也就是前面提到的 struct ustream 的 r
成员。这个函数会一直读到流中没有数据了或者r->ustream_buf
满了或者读出错了才会停止。每读一次都会通知ustream
的所有者r->ustream_buf
中已经有数据可以被读取了。
static void ustream_fd_read_pending(struct ustream_fd *sf, bool *more)
{
struct ustream *s = &sf->stream;
int buflen = 0;
ssize_t len;
char *buf;
do {
if (s->read_blocked)
break;
buf = ustream_reserve(s, 1, &buflen);//获取一个空闲的buf,可用长度为buflen
if (!buf)
break;
len = read(sf->fd.fd, buf, buflen);// 从fd 中读取数据到 buf
if (len < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == ENOTCONN)
return;
len = 0;
}
if (!len) {
//进入这里,说明未读取到任何数据,但不一定是读取异常了,可能是没有可读的数据了
if (!s->eof)
ustream_state_change(s);
s->eof = true;
ustream_fd_set_uloop(s, false);//需要继续监听流是否可读
return;//正常都是走这里退出
}
ustream_fill_read(s, len);
*more = true;
} while (1);
}
ustream_fill_read
数据被填充到ustream_buf后,需要调整 ustream_buf 内各个指针的位置,以及更新一些变量的值(data_bytes)。
最后再通知 ustream 的所有者可以读取数据了。
void ustream_fill_read(struct ustream *s, int len)
{
struct ustream_buf *buf = s->r.data_tail;//指向第一个尚未使用的 s->r ustream_buf
int n = len;
int maxlen;
s->r.data_bytes += len;// s->r 总数据量增加len
do {
if (!buf)
abort();
maxlen = buf->end - buf->tail;//当前buf里面尚未使用的区域长度
if (len < maxlen)
//当实际需要写的数据量比 空闲空间少时,强制让maxlem = len,防止下一句出现负数
maxlen = len;
len -= maxlen;//如果maxlen = len,执行完后len=0,最后的while就直接退出了
buf->tail += maxlen;//tail指针后移 maxlen
ustream_fixup_string(s, buf);
s->r.data_tail = buf;//
buf = buf->next;//buf = s->r.data_tail->next
} while (len);//如果len > 0,再进行下一次循环
if (s->notify_read)
s->notify_read(s, n);//通知usream owner 可以读取数据了
}
ustream_write_pending
当流可以被写入时,uloop_run()就会调用ustream_write_pending()将w->ustream_buf
里面的数据写入流。
这个函数会一直将数据写入流,直到所有数据都被写入或者写入错了才会停止。不管以哪种方式导致写入动作停止,在写动作完毕后都会通知ustream
的所有者本次写入的数据总量。
bool ustream_write_pending(struct ustream *s)
{
struct ustream_buf *buf = s->w.head;
int wr = 0, len;
if (s->write_error)
return false;
while (buf && s->w.data_bytes) {
struct ustream_buf *next = buf->next;
int maxlen = buf->tail - buf->data;//当前buf中尚未读取的数据量
len = s->write(s, buf->data, maxlen, !!buf->next);//写入fd
if (len < 0) {
ustream_write_error(s);
break;
}
if (len == 0)//进入这里说明write操作出现了异常,可能是fd不可写了
break;
wr += len;
s->w.data_bytes -= len;//s->w.data_bytes 是s->w buf里面总的数据长度,这些数据都应该被写入fd
if (len < maxlen) {// 本次未能写入所有数据,说明底层write操作已不可写,直接退出,等待下一次fd可写事件
buf->data += len;//data指针后移 本次实际写入量len
break;
}
ustream_free_buf(&s->w, buf);//本次写入了一个buf里面所有的数据,需要释放这个buf
buf = next;//继续写下一个buf
}
if (s->notify_write)
s->notify_write(s, wr);//通知ustream的owner本次一共写入了多少字节
if (s->eof && wr && !s->w.data_bytes)
ustream_state_change(s);
return !s->w.data_bytes;//返回true,也就是s->w.data_bytes=0,说明所有数据都被写入流了,后面不再需要监听流是否可写了
}
ustream_write
将数据写入流,此函数会根据当前w->stream_buf
的状态,执行不同的动作:
1.如果w->ustream_buf
中无数据,则将数据直接写入流
2.如果不满足1,或者1未能将所有数据直接写入流(可能写了部分数据后流已经不允许写入了),则将数据(继续)写入w->ustream_buf
如果是情况2,就需要监听流是否可写,当流可写时,ustream_write_pending()
函数就会被调用,从而将w->stream_buf
中的数据继续写入流。
int ustream_write(struct ustream *s, const char *data, int len, bool more)
{
struct ustream_buf_list *l = &s->w;
int wr = 0;
if (s->write_error)
return 0;
if (!l->data_bytes) {
//只要缓存里面没有数据,都是直接写入ustream_fd
wr = s->write(s, data, len, more); //ustream_fd_write()写入ustream_fd
if (wr == len)
return wr;// 数据全部写入fd,直接return
if (wr < 0) {
ustream_write_error(s); //设置 s->write_error = true
return wr;
}
data += wr;
len -= wr;
}
// 进入这里说明wr < len,len表示还剩下多少字节未能写入fd
/* s->write 的返回值wr
wr<0: write fd failed
0<=wr<len: 数据没写完,这时候需要将剩余数据写入缓存 ustream_buf
wr=len || len=0: 数据已经全部写入fd
*/
return ustream_write_buffered(s, data, len, wr);
}
ustream_fd_write
ustream_fd_write会将数据写入fd(流),如果本次未能将全部数据写入fd,则会置位 ULOOP_WRITE flag,等待下一次fd可写事件触发时,uloop_run()会掉用 ustream_write_pending() 将之前未写完的数据继续写入fd。
static int ustream_fd_write(struct ustream *s, const char *buf, int buflen, bool more)
{
struct ustream_fd *sf = container_of(s, struct ustream_fd, stream);
ssize_t ret = 0, len;
if (!buflen)
return 0;
while (buflen) {
len = write(sf->fd.fd, buf, buflen);
if (len < 0) {
if (errno == EINTR)
continue;
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN)
break;
return -1;
}
ret += len;
buf += len;
buflen -= len;
}
if (buflen)
ustream_fd_set_uloop(s, true);
return ret;
}
至此,ustream中比较重要的数据结构和API基本上就已经介绍完毕了,下面将通过实例介绍ustream是如何工作的。
ustream 应用示例
使用ustream分别管理输入或者输出流
server 的逻辑:
1.注册 ubus test method,method回调函数里面会创建一个pipe,并把pipe的写端fd传送给client
2.创建一个ustream对象,并且将ustream 与 pipe 的读端fd关联,此fd相当于输入流
3.ustream会监听pipe里面是否有数据可读,并在pipe可读时自动将数据读到r->ustream_buf,最后通知ustream owner去读取数据
4.在client_notify_read()不断打印从pipe里面读到的数据
#include <sys/time.h>
#include <unistd.h>
#include <unistd.h>
#include <signal.h>
#include "blobmsg_json.h"
#include "ustream.h"
#include "libubus.h"
static struct ubus_context *ctx;
static struct blob_buf b;
struct client {
struct ustream_fd s;
int fd;
};
#define LOG(f,...) do {printf("[%s][%d] " f " ",__FUNCTION__,__LINE__,## __VA_ARGS__);}while(0)
static void client_notify_read(struct ustream *s, int bytes)
{
struct client *cl = container_of(s, struct client, s.stream);
struct ustream_buf *buf = s->r.head;
char *newline, *str;
int len;
do {
str = ustream_get_read_buf(s, &len);
if (!str)
break;
LOG("len=[%d],recvmsg=[%s]\n",len, str);
ustream_consume(s, len);
} while(1);
}
static void client_notify_state(struct ustream *s)
{
struct client *cl = container_of(s, struct client, s.stream);
if (!s->eof)
return;
ustream_free(s);
close(cl->s.fd.fd);
free(cl);
}
enum {
SYNC_ID,
SYNC_MAX
};
static const struct blobmsg_policy sync_policy[] = {
[SYNC_ID] = { .name = "id", .type = BLOBMSG_TYPE_INT32 },
};
static int test_sync_cb(struct ubus_context *ctx, struct ubus_object *obj,
struct ubus_request_data *req, const char *method,
struct blob_attr *msg)
{
struct blob_attr *tb[SYNC_MAX];
int ret;
int fds[2];
struct client *cl= NULL;
LOG("Enter\n");
blobmsg_parse(sync_policy, SYNC_MAX, tb, blob_data(msg), blob_len(msg));
if (!tb[SYNC_ID])
return UBUS_STATUS_INVALID_ARGUMENT;
LOG("[server] recv client id:%08x\n", blobmsg_get_u32(tb[SYNC_ID]));
cl = calloc(1, sizeof(*cl));
if (pipe(fds) == -1) {
fprintf(stderr, "failed to create pipe: %m\n");
return -1;
}
ubus_request_set_fd(ctx, req, fds[1]);//write
cl->s.stream.string_data = true;
cl->s.stream.notify_read = client_notify_read;
cl->s.stream.notify_state = client_notify_state;
cl->fd = fds[0];
ustream_fd_init(&cl->s, fds[0]);//read
LOG("Exit\n");
return 0;
}
static const struct ubus_method test_methods[] = {
UBUS_METHOD("test_sync", test_sync_cb, sync_policy),
};
static struct ubus_object_type test_object_type =
UBUS_OBJECT_TYPE("test", test_methods);
static struct ubus_object test_object = {
.name = "test",
.type = &test_object_type,
.methods = test_methods,
.n_methods = ARRAY_SIZE(test_methods),
};
static int server_main(void)
{
int ret;
uloop_init();
ctx = ubus_connect(NULL);
if (!ctx) {
LOG("Failed to connect to ubus\n");
return -1;
}
ubus_add_uloop(ctx);
ret = ubus_add_object(ctx, &test_object);
if (ret) {
LOG("Failed to add object: %s\n", ubus_strerror(ret));
return ret;
}
uloop_run();
ubus_free(ctx);
uloop_done();
}
int main(int argc, char **argv)
{
server_main();
return 0;
}
client 逻辑:
1.调用test method,在test_client_fd_cb中会收到server端发送过来的pipe写端fd(这部分利用的是unix socket直接传送文件描述符的机制)
2.创建一个ustream对象,并且将ustream 与 pipe 的写端fd关联,此fd相当于输出流
3.创建uloop_timer每3s向ustream 里面写一次数据
#include <sys/time.h>
#include <unistd.h>
#include <ustream.h>
#include "libubus.h"
static struct ubus_context *ctx;
static struct blob_buf b;
static struct ustream_fd test_fd;
static char buf[128] = {0};
static unsigned long count = 0;
#define LOG(f,...) do {printf("[%s][%d] " f " ",__FUNCTION__,__LINE__,## __VA_ARGS__);}while(0)
static void test_utimer_cb(struct uloop_timeout *timeout)
{
int ret = 0;
LOG("Enter\n");
sprintf(buf,"hello server %ld", count);
ret = ustream_write(&test_fd.stream,buf,strlen(buf)+1, false);
if (ret < 0)
fprintf(stderr, "ustream_write error\n");
uloop_timeout_set(timeout, 3000);
count++;
}
static struct uloop_timeout u_timer = {
.cb = test_utimer_cb,
};
static void client_notify_write(struct ustream *s, int bytes)
{
fprintf(stderr, "Wrote %d bytes, pending: %d\n", bytes, s->w.data_bytes);
}
static void client_notify_state(struct ustream *s)
{
if (!s->eof)
return;
ustream_free(s);
close(test_fd.fd.fd);
}
static void test_client_fd_cb(struct ubus_request *req, int fd)
{
LOG("Enter\n");
memset(&test_fd, 0, sizeof(test_fd));
test_fd.stream.notify_write = client_notify_write;
test_fd.stream.notify_state = client_notify_state;
ustream_fd_init(&test_fd, fd);
uloop_timeout_set(&u_timer, 2000);
}
static void client_invoke_async(void)
{
static struct ubus_request req;
uint32_t id;
int ret;
char *msg = "this is async invoke";
LOG("Enter\n");
if (ubus_lookup_id(ctx, "test", &id))
{
LOG("Failed to look up test object\n");
return;
}
blob_buf_init(&b, 0);
blobmsg_add_u32(&b, "id", id);
ubus_invoke_async(ctx, id, "test_sync", b.head, &req);
req.fd_cb = test_client_fd_cb;
ubus_complete_request_async(ctx, &req);
LOG("Exit\n");
}
static int client_main(void)
{
uloop_init();
ctx = ubus_connect(NULL);
if (!ctx) {
LOG("Failed to connect to ubus\n");
return -1;
}
ubus_add_uloop(ctx);
client_invoke_async(); //sync
uloop_run();
ubus_free(ctx);
uloop_done();
}
int main(int argc, char **argv)
{
client_main();
return 0;
}
上述示例中,client 和 server 2个进程中分别使用了ustream来管理输入或者输出流,当然ustream也支持在同一个进程中同时管理输入和输出流,由于篇幅的关系,可以自行参考libubox源码的 libubox/examples/ustream-example.c 文件。
参考
OpenWrt:libubox之ustream