需求背景
以小时为周期不停地上报事件到事件平台
,事件平台如果在连续2个周期 没有检测到上报的事件,就会发送告警给事件的相关责任人.
问题的难点在于如何检测连续周期内无数据?
如上图,2 点和 3 点,都没有上报数据,说明连续两个周期存在无数据上报.
解决方案
本文采用 redis 的有序集合实现此算法.
1. 假设
先做一些假设
- 事件需要有类型属性,每种类型的事件有唯一一个上报渠道 ID
,参数为data_id
,每个上报渠道只有一种事件类型. - 上报的每个事件都有一个hash值,参数为
hash_value
,相同 hash 值的所有事件当做一条时间序列.连续周期无数据检测针对的是一条时间序列进行处理.
2. 记录事件的hash 值到 redis 集合
事件平台每收到一个事件,就将事件的 hash 值hash_value
记录在 redis 的 set 中,示例代码如下
from django.core.cache import cache as redis_cache
data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
redis_key = data_id
redis_client = redis_cache.client.get_client()
redis_client.sadd(redis_key, hash_value)
3. 记录事件的上报时间到 redis 有序集合
事件平台每收到一个事件,就将上报时间戳作为 score,记录到 redis 的 sorted set 中,示例代码如下
from django.core.cache import cache as redis_cache
data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
# 假设事件的值为
event = {
"hash_value": hash_value,
"data_id": data_id,
"data": {
"key1": "value1",
"key2": "value2"
}
}
redis_key = f"{data_id}:{hash_value}"
score = int(time.time())
redis_client = redis_cache.client.get_client()
redis_client.zadd(redis_key, {value: score})
使用有序集合,就可以构造出一个分数为时间戳的时间序列.
4. 检测无数据周期
需要开启一个定时任务,这里假设5 分钟检测一次时间序列.每次检测指定时间范围内事件的数量.
from datetime import timezone
from django.core.cache import cache as redis_cache
from django.utils import timezone
data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
# 检测周期 1 小时
interval_seconds = 3600
# 无数据周期数
cycle_number = 2
# 计算无数据检查时间范围
period_end_time = timezone.now()
period_start_time = period_end_time - timedelta(seconds=interval_seconds * cycle_number)
# 计算分数区间
period_start_score = int(period_start_time.timestamp())
period_end_score = int(period_end_time.timestamp())
# 获得周期范围内的事件数量
redis_key = f"{data_id}:{hash_value}"
redis_client = redis_cache.client.get_client()
count = redis_client.zcount(redis_key, period_start_score, period_end_score)
如果count
为 0,说明在连续周期内无数据.
5. 删除历史数据
由于是周期性事件上报,时间久了 set 和 sorted set 的值就会越来越大,导致 redis 服务器内存告警.所以我们需要定期清理过期的数据.
from django.core.cache import cache as redis_cache
data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
redis_client = redis_cache.client.get_client()
# 获取所有的事件
redis_key = data_id
hash_value_set = redis_client.smembers(redis_key)
# 指定删除的时间区间
min_score = ""
max_score = ""
# 根据指定的时间取件删除有序集合中过期的数据
for hash_value in hash_value_set:
redis_key = f"{data_id}:{hash_value}"
redis_client.zremrangebyscore(redis_key, min_score, max_score)