Redis 是一个高性能的键值存储数据库,它能够在内存中快速读写数据,并且支持持久化到磁盘。它被广泛应用于缓存、队列、实时分析等场景。
一、启动redis服务器
要打开redis服务器,需要在终端中输入redis-server命令。确保已经安装了redis,并且路径已经正确配置。打开终端,输入redis-server命令并按下回车键,即可启动redis服务器。
在控制台输入:
redis-cli
来连接redis服务器:
二、redis 数据类型对应的操作
主要使用python来实现redis的相关操作
(一)、使用pip命令安装redis-py库:
pip install redis
安装完成后,可以在Python代码中导入redis库:
import redis
(二)、接下来,连接到Redis服务器:
# 创建Redis连接
client = redis.Redis(host='localhost', port=6379, db=0)
这里的host是Redis服务器的主机名,port是Redis服务器的端口号(默认为6379),db是使用的数据库编号(默认为0)。
(三)、键的操作
# 获取键的类型
print(client.type("key_string"))
# 设置有效期
client.set("key_string", 101)
client.expire("key_string", 100)
# 剩余时间
print(client.ttl("key_string"))
# 移除有效期
client.persist("key_string")
# 删除键
client.delete("key_string")
# 打印键
print(client.keys())
(四)、字符串的操作
# 添加字符串
client.set("id", 101)
# 获取字符串
print(client.get("id").decode())
# 连续添加字符串
client.mset({"name": "kurumi", "address": "youkohama"})
# 连续获取字符串
print(client.mget(["name", "address", "id"]))
# 获取字符串长度
print(client.strlen("name"))
# 设置字符串寿命
client.set("id", 101, ex=100)
# 获取字符串寿命
print(client.ttl("id"))
(五)、列表的操作
# 从列表左边删除
value = client.lpop("key_list")
print(value)
# 从列表右边删除
value = client.rpop("key_list")
print(value)
# 返回索引对应的元素
print(client.lindex("key_list", 1))
# 返回索引范围内的元素
print(client.lrange("key_list", 0, 3))
# 对原始数据使用索引切割
print(client.ltrim("key_list", 2, 3))
(六)、哈希的操作
client.hset("key_hash", "key1", "value1")
print(client.hget("key_hash", "key1"))
# 键值对的个数
print(client.hlen("key_hash"))
# 删除指定的键
client.hdel("key_hash", "key1")
# 返回所有键
print(client.hkeys("key_hash"))
# 返回所有值
print(client.hvals("key_hash"))
# 返回所有键值对
print(client.hgetall("key_hash"))
(七)、集合的操作
# 添加
client.sadd("key_set", 1, 2, 3, 2, 1)
# 删除指定值,没有不管
client.srem("key_set", 1, 2, 5, 7)
# 是否是成员
print(client.sismember("key_set", "3"))
# 所有成员
print(client.smembers("key_set"))
# 个数
print(client.scard("key_set"))
# 交集
print(client.sinter(["key_set", "key_set2"]))
# 并集
print(client.sunion(["key_set", "key_set2"]))
# 差集
print(client.sdiff(["key_set", "key_set2"]))
(八)、有序集合的操作
# 添加
client.zadd("key_zset", {"aaa": 15, "bbb": 20, "ccc": 30})
# 删除
client.zrem("key_zset", "aaa", "ccc")
# 个数
print(client.zcard("key_zest"))
# 根据值取权重
print(client.zscore("key_zset", "bbb"))
# 根据获取权重范围内个数
print(client.zcount("key_zset", 20, 25))
# 根据权重排序索引返回值
print(client.zrange("key_set", 0, 1))
三、 mysql与redis结合使用
MySQL和Redis是两种功能各异的数据库系统,通常可以结合使用来发挥各自的优势。以下是它们如何结合使用的方法:
import pymysql
import redis
class MySqlHelper:
def __init__(self):
self.con = None
self.cur = None
try:
self.con = pymysql.connect(user="root", password="123456", database="python2407")
except Exception as e:
print(f"连接出异常")
else:
self.cur = self.con.cursor()
def query_one(self, sql, args=None):
try:
self.cur.execute(sql, args)
return self.cur.fetchone()
except Exception as e:
print(f"查询出异常")
def update(self, sql, args=None):
try:
row = self.cur.execute(sql, args)
self.con.commit()
return row
except Exception as e:
print(f"修改出异常")
def __del__(self):
if self.cur:
self.cur.close()
if self.con:
self.con.close()
class RedisHelper:
def __init__(self):
self.client = None
try:
self.client = redis.Redis(host="192.168.13.32", password="123456")
except Exception as e:
print(f"连接redis失败")
def query_one(self, key):
try:
return self.client.hgetall(key)
except Exception as e:
print(f"执行查询失败")
def update(self, key, value):
try:
self.client.hset(key, mapping=value)
except Exception as e:
print(f"执行修改失败", e)
def __del__(self):
if self.client:
self.client.close()
class DataHelper:
def __init__(self):
self.sql = MySqlHelper()
self.redis = RedisHelper()
def regist_user(self, username, password):
row = self.sql.update("insert into user (username, password) values (%s, %s)", (username, password))
if row > 0:
user = self.sql.query_one("select * from user where username = %s and password = %s", (username, password))
self.redis.update(f"{user[0]}", {"id": user[0], "username": user[1], "password": user[2]})
print(f"注册成功")
def query_user(self, u_id):
value = self.redis.query_one(u_id)
if value:
print(f"redis中直接找到")
return value
else:
user = self.sql.query_one("select * from user where id = %s", u_id)
if user:
print(f"redis中没有找到, 从mysql中找到放入redis")
self.redis.update(f"{u_id}", {"id": user[0], "username": user[1], "password": user[2]})
return user
def main():
dh = DataHelper()
r = dh.query_user("3031")
print(r)
if __name__ == '__main__':
main()