flask session原理、闪现、请求的扩展

发布时间 2023-11-27 21:06:38作者: Way*yy

session源码

'''
1 app.session_interface 默认是某个类的对象,以后全局对象 session,就是SecureCookieSessionInterface()的对象
2 请求来了,会执行这个对象的: open_session方法
3 请求走了,会执行这个对象的:save_session方法
4 找出上面讲的--》读源码--》
app.run()---->run_simple(地址, 端口, self可调用对象)--->self 是谁?就是 app
请求来了,就会执行 self可调用对象()--->app()---->对象加括号---》触发---》类的__call__
请求来了,就会执行flask类的 __call__--->self.wsgi_app(environ, start_response)
    def wsgi_app(self, environ: dict, start_response: t.Callable) -> t.Any:
        ctx = self.request_context(environ)
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if "werkzeug.debug.preserve_context" in environ:
                environ["werkzeug.debug.preserve_context"](_cv_app.get())
                environ["werkzeug.debug.preserve_context"](_cv_request.get())

            if error is not None and self.should_ignore_error(error):
                error = None

            ctx.pop(error)
# 5 ctx.push()--->有如下代码
if self.session is None: # 请求刚来,是空的
    #session_interface 就是SecureCookieSessionInterface类的对象
    self.session = session_interface.open_session(self.app, self.request)

    if self.session is None:
        self.session = session_interface.make_null_session(self.app)
        
#6  SecureCookieSessionInterface类的open_session
from flask.sessions import SecureCookieSessionInterface
open_session步骤:
    1 会去cookie中取出session对应的 三段式的字符串
    2 解密,校验签名---》把这个数据--》放到 session对象中
save_session步骤:   
    1 从session取出数据
    2 加密,签名---放到cookie中
    3 返回给前端
'''

save_session

'''
1 视图函数中,咱们  session[name]=lqz
2 请求走了,会触发save_session
3 触发save_session时:
	把session中的数据,加密签名得到三段字符串
	放到cookie中,放到了浏览器中

'''
def save_session(
    self, app: Flask, session: SessionMixin, response: Response
) -> None:
    name = self.get_cookie_name(app)
    domain = self.get_cookie_domain(app)
    path = self.get_cookie_path(app)
    secure = self.get_cookie_secure(app)
    samesite = self.get_cookie_samesite(app)
    httponly = self.get_cookie_httponly(app)

    # Add a "Vary: Cookie" header if the session was accessed at all.
    if session.accessed:
        response.vary.add("Cookie")

        # If the session is modified to be empty, remove the cookie.
        # If the session is empty, return without setting the cookie.
        if not session:
            if session.modified:
                response.delete_cookie(
                    name,
                    domain=domain,
                    path=path,
                    secure=secure,
                    samesite=samesite,
                    httponly=httponly,
                )
                response.vary.add("Cookie")

                return

            if not self.should_set_cookie(app, session):
                return

            expires = self.get_expiration_time(app, session)
            # 加密,签名---放到cookie中
            val = self.get_signing_serializer(app).dumps(dict(session))  # type: ignore
            response.set_cookie(
                name,
                val,  # type: ignore
                expires=expires,
                httponly=httponly,
                domain=domain,
                path=path,
                secure=secure,
                samesite=samesite,
            )
            response.vary.add("Cookie")

open_session

'''
1 请求来了,request中带着cookie(也有可能没有)
2 根据 session这个key,取出value,如果有,就是 我们当时生成的三段
3 字典=s.loads(value)  把内容验签,解密出来,转成了字典
4 把这个字典转到 session对象中
5 以后视图函数中  session[name] 就能取到当时你放的name
'''
def open_session(self, app: Flask, request: Request) -> SecureCookieSession | None:
    s = self.get_signing_serializer(app)
    if s is None:
        return None
    # val 就是那三段
    val = request.cookies.get(self.get_cookie_name(app))
    if not val:
        # 如果没有带cookie,造个空session,返回
        return self.session_class()
    max_age = int(app.permanent_session_lifetime.total_seconds())
    try:
        data = s.loads(val, max_age=max_age)
        # 解密,校验签名---》把这个数据--》放到 session对象中
        return self.session_class(data)
    except BadSignature:
        return self.session_class()
    
    
django session的控制:
    from django.contrib.sessions.middleware import SessionMiddleware

闪现

# flash 翻译过来的
# 假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息

# 以后遇到,在当次请求有数据要存,下次请求还能取出来,就可以使用闪现
### 设置闪现
    # 1 普通使用 :通过闪现---》放进去,取一次,就没了
    	# flash(s)  # 放到闪现中了,加密放到了cookie总
    # 2 分类,以分类放入
        flash(s, category='xxx')  
        flash('xxx', category='yyy') 
        
### 取闪现
    # 1 普通取,全取出来 从闪现中取出来
    # err = get_flashed_messages()[0]
    # 2 根据分类取
    err = get_flashed_messages(category_filter=['xxx'])[0]
    
    
    
# 总结:
	1 设置flash,可以按分类设置
    2 去flash,在当前请求中,可以取出多次,都是在的
    3 一旦有请求取出,再去别的的请求中取,就没了,无论有没有分类,都没了

请求扩展

from flask import Flask, request, render_template, jsonify

app = Flask(__name__)
app.debug = True


@app.before_request 
def before():
    # print("我是最先执行的") # 如果return None 还继续往下走,如果return 四件套就不会往下走了
    return "我来了"


@app.route("/")
def index():
    return "xxx"


@app.after_request
def after(response):  # 如果用after_request必须有一个response参数,且这个参数必须return出去
    print("我走了")
    return response



@app.teardown_request
def error(exc): # 如果视图函数正常顺利运行,err是None的,如果视图函数出错了,err就是错误对象,一般用来做日志记录
    print(exc)
    print("teardown_request")


@app.route("/")
def index():
    raise "我出错了"
    return "index"


@app.errorhandler(404) # 当我路径不存在时跳转到指定的页面
def error(exc):
    print(exc)
    return render_template("login.html")


@app.route("/")
def index():
    return "index"


# template_global
#标签

@app.template_global()
def sb(a1, a2):
    return a1 + a2
#{{sb(1,2)}}


# template_filter
# 过滤器

@app.template_filter()
def db(a1, a2, a3):
    return a1 + a2 + a3
#{{ 1|db(2,3)}}

蓝图

#  blurprint 翻译过来的,flask是要做成项目,放到多个文件中,使用蓝图划分目录

# 小型
Flsk_one
    src
        __init__.py
        views.py
    static
    templates
    main.py
    
__init__.py:
	from flask import Flask
    from src.views import bp_home

    app = Flask(__name__)

    app.register_blueprint(bp_home) # 注册蓝图
    
views.py:
    from flask import Blueprint
    bp_home = Blueprint("home", __name__) # 创建蓝图

    @bp_home.route("/")
    def index():
        return "index"

    
main.py:
	from src import app
    if __name__ == '__main__':
        app.run()

        
        
# 使用蓝图,划分大型项目目录  多个app,像django一样
big_blueprint  								# 项目名
    -src									# 核心文件
        -admin								# admin的app
        	-static							# 静态文件
        		-1.jpg						# 图片
        	-templates						# 模板文件目录
        		-admin_home.html			 # 模板文件
        	-__init__.py					# 包
        	-models.py						# 表模型
        	-views.py						# 视图函数
        -home								# home app
        -order								# orderapp
        -__init__.py						# 包
        -settings.py						# 配置文件
    -manage.py								# 启动文件

flask-session

之前的flask的session加密后放到了cookie中,如果想把session放到Redis中该如何去做?
借助于第三方的flask-session的模块:pip3 install flask-session

# 方式一
from flask import Flask, session
from flask_session import RedisSessionInterface
import redis

app = Flask(__name__)
app.secret_key = "asdjkadjksdskdjs"
conn = redis.Redis(host="127.0.0.1", port="6379")
app.session_interface = RedisSessionInterface(conn, "session")


def index():
    session["name"] = "yang"
    return "index"


app.add_url_rule("/index", view_func=index)

# 方式二:(继承第三方 ,通用方案:第三方提供的一个类,把app包裹一下,这个第三方就能用了)
from flask_session import Session
from redis import Redis

app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='127.0.0.1', port='6379')
# app.config['SESSION_KEY_PREFIX'] = 'lqz'  # 如果不写,默认以:SESSION_COOKIE_NAME 作为key
# app.config.from_pyfile('./settings.py')
Session(app)  # 核心跟第一种方式一模一样


- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')

- session的key理应该是 uuid,如果还是三段式-->之前浏览器器中存在了

数据库连接池

flask使用pymysql

from flask import Flask, jsonify
import pymysql
from pymysql.cursors import DictCursor

app = Flask(__name__)
app.debug = True


@app.route("/")
def index():
    coon = pymysql.connect(
        user="root",
        password="020501",
        host="127.0.0.1",
        database="pearadminflask",
        port=3306
    )
    cursor = coon.cursor(DictCursor)
    sql = "select id,name,url from article limit 10"
    cursor.execute(sql)
    res = cursor.fetchall()
    return jsonify({"code": 200, "msg": "成功", "result": res})


并发问题:conn和cursor 要做成单例,还是每个视图函数一个?
    -如果用单例,数据会错乱
    -咱们需要,每个视图函数,拿一个连接--->如果并发数过多,mysql连接数就很多--->使用连接池解决
    
    
# 解决上面的问题
	-数据库连接池
    -创建一个全局的池
    -每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数
    -1 安装 pip3 install dbutils
    -2 使用:实例化得到一个池对象--->池是单例
    
# 创建连接池
	from dbutils.pooled_db import PooledDB
    import pymysql

    POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is
        # created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='020501',
        database='pearadminflask',
        charset='utf8'
    )

# 在视图函数中使用
	from flask import Flask, jsonify
    from mysql_pool import POOL

    app = Flask(__name__)
    app.debug = True


    @app.route("/")
    def index():
        conn = POOL.connection()
        cursor = conn.cursor()
        sql = 'select id,name,url from article limit 10'
        cursor.execute(sql)
        # 切换
        res = cursor.fetchall()
        return jsonify({"code": 200, "msg": "成功", "result": res})
    
    
# django 的 mysql操作,有连接池?
	-没有--->一个请求--->就是一个新的连接
    
    -django中引入连接池--->自行搜索

wtforms

# 生成表单,做数据校验
# django--->forms组件-->django内置的
	1 写个类,继承Form,在里面写字段
    2 requset.POST 要校验的数据  form=MyForm(request.Post)   form.is_valiad()
    3 可以在模板上,快速生成form表单
    
    
# wtform就是用来做表单校验,和生成表单

flask-script



### 如果运行不了:报错 from flask._compat import text_type,降版本
# -1 pip3 install  Flask-Script==2.0.3
# -2 pip3 install flask==1.1.4
# -3 pip3 install markupsafe=1.1.1


from flask import Flask
from flask_script import Manager

app = Flask(__name__)
app.debug = True
manager = Manager(app)

####3 自定制命令
@manager.command
def custom(arg):
    """自定义命令
    python manage.py custom 123
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  cmd -n lqz -u xxxx
    执行: python manage.py  cmd --name lqz --url xxx
    """
    print(name, url)

# 比如定制一个命令--->导入初始化的省市的数据
# 比如定制一个命令--->传入一个excel--->把excel的数据同步到mysql表中--->openpyxl

@app.route('/')
def index():
    return 'index'


if __name__ == '__main__':
    # app.run(port=8888)
    manager.run()

新版基于click

# flask 自己有,基于click
	-运行flask    flask --app py文件名字:app run
        
        
##  1 flask 运行项目
flask --app py文件名字:app run

## 2 定制命令
@app.cli.command("create-user")
@click.argument("name")
def create_user(name):
    print(name)
    
# 命令行中执行
flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
flask create-user lqz

django中自定制命令

# 1 app下新建文件夹
	management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)

# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    help = '命令提示'
    def handle(self, *args, **kwargs):
		命令逻辑  
# 4 使用命令
python manage.py  py文件(命令名)

信号

# https://flask.palletsprojects.com/en/3.0.x/signals/

# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

# 内置信号
	request_started = _signals.signal('request-started')                # 请求到来前执行
    request_finished = _signals.signal('request-finished')              # 请求结束后执行

    before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
    template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行

    got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行

    request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
    appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)

    appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
    appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
    message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发
    
    
# 通过内置信号,可以实现跟请求扩展类似的功能,但是他们是不同的方案,信号更丰富

signal:信号--》flask中的信号,django中也有


Semaphore :信号量,多把锁,https://zhuanlan.zhihu.com/p/489305763
import threading
def task(semaphore):
    # 请求访问资源
    semaphore.acquire()
    try:
        # 访问共享资源
        print("Accessing shared resource")
    finally:
        # 释放资源
        semaphore.release()

# 创建Semaphore对象
semaphore = threading.Semaphore(5)

# 创建50个线程
threads = [threading.Thread(target=access_resource, args=(semaphore,)) for _ in range(50)]

# 启动线程
for thread in threads:
    thread.start()

# 等待所有线程结束
for thread in threads:
    thread.join()

内置信号使用

# 示例
from flask import Flask, signals, render_template

app = Flask(__name__)


# 先写一个函数
def render_logger(*args, **kwargs):
    print(args)
    print(kwargs)
    app.logger.info("模版渲染了")


# 与内置信号进行绑定
signals.template_rendered.connect(render_logger)


def index():
    name = "xxxx"
    return render_template("indexs.html", name=name)


app.add_url_rule("/", view_func=index)

自定义信号

#### 自定义信号
# 定义自定义信号
from flask.signals import _signals

print_args = _signals.signal('print_args')


# 第二步:写个函数
def lqz(*args, **kwargs):
    print(args)
    print(kwargs)
    print('我执行了')


# 第三步:绑定信号
print_args.connect(lqz)


# 第四步:触发信号
@app.route('/home')
def home():
    print('xxxxxxxxx')
    # 第四步:触发信号
    print_args.send(value='xxxxxxxxx')
    return 'home'

信号的实际用途

# 1 新增一个用户,给管理员发条邮件
# 2 banner表中插入数据,就删除缓存
# 3 只要mysql增加一条记录,就把数据同步到 es中

django中如何使用信号

# banner表中插入数据,就删除缓存  伪代码

#1  定义一个函数
from django.db.models.signals import pre_save
def callBack(*args **kwargs):
    print(args)
    print(kwargs)
    instance=kwargs.get('instance') # 新增的对象
    # 通过对象,判断是增加的哪个表中的数据
    #删缓存
# 2 跟内置信号绑定
pre_save.connect(callBack)

# 3 等待内置信号触发

flask-cache

参考:https://flask.palletsprojects.com/en/3.0.x/patterns/caching/