数据库连接池 - Pymysql

发布时间 2023-04-24 22:15:05作者: 致于数据科学家的小陈

最近想用小程序来做个移动BI, 然后涉及后端接口部分打算用 Python 的 flask 框架整一波, 主要听闻它比较轻量, 简单和可灵活配置, 这就和我很对味. 毕竟我主要搞数据用的就是 sql 而已, 只要有个服务器提供接口就行. 真正开始来写接口的时候, 就遇到这个数据库的问题, 关于查询效率, 和优雅.

我在工作中偶尔有一些数据处理或者逻辑分析的场景下, 即不通过BI平台, 大量用 pandas + sql 来操作. 我们数据的 ADS 层几乎都在 mysql 故用这个 pymysql 作为驱动连接工具是很好用的.

低频分析查询-随便搞

针对低频查询数据, 通常写法是这样的:

import pymysql

# 1.创建连接
conn = pymysql.connect(
    host='127.0.0.1',
    port=3306,
    user='root',
    passwd='123456',
    db='cj'
)

# 3.创建游标
cursor = conn.cursor()

# 4. 执行sql语句和获取数据, 以经典的超市数据为例
cursor.execute('select order_id, province from market limit 2;')
data = cursor.fetchall()
print(data)

# 5. 关闭连接(同时也就关闭了cursor, 不关可能造成锁表)
conn.close()

(('US-2023-1357144', '浙江'), ('CN-2023-1973789', '四川'))

高频分析查询-连接池

但如果变成了web, 每次访问接口都要查询数据, 如果每次都要重新进行 connect 那这个就有点低效了.

若在外层只连接一次, 重复用 cursor 的话又可能会造成锁表和线程异常问题.

因此, 创建一个数据库连接池来进行任务管理是非常必要的. 即可通过一个 pool 来自动创建多个连接和资源回收, 这样就又高效又优雅.

在 pyhton 中, 我们用 dbutils 用作池管理, 驱动还是用 pymysql, 写法如下:

import pymysql 
from pymysql import cursors
from dbutils.pooled_db import PooledDB


pool = PooledDB(
    creator=pymysql,
    # 创建最大连接
    maxconnections=6,
    mincached=2,
    maxcached=3,
    maxshared=4,
    blocking=True,
    maxusage=None,
    setsession=[],
    ping=0,
    # 这一坨会传给上面的 pymysql
    host='127.0.0.1',
    port=3306,
    user='root',
    passwd='123456',
    database='cj',
    charset='utf8',
    # 让查询结果是一个 dict
    cursorclass=cursors.DictCursor
)


# 使用上和 mysql 是一样的
conn = pool.connection()
cursor = conn.cursor()

cursor.execute('select order_id, province from market limit 2;')
data = cursor.fetchall()
print(data)

# 这里并没有关闭连接, 而是放进了连接池 pool 
conn.close()

结果也是一样的.

(('US-2023-1357144', '浙江'), ('CN-2023-1973789', '四川'))

模拟多用户请求

上面的作用不太直观, 这里用多线程 threading 来模拟多请求的情况. 假设每次最大连接 3次, 至少1次的配置,

通过 sleep 减速一下则可以看到每次 3个连接这样, 一波波处理:

import pymysql 
from pymysql import cursors
from dbutils.pooled_db import PooledDB


pool = PooledDB(
    creator=pymysql,
    # 创建最大连接, 这里假设搞个3
    maxconnections=3,
    mincached=1,
    maxcached=3,
    maxshared=4,
    blocking=True,
    maxusage=None,
    setsession=[],
    ping=0,
    # 这一坨会传给上面的 pymysql
    host='127.0.0.1',
    port=3306,
    user='root',
    passwd='123456',
    database='cj',
    charset='utf8',
    # 让查询结果是一个 dict
    cursorclass=cursors.DictCursor
)


# 模拟多个请求的情况


from threading import Thread

def task(num):
    conn = pool.connection()
    cursor = conn.cursor()

    cursor.execute('select sleep(3);')
    data = cursor.fetchall()

    print(num, '-------', data)
    conn.close()

# 多线程任务测试
for i in range(20):
    t = Thread(target=task, args=(i,))
    # 启动
    t.start()
    


一波波的输出如下:

2 ------- [{'sleep(3)': 0}]
1 ------- [{'sleep(3)': 0}]
3 ------- [{'sleep(3)': 0}]

0 ------- [{'sleep(3)': 0}]
4 ------- [{'sleep(3)': 0}]
5 ------- [{'sleep(3)': 0}]

7 ------- [{'sleep(3)': 0}]
6 ------- [{'sleep(3)': 0}]
10 ------- [{'sleep(3)': 0}]

8 ------- [{'sleep(3)': 0}] 
11 ------- [{'sleep(3)': 0}]
9 ------- [{'sleep(3)': 0}] 

14 ------- [{'sleep(3)': 0}]
12 ------- [{'sleep(3)': 0}]
15 ------- [{'sleep(3)': 0}]

13 ------- [{'sleep(3)': 0}]
16 ------- [{'sleep(3)': 0}]
18 ------- [{'sleep(3)': 0}]

17 ------- [{'sleep(3)': 0}]
19 ------- [{'sleep(3)': 0}]

可以看到连接池简直太优秀了, 非常实用和优雅呀.