Flask快速入门day 06 (sqlalchemy的使用,scoped-session线程安全)

发布时间 2023-04-10 22:08:06作者: kangshong

Flask框架之sqlalchemy的使用

一、SQLAlchemy基本使用

1、简介

什么是sqlalchemy:

sqlalchemy是一个基于Python实现的ORM框架,该框架建立在DB API之上,使用对象关系映射进行数据的操作,简而言之就是,将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果

  • flask 中没有orm框架,对象关系映射,方便我们快速操作数据库
  • flask,fastapi中用sqlalchemy居多

sqlalchemy的安装:

pip install sqlalchemy

了解:

sqlalchemy本身无法操作数据库,隐藏需要借助pymysql等第三方插件进行连接

# 实例化对象时需要指定以下的参数
    pymysql  
    	mysql+pymysql://username:password@host/dbname[?<options>]

    cx_Oracle
    	oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

# 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html

2、操作原生sql

- 方式一:
	# 第一步:导入
    from sqlalchemy import create_engine

    # 实例化对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/flask_test",  # 连接的数据库
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )

    # 获取连接引擎
    conn = engine.raw_connection()
    # 获取游标对象
    cursor = conn.cursor()
    # 执行sql
    cursor.execute('select * from test')
    # 获取执行结果


- 方式二:
    from models import User, Book
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.orm import scoped_session
    engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
    Session = sessionmaker(bind=engine)
    session = scoped_session(Session)

    # 2.0.9 版本需要使用text包裹一下,原来版本不需要
    # cursor = session.execute(text('select * from users'))
    # result = cursor.fetchall()
    # print(result)

    cursor = session.execute(text('insert into books(name) values(:name)'), params={"name": '红楼梦'})
    session.commit()
    print(cursor.lastrowid)

    session.close()

3、表创建

# 1、导入需要的模块
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 2、实例declarative_base对象,用于表模型继承(类似于django表模型继承的Model)
Base = declarative_base()


# 3、使用表模型继承实例的Base
class User(Base):
    # 所有的字段都使用Column,类型在参数内指定(sqlalchemy不会自动生成ID字段)
    id = Column(Integer, primary_key=True)
    # nullable=False(字段必填),index(建立索引)
    name = Column(String(32), nullable=False, index=True)
    age = Column(Integer, nullable=True)
    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    ctime = Column(DateTime, default=datetime.datetime.now)
    __tablename__ = 'users'

    # 第六步:建立联合索引,联合唯一
    __table_args__ = (
        UniqueConstraint('id', 'name', name='uix_id_name'),  # 联合唯一
        Index('ix_id_name', 'name'),  # 索引
    )


class Book(Base):
    # 创建的表的名字
    __tablename__ = 'books'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))


# 创建表(只会创建表,库需要自己创建)
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/flask_test",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 把表同步到数据库(把被Base管理的所有表,都创建到数据库)表迁移
Base.metadata.create_all(engine)

# 把所有表删除(修改字段后需要删除表重新创建、这里不会像django一样修改后迁移一次就会更新表)
# Base.metadata.drop_all(engine)

4、ORM操作

4、1.基本使用

# 导入必要的模块
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import User

# 生成一个engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/flask_test",  # 连接的数据库
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 获得一个Session类,将engine当作参数传入
Session = sessionmaker(bind=engine)

# 实例Session的对象
session = Session()

# 生成一个model表的对象
user = User(name='jason', age=18)
# 在表内生成对应的记录
session.add(user)
# 需要提交
session.commit()
# 关闭session对象
session.close()

4、2.增删改查

增加:

# 增加单条记录 add
	user = User(name='jason', age=18)
	session.all(user)

# 增加多条 add_all
    user = User(name='jason', age=18)
    user1 = User(name='lqz', age=18)
    book = Book(name='红楼梦', age=18)
    session.add_all([user,user1,book])  # 放入列表,对象可以是任意表的对象

基本查:

# 基本查  filter  filter_by     filer:写条件     filter_by:等于的值
 -filter
    session.query(User)  # 中写表模型,可以写多个表模型(连表操作)  select * from User;
    # filter 过滤条件,必须写表达式  ==    >=    <=    !=   select * from user where user.id=1
    # 2.3 all:普通列表  first
    # user = session.query(User).filter(User.name == 'lqz').first()
    # user = session.query(User).filter(User.name != 'lqz').all()
    # print(user)
    # res = session.query(User).filter(User.id > 1).all()
    # print(res)

- filter_by  # 直接写等式    不能写成 User.name = 'lqz'
    # user = session.query(User).filter_by(name='lqz').first()
    # user = session.query(User).filter_by(id=2).first()
    # user = session.query(User).filter_by(id=2).first()
    # print(user)

修改:

# 4 修改(查到才能改)
- 方式一:update修改
    # res = session.query(User).filter_by(id=3).update({"name" : "彭于晏"})
    # print(res)
    # session.commit()
- 方式二,使用对象修改
    # res = session.query(User).filter_by(id=3).first()
    # res = session.query(User).filter_by(name='zzz').first()
    # res.name='kkk'
    # print(res.id)
    # session.add(res)  # add 如果有主键,就是修改,如果没有主键就是新增
    # session.commit()

删除:

- 删除(查到才能删) filter或filter_by查询的结果  不要all或first出来, .delete()即可
    # res = session.query(User).filter_by(id=2).delete()
    # session.commit()  # 一定不要忘了
    # print(res) # 影响的行数

4、3.高级查询

# 4 查询: filer:写条件     filter_by:等于的值
    # 4.1 查询所有  是list对象
    res = session.query(User).all()  # 是个普通列表
    print(type(res))
    print(len(res))

# 4.1.1 只查询某几个字段
    # select name as xx,email from user;
    res = session.query(User.name.label('xx'), User.email)
    print(res)  # 打出原生sql
    # print(res.all())
    for item in res.all():
        print(item[0])


# 4.1.2 filter传的是表达式,filter_by传的是参数
    res = session.query(User).filter(User.name == "lqz").all()
    res = session.query(User).filter(User.name != "lqz").all()
    res = session.query(User).filter(User.name != "lqz", User.email == '3@qq.com').all()  # django 中使用 Q
    res = session.query(User).filter_by(name='lqz099').all()
    res = session.query(User).filter_by(name='lqz099',email='47@qq.com').all()
    print(len(res))

# 4.2 取一个 all了后是list,list 没有first方法
    res = session.query(User).first()


# 4.3 查询所有,使用占位符(了解)  :value     :name
    # select * from user where id <20 or name=lqz099
    res = session.query(User).filter(text("id<:value or name=:name")).params(value=10, name='lqz099').all()


# 4.4 自定义查询(了解)
    # from_statement  # 写纯原生sql
    res=session.query(User).from_statement(text("SELECT * FROM users where email=:email")).params(email='3@qq.com').all()
    # print(type(res[0]))  # 是book的对象,但是查的是User表   不要这样写
    print(res[0].name) 

# 4.5 高级查询

# 条件
# 表达式,and条件连接
    res = session.query(User).filter(User.id > 1, User.name == 'lqz099').all() # and条件

# between
    res = session.query(User).filter(User.id.between(1, 9), User.name == 'lqz099').all()
    res = session.query(User).filter(User.id.between(1, 9)).all()

# in
    res = session.query(User).filter(User.id.in_([1,3,4])).all()
    res = session.query(User).filter(User.email.in_(['3@qq.com','r@qq.com'])).all()

# ~非,除。。外
    res = session.query(User).filter(~User.id.in_([1,3,4])).all()
    print(res)

# 二次筛选
    res = session.query(User).filter(~User.id.in_(session.query(User.id).filter_by(name='lqz099'))).all()
    print(res)


# and or条件
    from sqlalchemy import and_, or_
    # or_包裹的都是or条件,and_包裹的都是and条件
    res = session.query(User).filter(and_(User.id >= 3, User.name == 'lqz099')).all()  #  and条件
    res = session.query(User).filter(User.id < 3, User.name == 'lqz099').all()  #  等同于上面
    res = session.query(User).filter(or_(User.id < 2, User.name == 'eric')).all()
    res = session.query(User).filter(
        or_(
            User.id < 2,
            and_(User.name == 'lqz099', User.id > 3),
            User.extra != ""
        )).all()


# 通配符,以e开头,不以e开头
    res = session.query(User).filter(User.email.like('%@%')).all()
    select user.id from user where  user.name not like e%;
    res = session.query(User.id).filter(~User.name.like('e%'))


# 分页
    # 一页2条,查第5页
    res = session.query(User)[2*5:2*5+2]

# 排序,根据name降序排列(从大到小)
    res = session.query(User).order_by(User.email.desc()).all()
    res = session.query(Book).order_by(Book.price.desc()).all()
    res = session.query(Book).order_by(Book.price.asc()).all()
    # 第一个条件重复后,再按第二个条件升序排
    res = session.query(User).order_by(User.name.desc(), User.id.asc())



# 分组查询  5个聚合函数
    from sqlalchemy.sql import func
    res = session.query(User).group_by(User.extra)  # 如果是严格模式,就报错
    # 分组之后取最大id,id之和,最小id  和分组的字段
    res = session.query(
         User.extra,
         func.max(User.id),
         func.sum(User.id),
        func.min(User.id)).group_by(User.extra).all()
    for item in res:
        print(item[2])

# having
    # select max(id),sum(id),min(id) from  user group by  user.extra   having id_max>2;
    res = session.query(
        func.max(User.id),
        func.sum(User.id),
        func.min(User.id)).group_by(User.extra).having(func.max(User.id) > 2)

二、外键关系

本质:

# 一对一:本身是一个表,折成两张表,一对一关联(本质一对多,外键字段做了唯一)

# 一对多:外键关系建在多的一方

# 多对多:需要建立中间表(本质就是两张表对中间表做了一对多的外键关系)

- 总结:三种外键关系的本质其实都是一对多

1、一对多

1、1.表模型

# 一对多关系
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    id = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名
    # 关联字段写在多的一方,写在Person中,跟hobby表中id字段做外键关联
    hobby_id = Column(Integer, ForeignKey("hobby.id"))

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 基于对象的跨表查询:就要加这个字段,取对象  person.hobby     pserson.hobby_id
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')  # 如果有hobby对象,拿到所有人 hobby.pers

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa", )

# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

1、2.新增和基于对象的查询

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models1 import Hobby, Person

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 一对多新增
    hobby = Hobby(caption='乒乓球')
    session.add(hobby)
    person = Person(name='张三')
    session.add(person)
    # 先用上的增加后,然后查出来
    hobby=session.query(Hobby).filter(Hobby.caption=='乒乓球').first()
    person = Person(name='王五',hobby_id=hobby.id)
    session.add(person)


# 支持按对象的增加方式,必须加relationship 做关联
    # 方式一
    hobby=session.query(Hobby).filter(Hobby.caption=='乒乓球').first()
    person = Person(name='赵六',hobby=hobby)
    
    # 方式二
    hobby = Hobby(caption='羽毛球')  # 表中暂时没有
    person = Person(name='赵六', hobby=hobby)
    session.add_all([person, hobby])
    session.commit()


## 基于对象的跨表查询  .
    # 正向查询
    person=session.query(Person).filter(Person.name=='王五').first()
    print(person.hobby_id)
    print(person.hobby)  # Hobby 的对象

    # 反向查询
    hobby=session.query(Hobby).filter(Hobby.id==1).first()
    print(hobby.pers)

2、多对多

2、1.表模型

# 一对多关系
from sqlalchemy import create_engine
import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship

# 第二步:执行declarative_base,得到一个类
Base = declarative_base()


# 多对多

# 中间表  手动创建
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True)
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(64), unique=True, nullable=False)



    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

    def __str__(self):
        return self.name

    def __repr__(self):
        return self.name


engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa", )


# 把表同步到数据库  (把被Base管理的所有表,都创建到数据库)
Base.metadata.create_all(engine)

# 把所有表删除
# Base.metadata.drop_all(engine)

2、2.新增和基于对象查询

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from models2 import Girl, Boy, Boy2Girl

engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/aaa")
Session = sessionmaker(bind=engine)
session = scoped_session(Session)

# 新增
    # 1 笨办法新增
    girl=Girl(name='刘亦菲')
    boy=Boy(name='彭于晏')
    session.add_all([girl,boy])
    session.add(Boy2Girl(girl_id=1,boy_id=1))
    session.commit()

    # 2 使用relationship
    boy = Boy(name='lqz')
    boy.girls = [Girl(name='迪丽热巴'), Girl(name='景田')]
    session.add(boy)
    session.commit()


# 基于对象的跨表查询
    # 正向
    boy = session.query(Boy).filter(Boy.id==2).first()
    print(boy.girls)

# 反向
    girl = session.query(Girl).filter(Girl.id==2).first()
    print(girl.boys)


# 如果没有relationship,纯自己操作

3、连表查询

### 关联关系,基于连表的跨表查询
from models1 import Person,Hobby
# 链表操作
# select * from person,hobby where person.hobby_id=hobby.id;
# res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all()

# 自己连表查询
# join表,默认是inner join,自动按外键关联
# select * from Person inner join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby).all()

#isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可
# select * from Person left join Hobby on Person.hobby_id=Hobby.id;
# res = session.query(Person).join(Hobby, isouter=True).all()
# 没有right join,通过这个实现
# res = session.query(Hobby).join(Person, isouter=True).all()

# # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上
# select * from Person left join Hobby on Person.id=Hobby.id;
# res = session.query(Person).join(Hobby, Person.hobby_id == Hobby.id, isouter=True) #  sql本身有问题,只是给你讲, 自己指定链接字段
# 右链接
# print(res)



# 多对多关系连表
# 多对多关系,基于链表的跨表查
#方式一:直接连
res = session.query(Boy, Girl,Boy2Girl).filter(Boy.id == Boy2Girl.boy_id,Girl.id == Boy2Girl.girl_id).all()
# 方式二:join连
res = session.query(Boy).join(Boy2Girl).join(Girl).filter(Person.id>=2).all()

三、scoped_session线程安全

1、简介

使用sqlalchemy后发现在每个视图函数中都需要建立一次连接,如果将sqlalchemy放在全局变量中,就会造成数据错乱,scoped_session可以解决线程安全问题

本质:使用了local方法

2、使用方法

from sqlalchemy.orm import scoped_session
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 第一步:生成engine对象
engine = create_engine(
    "mysql+pymysql://root@127.0.0.1:3306/aaa",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:拿到一个Session类,传入engine
Session = sessionmaker(bind=engine)
# 线程不安全
# session = Session()

# 做成线程安全的:如何做的?
# 内部使用了local对象,取当前线程的session,如果当前线程有,就直接返回用,如果没有,创建一个,放到local中
# session 是  scoped_session 的对象
session = scoped_session(Session)

# 以后全局使用session即可,它线程安全