1、流表数据结构的初始化:
流表的初始化在FlowInitConfig函数中,函数调用栈如下:
main
-->SuricataMain
-->PostConfLoadedSetup
-->PreRunInit
-->FlowInitConfig(流相关数据结构初始化)
-->FlowQueueInit(flow_recycle_q流回收队列初始化)
-->FlowSparePoolInit(flow_spare_pool流节点预分配链表初始化)
-->FlowInitFlowProto(设置不同协议不同状态的超时时间)
FlowInitConfig主要是读取配置,对流表的配置结构体FlowConfig flow_config进行初始化、flow_recycle_q流回收队列初始化、flow_spare_pool流节点预分配链表初始化以及给不同协议不同状态设置不同的超时时间。
flow_spare_pool流节点预分配链表会根据配置事先分配一定数量的flow节点,这些flow节点按比例挂在flow_spare_pool链表的每个节点上。创建流表时,先优先从这些节点上获取flow。
2、收包线程FlowWorker模块对流表的处理:
收包线程FlowWorker模块对流表进行处理的函数调用栈如下:
FlowWorker
-->FlowHandlePacket
-->FlowGetFlowFromHash
-->FlowGetNew(新建流)
-->FlowQueuePrivateGetFromTop(从flow_spare_pool中申请流)
-->FlowAlloc(flow_spare_pool不够,直接alloc申请)
-->MoveToWorkQueue(已有流超时的,从哈希桶中删除,并放入work_queue或者evicted链表)
-->FlowWorkerProcessInjectedFlows(取出flow_queue中的flow放入到work_queue)
-->FlowWorkerProcessLocalFlows
-->CheckWorkQueue
-->FlowClearMemory(清除流信息)
-->FlowSparePoolReturnFlow(将流归还到flow_spare_pool中)
流表的哈希表本身并不复杂,所有的流节点都是通过全局的FlowBucket *flow_hash哈希桶管理起来的,哈希桶的大小就是FlowInitConfig的hash_size,收包线程收到报文后,根据报文的流的哈希值(以五元组+vlan为key),先查流表,如果没有就创建流表项,如果有就找到对应的流节点。
这里面复杂的几个地方如下:
1)如果包对应的哈希链没有节点,需要获取流节点:先从上面讲到的flow_spare_pool链表中获取,获取不到就看能不能重用,不能重用就直接调用alloc自己创建一个节点。
2)如果哈希链有节点,则遍历链,看有没有超时的,超时就从哈希链中删除,放入work_queue或者evicted链表
3)从flow_queue中取出flow放入到work_queue,flow_queue是FlowManager线程处理时,发现的超时流需要重组时放入的
4)检查work_queue,清楚流节点信息,将流归还到flow_spare_pool链表中。
关键代码加注释如下:
Flow *FlowGetFlowFromHash(ThreadVars *tv, FlowLookupStruct *fls, Packet *p, Flow **dest)
{
Flow *f = NULL;
/* get our hash bucket and lock it */
const uint32_t hash = p->flow_hash; //Decode协议解码时已经计算好的哈希值
FlowBucket *fb = &flow_hash[hash % flow_config.hash_size]; //查找到对应的哈希链
FBLOCK_LOCK(fb); //每个哈希链一个锁
SCLogDebug("fb %p fb->head %p", fb, fb->head);
/* see if the bucket already has a flow */
if (fb->head == NULL) { //哈希链为空
f = FlowGetNew(tv, fls, p); //新建流
if (f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
/* flow is locked */
fb->head = f;
/* got one, now lock, initialize and return */
FlowInit(f, p);
f->flow_hash = hash;
f->fb = fb;
FlowUpdateState(f, FLOW_STATE_NEW);
FlowReference(dest, f); //给包的流指针赋值
FBLOCK_UNLOCK(fb);
return f;
}
const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0;
const uint32_t fb_nextts = !emerg ? SC_ATOMIC_GET(fb->next_ts) : 0;
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
Flow *prev_f = NULL; /* previous flow */
f = fb->head;
do { //遍历哈希桶fb
Flow *next_f = NULL; //先判断每个flow节点是否超时
const bool timedout = (fb_nextts < (uint32_t)SCTIME_SECS(p->ts) &&
FlowIsTimedOut(f, (uint32_t)SCTIME_SECS(p->ts), emerg));
if (timedout) {
FLOWLOCK_WRLOCK(f);
next_f = f->next; //超时Flow从Flow哈希表中移除
MoveToWorkQueue(tv, fls, fb, f, prev_f);
FLOWLOCK_UNLOCK(f);
goto flow_removed;
} else if (FlowCompare(f, p) != 0) { //查找流节点
FLOWLOCK_WRLOCK(f);
/* found a matching flow that is not timed out */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
Flow *new_f = TcpReuseReplace(tv, fls, fb, f, hash, p);
if (prev_f == NULL) /* if we have no prev it means new_f is now our prev */
prev_f = new_f;
MoveToWorkQueue(tv, fls, fb, f, prev_f); /* evict old flow */
FLOWLOCK_UNLOCK(f); /* unlock old replaced flow */
if (new_f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
f = new_f;
}
FlowReference(dest, f); //给包的流节点赋值
FBLOCK_UNLOCK(fb);
return f; /* return w/o releasing flow lock */ //流节点已存在
}
/* unless we removed 'f', prev_f needs to point to
* current 'f' when adding a new flow below. */
prev_f = f;
next_f = f->next;
flow_removed:
if (next_f == NULL) { //哈希链中没有找到该包对应的flow
f = FlowGetNew(tv, fls, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
/* flow is locked */
f->next = fb->head;
fb->head = f;
/* initialize and return */
FlowInit(f, p);
f->flow_hash = hash;
f->fb = fb;
FlowUpdateState(f, FLOW_STATE_NEW);
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f;
}
f = next_f;
} while (f != NULL);
/* should be unreachable */
BUG_ON(1);
return NULL;
}
/**
* \brief Get a new flow
*
* Get a new flow. We're checking memcap first and will try to make room
* if the memcap is reached.
*
* \param tv thread vars
* \param fls lookup support vars
*
* \retval f *LOCKED* flow on succes, NULL on error.
*/
static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
{
const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);
#ifdef DEBUG
if (g_eps_flow_memcap != UINT64_MAX && g_eps_flow_memcap == p->pcap_cnt) {
return NULL;
}
#endif
if (FlowCreateCheck(p, emerg) == 0) {
return NULL;
}
/* get a flow from the spare queue */
Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue); //先线程的spare_queue中申请
if (f == NULL) {
f = FlowSpareSync(tv, fls, p, emerg); //从全局flow_spare_pool中申请
}
if (f == NULL) { //检查流表分配空间是否使用完
/* If we reached the max memcap, we get a used flow */
if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) {
/* declare state of emergency */
if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
FlowTimeoutsEmergency(); //流表空间已使用完,进入紧急模式,回收存在时间较长的一些流表
FlowWakeupFlowManagerThread(); //启动老化线程
}
//找到某个索引项下面存在时长最长的那个流,复用其内存(引用计数为0)
f = FlowGetUsedFlow(tv, fls->dtv, p->ts);
if (f == NULL) {
NoFlowHandleIPS(p);
return NULL;
}
#if 0
if (tv != NULL && fls->dtv != NULL) {
#endif
StatsIncr(tv, fls->dtv->counter_flow_get_used);
#if 0
}
#endif
/* flow is still locked from FlowGetUsedFlow() */
FlowUpdateCounter(tv, fls->dtv, p->proto);
return f;
}
/* now see if we can alloc a new flow */
f = FlowAlloc(); //流表空间没有使用完,直接申请
if (f == NULL) {
#if 0
if (tv != NULL && fls->dtv != NULL) {
#endif
StatsIncr(tv, fls->dtv->counter_flow_memcap);
#if 0
}
#endif
NoFlowHandleIPS(p);
return NULL;
}
/* flow is initialized but *unlocked* */
} else {
/* flow has been recycled before it went into the spare queue */
/* flow is initialized (recylced) but *unlocked* */
}
FLOWLOCK_WRLOCK(f);
FlowUpdateCounter(tv, fls->dtv, p->proto);
return f;
}
3、FlowManager线程对流表的处理:
FlowManager线程进行流表处理的函数调用栈如下:
FlowManager
-->FlowTimeoutHash(遍历指定范围哈希表)
-->FlowManagerHashRowTimeout
-->FlowManagerFlowTimeout(检查流老化时间)
-->FlowQueuePrivateAppendFlow(超时流放入Aside_queue临时队列)
-->FlowManagerHashRowClearEvictedList(遍历Evicted链表)
-->FlowQueuePrivateAppendFlow(老化流放入Aside_queue临时队列)
-->ProcessAsideQueue(Aside_queue临时队列出队,超时流需要重组的,放入tv->flow_queue,其它放入flow_recycle_q全局回收队列)
FlowManager线程的主要任务是检查流是否有超时流需要老化删除,如果有,就放入Aside_queue临时队列,另外在收包线程中也将超时的流放入到Aside_queue临时队列了,所以要一起处理一下,处理的结果就是将超时流放入到flow_recycle_q全局回收队列中,但是超时流需要重组的,要放入tv->flow_queue中,这个tv->flow_queue,在收包线程FlowWorker模块中,又会从中把流取出来放入到work_queue中(复杂的地方就在这里,涉及到多个线程操作几个队列或者链表,你放我取,你取我放)
关键代码加注释如下:
/**
* \brief time out flows from the hash
*
* \param ts timestamp
* \param hash_min min hash index to consider
* \param hash_max max hash index to consider
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt number of timed out flow
*/
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
const uint32_t hash_max, FlowTimeoutCounters *counters)
{
uint32_t cnt = 0;
const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
const uint32_t rows_checked = hash_max - hash_min;
uint32_t rows_skipped = 0;
uint32_t rows_empty = 0;
#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif
const uint32_t ts_secs = SCTIME_SECS(ts);
for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
TYPE check_bits = 0;
const uint32_t check = MIN(BITS, (hash_max - idx));
for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx+i]; //遍历指定范围哈希桶
check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
<< (TYPE)i;
}
if (check_bits == 0)
continue;
for (uint32_t i = 0; i < check; i++) {
FlowBucket *fb = &flow_hash[idx+i];
if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
FBLOCK_LOCK(fb);
Flow *evicted = NULL;
if (fb->evicted != NULL || fb->head != NULL) {
if (fb->evicted != NULL) {
/* transfer out of bucket so we can do additional work outside
* of the bucket lock */
evicted = fb->evicted;
fb->evicted = NULL;
}
if (fb->head != NULL) {
uint32_t next_ts = 0;
FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts); //超时处理,超时的流放到td->aside_queue 这个队列
if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
SC_ATOMIC_SET(fb->next_ts, next_ts);
}
if (fb->evicted == NULL && fb->head == NULL) {
SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
}
} else {
SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
rows_empty++;
}
FBLOCK_UNLOCK(fb);
/* processed evicted list */
if (evicted) {
FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);//处理evicted链的超时流
}
} else {
rows_skipped++;
}
}
if (td->aside_queue.len) {
cnt += ProcessAsideQueue(td, counters); //处理AsideQueue队列中已经超时的流
}
}
counters->rows_checked += rows_checked;
counters->rows_skipped += rows_skipped;
counters->rows_empty += rows_empty;
if (td->aside_queue.len) {
cnt += ProcessAsideQueue(td, counters);
}
counters->flows_removed += cnt;
/* coverity[missing_unlock : FALSE] */
return cnt;
}
//处理超时的流
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
FlowQueuePrivate recycle = { NULL, NULL, 0 };
counters->flows_aside += td->aside_queue.len;
uint32_t cnt = 0;
Flow *f; //Aside队列出队
while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
/* flow is still locked */
//超时流需要重组
if (f->proto == IPPROTO_TCP && !(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
!FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
/* Send the flow to its thread */
FlowForceReassemblyForFlow(f);//把flow放入原线程的tv->flow_queue队列中
FLOWLOCK_UNLOCK(f);
/* flow ownership is passed to the worker thread */
counters->flows_aside_needs_work++;
continue;
}
FLOWLOCK_UNLOCK(f);
FlowQueuePrivateAppendFlow(&recycle, f);
if (recycle.len == 100) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle);//放入回收线程的回收队列flow_recycle_q,每100个放一次
FlowWakeupFlowRecyclerThread();
}
cnt++;
}
if (recycle.len) {
FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
FlowWakeupFlowRecyclerThread();
}
return cnt;
}
4、FlowRecycler线程对流表的处理:
FlowRecycler线程的函数调用栈如下:
FlowRecycler
-->FlowQueuePrivateGetFromTop(flow_recycle_q队列出队)
-->Recycler
-->FlowClearMemory(清除流信息)
-->FlowSparePoolReturnFlow(将流归还到flow_spare_pool中)
FlowRecycler线程对流表的处理相对简单,就是将flow_recycle_q全局流回收队列的流出队,清除流信息,然后归还到flow_spare_pool流预分配链表中,方便后面的流使用。
关键代码加注释如下:
static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
{
FLOWLOCK_WRLOCK(f);
(void)OutputFlowLog(tv, ftd->output_thread_data, f);
FlowEndCountersUpdate(tv, &ftd->fec, f);
if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
StatsDecr(tv, ftd->counter_tcp_active_sessions);
}
StatsDecr(tv, ftd->counter_flow_active);
FlowClearMemory(f, f->protomap); //清除流信息
FLOWLOCK_UNLOCK(f);
FlowSparePoolReturnFlow(f); //放入流备用队列
}
5、其它说明:
1)就是当流表内存不够用时,会有一个紧急状态机制FLOW_EMERGENCY,这个时候不同协议以及不同流状态的老化时间会大大的缩短,加速流表老化,以腾出更多的内存空间为新流所用,例如紧急状态下TCP流的初建阶段的老化时长默认值为FLOW_IPPROTO_TCP_EMERG_NEW_TIMEOUT 10秒,而对于TCP握手完成之后的流老化时长默认为FLOW_IPPROTO_TCP_EMERG_EST_TIMEOUT 100秒。
2)前面说过FlowManager和FlowRecycler线程个数是可以配置的,如果配置多个线程,在遍历FlowBucket *flow_hash哈希桶时,在FlowManagerThreadInit函数中是有一个计算方法的,就是每个线程只遍历其中的一部分哈希链。
3)Flow节点的结构体加注释如下:
/**
* \brief Flow data structure.
*
* The flow is a global data structure that is created for new packets of a
* flow and then looked up for the following packets of a flow.
*
* Locking
*
* The flow is updated/used by multiple packets at the same time. This is why
* there is a flow-mutex. It's a mutex and not a spinlock because some
* operations on the flow can be quite expensive, thus spinning would be
* too expensive.
*
* The flow "header" (addresses, ports, proto, recursion level) are static
* after the initialization and remain read-only throughout the entire live
* of a flow. This is why we can access those without protection of the lock.
*/
typedef struct Flow_
{
/* flow "header", used for hashing and flow lookup. Static after init,
* so safe to look at without lock */
FlowAddress src, dst; //源目的IP
union {
Port sp; /**< tcp/udp source port */
struct {
uint8_t type; /**< icmp type */
uint8_t code; /**< icmp code */
} icmp_s;
struct {
uint32_t spi; /**< esp spi */
} esp;
};
union {
Port dp; /**< tcp/udp destination port */
struct {
uint8_t type; /**< icmp type */
uint8_t code; /**< icmp code */
} icmp_d;
};
uint8_t proto; //传输层协议号
uint8_t recursion_level;
uint16_t vlan_id[2]; //两层vlan
uint8_t vlan_idx; //vlan的层数
/* track toserver/toclient flow timeout needs */
union {
struct {
uint8_t ffr_ts:4;
uint8_t ffr_tc:4;
};
uint8_t ffr;
};
/** timestamp in seconds of the moment this flow will timeout
* according to the timeout policy. Does *not* take emergency
* mode into account. */
uint32_t timeout_at;
/** Thread ID for the stream/detect portion of this flow */
FlowThreadId thread_id[2];
struct Flow_ *next; /* (hash) list next */ //下一个flow节点
/** Incoming interface */
struct LiveDevice_ *livedev; //接口信息(第一个发起的报文的接口)
/** flow hash - the flow hash before hash table size mod. */
uint32_t flow_hash; //流的哈希值
/** timeout policy value in seconds to add to the lastts.tv_sec
* when a packet has been received. */
uint32_t timeout_policy;
/* time stamp of last update (last packet). Set/updated under the
* flow and flow hash row locks, safe to read under either the
* flow lock or flow hash row lock. */
SCTime_t lastts; //流的最后一个包的时间
FlowStateType flow_state; //流的状态 FlowState
/** flow tenant id, used to setup flow timeout and stream pseudo
* packets with the correct tenant id set */
uint32_t tenant_id;
uint32_t probing_parser_toserver_alproto_masks;
uint32_t probing_parser_toclient_alproto_masks;
uint32_t flags; //ipv4/ipv6标记 /**< generic flags */
uint16_t file_flags; /**< file tracking/extraction flags */
/** destination port to be used in protocol detection. This is meant
* for use with STARTTLS and HTTP CONNECT detection */
uint16_t protodetect_dp; /**< 0 if not used */
/* Parent flow id for protocol like ftp */
int64_t parent_id; //父流节点,例如ftp-data的控制流的信息
#ifdef FLOWLOCK_RWLOCK
SCRWLock r;
#elif defined FLOWLOCK_MUTEX
SCMutex m;
#else
#error Enable FLOWLOCK_RWLOCK or FLOWLOCK_MUTEX
#endif
/** protocol specific data pointer, e.g. for TcpSession */
void *protoctx; //TcpSession
/** mapping to Flow's protocol specific protocols for timeouts
and state and free functions. */
uint8_t protomap; //FLOW_PROTO_MAX 传输层协议id
uint8_t flow_end_flags;
/* coccinelle: Flow:flow_end_flags:FLOW_END_FLAG_ */
AppProto alproto; /**< \brief application level protocol */ //应用id enum AppProtoEnum
AppProto alproto_ts;
AppProto alproto_tc;
/** original application level protocol. Used to indicate the previous
protocol when changing to another protocol , e.g. with STARTTLS. */
AppProto alproto_orig;
/** expected app protocol: used in protocol change/upgrade like in
* STARTTLS. */
AppProto alproto_expect;
/** detection engine ctx version used to inspect this flow. Set at initial
* inspection. If it doesn't match the currently in use de_ctx, the
* stored sgh ptrs are reset. */
uint32_t de_ctx_version; //检测引擎规则版本,检测规则变化,会导致流重新匹配规则
/** ttl tracking */
uint8_t min_ttl_toserver;
uint8_t max_ttl_toserver;
uint8_t min_ttl_toclient;
uint8_t max_ttl_toclient;
/** application level storage ptrs.
*
*/
AppLayerParserState *alparser; /**< parser internal state */
void *alstate; /**< application layer state */
/** toclient sgh for this flow. Only use when FLOW_SGH_TOCLIENT flow flag
* has been set. */
const struct SigGroupHead_ *sgh_toclient;
/** toserver sgh for this flow. Only use when FLOW_SGH_TOSERVER flow flag
* has been set. */
const struct SigGroupHead_ *sgh_toserver;
/* pointer to the var list */
GenericVar *flowvar;
struct FlowBucket_ *fb; //流所在的哈希桶的指针
SCTime_t startts; //流创建的时间
//流量统计
uint32_t todstpktcnt; //到目的IP的包数统计
uint32_t tosrcpktcnt; //到源IP的包数统计
uint64_t todstbytecnt; //到目的IP的字节数统计
uint64_t tosrcbytecnt; //到源IP的字节数统计
} Flow;
4)加了一个查看flow节点信息的命令:flows-list,看看效果:
流节点格式为:流ID编号)源IP:源端口 --> 目的IP:目的端口,传输层协议号,到目的方向包数:到目的方向字节数<-->到源方向包数:到源方向字节数,流首包的网口的pcie地址
6、结尾:
好了,关于suricata的flow流管理分析就到这里了,目前只能是一个初步分析,细节上理解还不够,其实要真正理解的是作者的设计思想,就是他为什么要这样去设计,这样设计的好处是什么,这个太难了!
有问题或者需要自定义命令源码的朋友,可以进网络技术开发交流群提问(先加我wx,备注加群)。喜欢文章内容的朋友,加个关注呗~~