背景:
一个接口出现了SQL注入,条件查询场景下出现,形如下图
解决问题时,我们先要问,什么是SQL注入?
下面的资料有助于针对SQL注入是什么、如何验证SQL注入解决成功了,提供一些思路,建议一看,花费时间2-4min
Flask:SQLAlchemy查询是否容易受到注入攻击|极客教程 (geek-docs.com)
从0到1,SQL注入(sql十大注入类型)收藏这一篇就够了,技术解析与实战演练 - FreeBuf网络安全行业门户
过程:
此前SQL注入的错误的代码如下,like '%{0}%'这个写法就很有问题,这个代码的含义与直接使用+号来拼接sql字符串造成的SQL注入,感觉没有什么区别,我们即将对这个代码进行修复
currentPage = util.params(request=request, key='page', default=1, type=int)
limit = util.params(request=request, key='limit', default=20, type=int)
name = util.params(request=request, key='name', default=0)
code = util.params(request=request, key='code', default=0)
water_id = util.params(request=request, key='water_id', default=0)
filter_list = []
filter_list.append("water_name !='{}'".format('测试'))
if name:
filter_list.append("station_name like '%{0}%' or outfall_name like '%{0}%'".format(name))
if code:
filter_list.append("station_code like '%{0}%' or outfall_code like '%{0}%'".format(name))
if water_id:
filter_list.append("water_id = {}".format(water_id))
filter_str = " and ".join(filter_list)
if filter_list:
filter_str = "where " + filter_str
try:
sql_str = "select * from v_monitor_point {} order by water_id offset {} rows fetch next {} rows only".format(
filter_str, (currentPage - 1) * limit, limit)
count_sql = "select count(point_id) from v_monitor_point {}".format(filter_str)
monitor_point_list = db.session.execute(text(sql_str)).fetchall()
count = db.session.execute(text(count_sql)).fetchone()
count = count[0] if count else 0
except Exception:
error_info = traceback.format_exc()
logger_error.error(error_info)
return res.ResponseJson(res.RetCode.DB_ERROR)
info_list = []
facility_sn_data = db.session.query(FacilitySn.id, FacilitySn.facility_sn, FacilitySn.facility_code).all()
思路如使用ORM的查询过滤器,如这种,但是这种在我的背景代码里面不适用,我考虑使用参数化查询,如:占位符
username = request.args.get('username')
users = User.query.filter(or_(User.username == username, User.email == username)).all()
具体的使用方案思路如下,即使用参数化查询
修改成功的demo
currentPage = util.params(request=request, key='page', default=1, type=int)
limit = util.params(request=request, key='limit', default=20, type=int)
name = util.params(request=request, key='name', default=0)
code = util.params(request=request, key='code', default=0)
water_id = util.params(request=request, key='water_id', default=0)
filter_list = []
params = {}
# 动态条件
if name:
filter_list.append("station_name like :name or outfall_name like :name")
params['name'] = f"%{name}%"
if code:
filter_list.append("station_code like :code or outfall_code like :code")
params['code'] = f"%{code}%"
if water_id:
filter_list.append("water_id = :water_id")
params['water_id'] = water_id
filter_str = " and ".join(filter_list)
if filter_list:
filter_str = "where " + filter_str
# SQL 查询字符串
sql_str = f"select * from v_monitor_point {filter_str} order by water_id offset :offset rows fetch next :limit rows only"
count_sql = f"select count(point_id) from v_monitor_point {filter_str}"
try:
# 执行查询
monitor_point_list = db.session.execute(text(sql_str), {
**params,
'offset': (currentPage - 1) * limit,
'limit': limit
}).fetchall()
count = db.session.execute(text(count_sql), params).fetchone()
count = count[0] if count else 0
except Exception:
error_info = traceback.format_exc()
logger_error.error(error_info)
return res.ResponseJson(res.RetCode.DB_ERROR)
# 其他处理代码
info_list = []
facility_sn_data = db.session.query(FacilitySn.id, FacilitySn.facility_sn, FacilitySn.facility_code).all()
修复成功的截图,这个截图证明SQL注入初步修复成功
实际上我反馈给了专业的网安同学,请他帮忙验证了一下修改是否成功,结果修改是成功的
代码中哪些写法是针对SQL注入的?
text中的:写法防止SQL注入的原理
找网安核对SQL注入修改是否成功时的聊天记录,大家随便看看,主要看重点图!:
重点图!!!
博主更新动力:
欢迎大家点赞、收藏、关注、评论、批评啦