sqlalchemy是python中最流行的orm(对象关系映射)框架之一,它提供了高效且灵活的数据库操作方式。本文将介绍如何使用sqlalchemy orm进行数据库操作。
安装
pip install sqlalchemy
如果需要连接特定数据库,还需安装相应的驱动程序:
# postgresql pip install psycopg2-binary # mysql pip install mysql-connector-python # sqlite (python标准库已包含,无需额外安装)
核心概念
- engine:数据库连接的引擎,负责与数据库通信
- session:数据库会话,管理所有持久化操作
- model:数据模型类,对应数据库中的表
- query:查询对象,用于构建和执行数据库查询
连接数据库
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # 创建数据库连接引擎 # sqlite示例 engine = create_engine('sqlite:///example.db', echo=true) # postgresql示例 # engine = create_engine('postgresql://username:password@localhost:5432/mydatabase') # mysql示例 # engine = create_engine('mysql+mysqlconnector://username:password@localhost:3306/mydatabase') # 创建会话工厂 sessionlocal = sessionmaker(autocommit=false, autoflush=false, bind=engine) # 创建会话实例 session = sessionlocal()
定义数据模型
from sqlalchemy import column, integer, string, foreignkey from sqlalchemy.orm import relationship, declarative_base # 创建基类 base = declarative_base() class user(base): __tablename__ = 'users' id = column(integer, primary_key=true, index=true) name = column(string(50), nullable=false) email = column(string(100), unique=true, index=true) # 定义一对多关系 posts = relationship("post", back_populates="author") class post(base): __tablename__ = 'posts' id = column(integer, primary_key=true, index=true) title = column(string(100), nullable=false) content = column(string(500)) author_id = column(integer, foreignkey('users.id')) # 定义多对一关系 author = relationship("user", back_populates="posts") # 定义多对多关系(通过关联表) tags = relationship("tag", secondary="post_tags", back_populates="posts") class tag(base): __tablename__ = 'tags' id = column(integer, primary_key=true, index=true) name = column(string(30), unique=true, nullable=false) posts = relationship("post", secondary="post_tags", back_populates="tags") # 关联表(用于多对多关系) class posttag(base): __tablename__ = 'post_tags' post_id = column(integer, foreignkey('posts.id'), primary_key=true) tag_id = column(integer, foreignkey('tags.id'), primary_key=true)
创建数据库表
# 创建所有表 base.metadata.create_all(bind=engine) # 删除所有表 # base.metadata.drop_all(bind=engine)
基本crud操作
创建数据
# 创建新用户 new_user = user(name="张三", email="zhangsan@example.com") session.add(new_user) session.commit() # 批量创建 session.add_all([ user(name="李四", email="lisi@example.com"), user(name="王五", email="wangwu@example.com") ]) session.commit()
读取数据
# 获取所有用户 users = session.query(user).all() # 获取第一个用户 first_user = session.query(user).first() # 根据id获取用户 user = session.query(user).get(1)
更新数据
# 查询并更新 user = session.query(user).get(1) user.name = "张三四" session.commit() # 批量更新 session.query(user).filter(user.name.like("张%")).update({"name": "张氏"}, synchronize_session=false) session.commit()
删除数据
# 查询并删除 user = session.query(user).get(1) session.delete(user) session.commit() # 批量删除 session.query(user).filter(user.name == "李四").delete(synchronize_session=false) session.commit()
查询数据
基本查询
# 获取所有记录 users = session.query(user).all() # 获取特定字段 names = session.query(user.name).all() # 排序 users = session.query(user).order_by(user.name.desc()).all() # 限制结果数量 users = session.query(user).limit(10).all() # 偏移量 users = session.query(user).offset(5).limit(10).all()
过滤查询
from sqlalchemy import or_ # 等值过滤 user = session.query(user).filter(user.name == "张三").first() # 模糊查询 users = session.query(user).filter(user.name.like("张%")).all() # in查询 users = session.query(user).filter(user.name.in_(["张三", "李四"])).all() # 多条件查询 users = session.query(user).filter( user.name == "张三", user.email.like("%@example.com") ).all() # 或条件 users = session.query(user).filter( or_(user.name == "张三", user.name == "李四") ).all() # 不等于 users = session.query(user).filter(user.name != "张三").all()
聚合查询
from sqlalchemy import func # 计数 count = session.query(user).count() # 分组计数 user_post_count = session.query( user.name, func.count(post.id) ).join(post).group_by(user.name).all() # 求和、平均值等 avg_id = session.query(func.avg(user.id)).scalar()
连接查询
# 内连接 results = session.query(user, post).join(post).filter(post.title.like("%python%")).all() # 左外连接 results = session.query(user, post).outerjoin(post).all() # 指定连接条件 results = session.query(user, post).join(post, user.id == post.author_id).all()
关系操作
# 创建带关系的对象 user = user(name="赵六", email="zhaoliu@example.com") post = post(title="我的第一篇博客", content="hello world!", author=user) session.add(post) session.commit() # 通过关系访问 print(f"文章 '{post.title}' 的作者是 {post.author.name}") print(f"用户 {user.name} 的所有文章:") for p in user.posts: print(f" - {p.title}") # 多对多关系操作 python_tag = tag(name="python") sqlalchemy_tag = tag(name="sqlalchemy") post.tags.append(python_tag) post.tags.append(sqlalchemy_tag) session.commit() print(f"文章 '{post.title}' 的标签:") for tag in post.tags: print(f" - {tag.name}")
事务管理
# 自动提交事务 try: user = user(name="测试用户", email="test@example.com") session.add(user) session.commit() except exception as e: session.rollback() print(f"发生错误: {e}") # 使用事务上下文管理器 from sqlalchemy.orm import session def create_user(session: session, name: str, email: str): try: user = user(name=name, email=email) session.add(user) session.commit() return user except: session.rollback() raise # 嵌套事务 with session.begin_nested(): user = user(name="事务用户", email="transaction@example.com") session.add(user) # 保存点 savepoint = session.begin_nested() try: user = user(name="保存点用户", email="savepoint@example.com") session.add(user) savepoint.commit() except: savepoint.rollback()
最佳实践
- 会话管理:为每个请求创建新会话,请求结束后关闭
- 异常处理:始终处理异常并适当回滚事务
- 延迟加载:注意n+1查询问题,使用 eager loading 优化
- 连接池:合理配置连接池大小和超时设置
- 数据验证:在模型层或应用层验证数据完整性
# 使用上下文管理器管理会话 from contextlib import contextmanager @contextmanager def get_db(): db = sessionlocal() try: yield db db.commit() except exception: db.rollback() raise finally: db.close() # 使用示例 with get_db() as db: user = user(name="上下文用户", email="context@example.com") db.add(user)
总结
sqlalchemy orm提供了强大而灵活的数据库操作方式,通过本文的介绍,您应该能够:
- 安装和配置sqlalchemy
- 定义数据模型和关系
- 执行基本的crud操作
- 构建复杂查询
- 管理数据库事务
- 遵循最佳实践
sqlalchemy还有更多高级特性,如混合属性、事件监听、自定义查询等,值得进一步探索学习。
到此这篇关于python内存管理机制垃圾回收与引用计数的文章就介绍到这了,更多相关python垃圾回收与引用计数内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论