本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/119377944
前面分享过 接口返回值 和 表记录 的校验 、 导出的CSV、Excel文件 和 表记录 的校验,最近 我们项目常常用到Redis,也得对缓存做校验,分享下;
【实际这篇博客推迟发布N个月】
个人博客:https://blog.csdn.net/zyooooxie
【以下所有内容仅为个人项目经历,如有不同,纯属正常】
需求
日常工作中,我这边刷缓存的操作大概2种: 某些新建、删除操作 刷新相关的缓存、定时Job跑 刷全部缓存;本期讲得是 第二种:大批量的key,如何来和表记录做校验。
拿到 相关缓存的 name、value
设计重点:
- SCAN命令;
- key不同类型时,如何获取 value;
- 只要key的name时 【need_keyName=True】,返回的是 {key_name1:None, key_name2:None} ;要key的value时 【need_keyName=False】,返回的是 {key_name1:value1, key_name2:value2};
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
def get_key_useSCAN(key_name: str, count: int = 5000, key_type: str = 'string', need_keyName: bool = False) -> dict:
"""
:param key_name:
:param count: scan 让用户告知迭代命令, 在每次迭代中应该从数据集里返回多少元素
:param key_type:
:param need_keyName:只需要name,不需要value
:return:只要key的name时,返回的是 {key_name1:None, key_name2:None} ;要key的value时,返回的是 {key_name1:value1, key_name2:value2}
"""
key_name = '*{}*'.format(key_name)
Log.info(key_name)
host, port, pwd = host1, port1, pwd1
master_list = get_master_list(host=host, port=port, pwd=pwd)
res_dict = dict()
for ml in master_list:
host_m, port_m = ml.split(":")
Log.debug('{}, {}'.format(host_m, port_m))
rc = redis_connect(host_m, port_m, pwd)
cursor = 0
while True:
cursor, data = rc.scan(cursor, key_name, count=count)
Log.debug('游标:{},长度:{}'.format(cursor, len(data)))
# Log.debug(data)
if need_keyName:
data = dict.fromkeys(data)
res_dict.update(data)
else:
for d in data:
value = rc.get(d) if key_type == 'string' else (rc.hgetall(d) if key_type == 'hash' else
(rc.lrange(d, 0, -1) if key_type == 'list'
else rc.smembers(d)))
# Log.debug(d)
res_dict.update({d: value})
if not bool(cursor):
break
rc.close()
# Log.debug(res_dict)
Log.info('redis中 相关key的数量是 {}'.format(len(res_dict)))
return res_dict
校验name
因为 chat* 搜出来的所有相关key的value 都是 ‘true’,故我把校验重点放在 key的name;
设计重点:
- sql执行后,拿到结果;
- 依据 mobile_or_union 传的值,手动构造 key_name;
- 返回的是 key_name的list;
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
def get_key_useSQL_chatMobile_or_chatUnionId(mobile_or_union: str, db: Connection, cur: Cursor) -> list:
sql = """
select t2.union_id, group_concat(distinct t3.org_code),t2.mobile
from table_cgc t1
left join table_cgc_member t2
on t1.chat_id = t2.chat_id
left join table_cgc_org_relation t3
on t1.chat_id = t3.chat_id
where t1.delete_flag = 0
and t2.delete_flag = 0
and t3.delete_flag = 0
and t2.union_id is not null
{} and t2.mobile is not null
group by t2.union_id;
""".format('' if mobile_or_union == 'mobile' else '#')
data = fetch_sql(sql=sql, db_name='zy_db', db=db, cur=cur, fetch_mode='fetchall')
# Log.debug(data)
res_list = list()
for d in data:
mobile = d[-1]
union_id = d[0]
for oc in d[1].split(','):
if mobile_or_union == 'mobile':
res = ''.join(['chat:mobile:add:', oc, ':', mobile])
else:
res = ''.join(['chat:unionId:add:', oc, ':', union_id])
res_list.append(res)
# Log.debug(res_list)
Log.info('手动处理后,缓存的数量为 {}'.format(len(res_list)))
return res_list
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
if __name__ == "__main__":
pass
db_m, cur_m = connect_db(db_name='zy_db')
# --------
# string类型
# 只对比 name
# # abc = get_key_useSQL_chatMobile_or_chatUnionId(db=db_m, cur=cur_m, mobile_or_union='mobile')
# abc = get_key_useSQL_chatMobile_or_chatUnionId(db=db_m, cur=cur_m, mobile_or_union='unionId')
# abc = dict.fromkeys(abc)
#
# # ABC = get_key_useSCAN('chat:mobile:add', need_keyName=True)
# ABC = get_key_useSCAN('chat:unionId:add', need_keyName=True)
#
# # Log.info('手动删掉的:{}'.format(ABC.popitem())) # 主动修改abc,看执行结果
#
# compare_dict_key(abc, ABC)
# compare_dict_key(ABC, abc)
# --------
close_db(db_m, cur_m)
校验name、value
设计重点:
- sql在zy_db执行后,拿到结果;取division_code字段值得到 division_code_list;
- 组成 data_dict;
- sql_ 在zyooooxie_db执行后,拿到结果;
- 根据实际key(hash类型)的all fields,生成res_dict;
- 返回的是 key:value的dict
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
def get_key_useSQL_deptRegionInfo(db: Connection, cur: Cursor) -> dict:
sql = """
SELECT
re.region_id,
re.division_code,
GROUP_CONCAT(DISTINCT dept.user_id)
FROM
table_relation re
JOIN
table_dept dept
ON re.region_id = dept.department_id
WHERE
re.delete_flag = 0
AND dept.delete_flag = 0
GROUP BY
re.division_code;
"""
data = fetch_sql(sql=sql, db_name='zy_db', db=db, cur=cur, fetch_mode='fetchall')
# Log.debug(data)
division_code_list = [d[1] for d in data]
data_dict = {d[1]: (d[0], d[-1]) for d in data}
Log.debug(data_dict)
if len(division_code_list) == 1:
division_code = "('{}')".format(division_code_list[0])
elif len(division_code_list) == 0:
raise Exception('长度为空-查不到数据')
else:
division_code = tuple(division_code_list)
sql_ = """
SELECT dept_code, division_code FROM table_zddi WHERE division_code IN {} ;
""".format(division_code)
data_ = fetch_sql(sql=sql_, db_name='zyooooxie_db', fetch_mode='fetchall')
# Log.debug(data_)
res_dict = dict()
for d_ in data_:
dept_code = d_[0]
key = 'dept:region:info:{}'.format(dept_code)
value = data_dict.get(d_[-1])
new_value = {'regionId': value[0], 'userId': value[1]}
res_dict.update({key: new_value})
# Log.debug(res_dict)
Log.info('手动处理后,缓存的数量为 {}'.format(len(res_dict)))
return res_dict
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
if __name__ == "__main__":
pass
db_m, cur_m = connect_db(db_name='zy_db')
# --------
# # hash类型
# # 对比 name、value
abc = get_key_useSQL_deptRegionInfo(db=db_m, cur=cur_m)
ABC = get_key_useSCAN('dept:region:info', key_type='hash')
delete_ = ABC.popitem()
Log.info(delete_)
ABC.update({delete_[0]: 'test'}) # 主动修改ABC,看执行结果
compare_dict_key_value(abc, ABC)
compare_dict_key_value(ABC, abc)
# --------
close_db(db_m, cur_m)
公共方法:元素比较
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
"""
def compare_dict_key(dict1: Union[dict, list], dict2: Union[dict, list]) -> list:
"""
:param dict1:
:param dict2:
:return:
"""
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
Log.info('手动转换类型')
dict1 = dict.fromkeys(dict1)
dict2 = dict.fromkeys(dict2)
not_in_list = list()
for key in dict1:
if key not in dict2:
not_in_list.append(key)
Log.info(not_in_list)
Log.info(len(not_in_list))
return not_in_list
def compare_dict_key_value(dict1: dict, dict2: dict) -> dict:
"""
:param dict1:
:param dict2:
:return:
"""
not_in_dict = dict()
for key, value in dict1.items():
if key not in dict2 or value != dict2.get(key):
not_in_dict.update({key: value})
Log.info(not_in_dict)
Log.info(len(not_in_dict))
return not_in_dict
本文链接:https://blog.csdn.net/zyooooxie/article/details/119377944
个人博客 https://blog.csdn.net/zyooooxie