1、排序
order_by
方法排序:可以指定根据模型中某个属性进行排序,"模型名.属性名.desc()"
代表的是降序排序。
# 根据年龄降序
lst = session.query(Student).order_by(Student.age.desc()).all()
# 根据年龄升序
lst = session.query(Student).order_by(Student.age).all()
- 在定义模型的时候指定排序:有些时候,不想每次在查询的时候都用
order_by
方法,可以在定义模型的时候就指定排序的方式。
一、模型对象定义中加排序
# 学生表
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
age = Column(Integer)
# 4.定义relationship属性
course_list = relationship('Course', backref='student_list', secondary=temp_tab)
__mapper_args__ = { # 新的映射参数
"order_by": age.desc() # 降序
}
def __repr__(self):
return "Student:name=%s, age=%s" % (self.name, self.age)
二、在relationship
的方法中order_by
属性
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
age = Column(Integer)
course_list = relationship('Course', backref='student_list', secondary=temp_tab, order_by=age.desc())
# course_list = relationship('Course', backref='student_list', secondary=temp_tab, order_by=age)
2、分页查询
limit
:可以限制查询的时候只查询前几条数据。
lst = session.query(Student).limit(10).all()
offset
:可以限制查找数据的时候过滤掉前面多少条。可指定开始查询时的偏移量。
lst = session.query(Student).offset(5).limit(10).all()
- 切片:可以对Query对象使用切片操作,来获取想要的数据。
lst = session.query(Student).slice(2, 8).all() # 前开后闭
print(lst)
print(len(lst)) # 6
- 可以使用
slice(start,stop)
方法来做切片操作。slice(step, stop)
,不包括step,包括stop。 - 也可以使用
[start:stop]
的方式来进行切片操作。 - 一般在实际开发中,中括号的形式是用得比较多的。
3、懒加载
在一对多,或者多对多关系的时候,如果想要获取多的一方这一部分的数据的时候,往往能通过一个属性就可以全部获取了。
但是有时候并不想获得所有的关联属性,那么这时候我们可以给relationship
方法添加属性lazy='dynamic'
,以后通过.articles
获取到的就不是一个列表,而是一个AppenderQuery对象了。
这样就可以对这个对象再进行一层过滤和排序等操作。
通过 lazy='dynamic'
,获取出来的多的那一部分的数据,就是一个 AppenderQuery
对象了。这种对象既可以添加新数据,也可以跟 Quer
y 一样,可以再进行一层过滤。
from datetime import date
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref
engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask_db?charset=utf8mb4', echo=True)
Base = declarative_base(engine)
# 部门和员工之间,是一种典型的一(主)对多(从)
class Dep(Base):
__tablename__ = 't_dept'
dept_no = Column(Integer, primary_key=True)
d_name = Column(String(255)) # 部门名字
city = Column(String(50))
# 代表当前部门下所有员工的列表, 这种写法不是最优的, 最优的写法只要在其中一个对象中关联
# emps = relationship("Emp") # 参数必须是另外一个相关联的类名
def __str__(self):
return 'DEP:<部门名字:{}, 城市:{}>'.format(self.d_name, self.city)
# 员工表
class Emp(Base):
__tablename__ = 't_emp'
emp_no = Column(Integer, primary_key=True)
emp_name = Column(String(50)) # 员工名字
job = Column(String(50))
hire_time = Column(DATE) # 入职时间
sal = Column(DECIMAL(10, 2)) # 薪资,连两位小数共十位
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='NO ACTION'))
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='CASCADE')) # 级联删除
dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='RESTRICT')) #
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='SET NULL')) #
# 当前员工所属部门
dept = relationship("Dep", backref=backref('emps', lazy='dynamic'), lazy='select') # lazy默认值是select
def __str__(self):
return "EMP:<员工编号:{}, 员工姓名:{}>".format(self.emp_no, self.emp_name)
# Base.metadata.drop_all()
# Base.metadata.create_all()
# d1 = Dep(d_name='人事部', city='上海')
# d2 = Dep(d_name='销售部', city='上海')
# e1 = Emp(emp_name='张三', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=1)
# e2 = Emp(emp_name='李四', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=1)
# e3 = Emp(emp_name='王二', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=2)
# e4 = Emp(emp_name='麻子', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=2)
# 创建session对象
session = sessionmaker(engine)()
def test_lazy():
d = session.query(Dep).filter(Dep.dept_no == 1)
print(type(d)) # <class 'sqlalchemy.orm.query.Query'>
print(d) # 打印查询的SQL语句
result = d.first() # d.first 执行SQL语句 DEP:<部门名字:人事部, 城市:上海>
print(result.emps) # 当前对象是一个AppenderQuery对象:可以追加,排序,过滤
e = Emp(emp_name='xxx', job='manager', hire_time=date(2022, 12, 27), sal=8888)
result.emps.append(e)
result.emps.filter(Emp.emp_no > 6).all() # 过滤
session.commit()
if __name__ == '__main__':
test_lazy()
4. 分组过滤
group_by
:根据某个字段进行分组。
统计每个工资级别下有多少员工。
result = session.query(Emp.sal, func.count(Emp.emp_no)).group_by(Emp.sal).all()
having
:对分组查找结果作进一步过滤。
统计每个工资级别下有多少员工数量,只统计薪资大于3000
以上的
# 统计每个工资级别下有多少员工数量,只统计薪资大于3000以上的
result = session.query(Emp.sal, func.count(Emp.emp_no)).group_by(Emp.sal).having(Emp.sal > 3000).all()
print(result)
5.子查询
5、子查询
子查询即select
语句中还有select
。
那么在sqlalchemy
中,要实现一个子查询,需以下几个步骤:
-
将子查询按照传统的方式写好查询代码,然后在
query
对象后面执行subquery
方法,将这个查询变成一个子查询。 -
在子查询中,将以后需要用到的字段通过
label
方法,取个别名。 -
在父查询中,如果想要使用子查询的字段,那么可以通过子查询的返回值上的
c
属性拿到(c=Column)
。
查询和张三这个员工的职位、入职时间都相同的其他员工
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, backref
engine = create_engine('mysql+pymysql://root:root@localhost:3306/flask_db?charset=utf8mb4', echo=True)
Base = declarative_base(engine)
# 部门和员工之间,是一种典型的一(主)对多(从)
class Dep(Base):
__tablename__ = 't_dept'
dept_no = Column(Integer, primary_key=True)
d_name = Column(String(255)) # 部门名字
city = Column(String(50))
# 代表当前部门下所有员工的列表, 这种写法不是最优的, 最优的写法只要在其中一个对象中关联
# emps = relationship("Emp") # 参数必须是另外一个相关联的类名
def __str__(self):
return 'DEP:<部门名字:{}, 城市:{}>'.format(self.d_name, self.city)
# 员工表
class Emp(Base):
__tablename__ = 't_emp'
emp_no = Column(Integer, primary_key=True)
emp_name = Column(String(50)) # 员工名字
job = Column(String(50))
hire_time = Column(DATE) # 入职时间
sal = Column(DECIMAL(10, 2)) # 薪资,连两位小数共十位
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='NO ACTION'))
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='CASCADE')) # 级联删除
dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='RESTRICT')) #
# dept_no = Column(Integer, ForeignKey('t_dept.dept_no', ondelete='SET NULL')) #
# 当前员工所属部门
dept = relationship("Dep", backref=backref('emps', lazy='dynamic'), lazy='select') # lazy默认值是select
def __repr__(self):
return "EMP:<员工编号:{}, 员工姓名:{}>".format(self.emp_no, self.emp_name)
# Base.metadata.drop_all()
# Base.metadata.create_all()
# d1 = Dep(d_name='人事部', city='上海')
# d2 = Dep(d_name='销售部', city='上海')
# e1 = Emp(emp_name='张三', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=1)
# e2 = Emp(emp_name='李四', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=1)
# e3 = Emp(emp_name='王二', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=2)
# e4 = Emp(emp_name='麻子', job='经理', hire_time=date(2022, 12, 22), sal=6666.50, dept_no=2)
# 创建session对象
session = sessionmaker(engine)()
def test_group():
# 统计每个工资级别下有多少员工数量
result = session.query(Emp.sal, func.count(Emp.emp_no)).group_by(Emp.sal).all()
print(result)
# 统计每个工资级别下有多少员工数量,只统计薪资大于3000以上的
result = session.query(Emp.sal, func.count(Emp.emp_no)).group_by(Emp.sal).having(Emp.sal > 3000).all()
print(result)
def test_subQuery():
# 查询和张三这个员工的职位、入职时间都相同的其他员工
# 1.写子查询
stmt = session.query(Emp.hire_time.label('ht'), Emp.job.label('jb')).filter(Emp.emp_name == '张三').subquery()
# 2.写父查询
result = session.query(Emp).filter(Emp.hire_time == stmt.c.ht, Emp.job == stmt.c.jb).all()
print(result)
if __name__ == '__main__':
# test_group()
test_subQuery()