正在进入ing...

Sqlalchemy 异步操作体验

发布时间:2022-09-16 浏览量: 75 文章分类: python

sqlalchemy使用也很久了,一直也没遇到什么问题,最近在做一个业务的时候,出现查询延迟非常严重的情况,因为并发统计量和数据本身量级都比较大,也懒得做别的优化了,直接上async异步,大力破奇迹了。

先吐槽一下官方文档,实在是。。乱的一团糟。

英文版本官方文档: 点我查看

中文官方文档:点我查看

这次的改进还是比较大的,我觉得使用flask等其他web框架的支持更好了,因为我通常使用的是django,所以也不熟悉。就自己边看文档,边做一些记录了。

安装

我的版本如下,请确认你的版本和我的是不是一致。

python=3.10.5

SQLAlchemy==1.4.40
asyncmy==0.2.5

连接数据库

常规连接

from sqlalchemy import create_engine

engine = create_engine(
        "mysql+pymysql://root:123456@localhost:3306/newdb01?charset=utf8mb4",
        echo=False,max_overflow=5,pool_recycle=1800,pool_pre_ping=True)

异步连接

from sqlalchemy.ext.asyncio import create_async_engine

async_engine = create_async_engine(
        "mysql+asyncmy://root:123456@localhost:3306/newdb01?charset=utf8mb4",
        echo=False,pool_recycle=3500,max_overflow=20,pool_size=10)

通过上面例子还是比较清楚可以看出来2个变化的

  • 在引入的时候变成了sqlalchemy.ext.asyncio专门的异步库中引入了

  • 连接mysql的时候,由之前的mysql+pymysql变成了mysql+asyncmy

其余参数暂时还没发现有变化的,以后我发现了在持续更新补充。

创建连接

这里变化还是挺大的,直接看差异吧

常规创建

from sqlalchemy.orm import sessionmaker


Session = sessionmaker(bind=engine)
session = Session()

异步创建

async with async_engine.connect() as conn:
     ....

这里有几个差别,因为涉及到异步了,所以不能跟之前的模式一样,直接引入了,而是要基于async模式来改写了。当然你要手动创建连接也可以将sessionmaker变成AsyncSession,同样引入还是from sqlalchemy.ext.asyncio import AsyncSession

完整版

因为增删改查的感觉都差不多,所以我就直接列个查询的例子好了,整体没有区别。

from sqlalchemy.ext.asyncio import create_async_engine

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column,BIGINT,Integer

Base = declarative_base()

class Tickets(Base):
    __tablename__ = 'back_base_tickets'
    id = Column(BIGINT,primary_key=True)
    ticket_type = Column(Integer)
    app_id = Column(Integer)
    biz_id = Column(Integer)
    status = Column(Integer)


async def async_main():
    async with async_engine.connect() as conn:
        # 进行结果查询
        result = await conn.execute(select(Tickets).where(Tickets.ticket_type == 2))
        # 返回结果查询
        return result.fetchall()

    # 对于在函数范围中创建的异步引擎,关闭 、清理池连接
    await async_engine.dispose()

如果需要使用,只要在异步函数中调用await async_main()即可。 因为我是仍到apschedulerAsyncIOScheduler里面去跑的,这里就不贴代码了,基本都一样。有兴趣的可以看我之前的文章。