使用SQLAlchemy ORM进行Python数据库操作
SQLAlchemy 是 Python 中备受欢迎的对象关系映射 (ORM) 框架,它提供了一种高效且灵活的方式来与数据库交互。本文将指导您如何利用 SQLAlchemy ORM 进行数据库操作。
目录
- 安装 SQLAlchemy
- 核心概念
- 数据库连接
- 数据模型定义
- 表结构创建
- 基础增删改查 (CRUD)
- 数据检索
- 模型间关系
- 事务控制
- 工程实践
安装
使用 pip 进行安装:
pip install sqlalchemy
若要连接特定数据库,请安装相应的数据库驱动:
# PostgreSQL
pip install psycopg2-binary
# MySQL
pip install mysql-connector-python
# SQLite (Python 内置,无需额外安装)
核心概念
- Engine: 数据库交互的入口,负责与数据库建立连接。
- Session: 数据库会话,用于管理对象的状态和数据库操作。
- Model: 映射数据库表的 Python 类。
- Query: 用于构建和执行数据库查询的对象。
数据库连接
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# 示例:SQLite 数据库连接
db_uri = 'sqlite:///data.db'
db_engine = create_engine(db_uri, echo=True) # echo=True 会打印 SQL 语句
# 示例:PostgreSQL 连接
# db_uri = 'postgresql://user:password@host:port/database_name'
# db_engine = create_engine(db_uri)
# 示例:MySQL 连接
# db_uri = 'mysql+mysqlconnector://user:password@host:port/database_name'
# db_engine = create_engine(db_uri)
# 创建会话工厂
SessionGenerator = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)
# 获取一个数据库会话实例
current_session = SessionGenerator()
数据模型定义
使用 SQLAlchemy 的声明式基类来定义模型。
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
# 声明式基类
BaseModel = declarative_base()
class Customer(BaseModel):
__tablename__ = 'customers'
customer_id = Column('id', Integer, primary_key=True, index=True)
full_name = Column(String(100), nullable=False, index=True)
contact_email = Column(String(150), unique=True, index=True)
# 定义与 Order 模型的一对多关系
orders = relationship("Order", back_populates="customer")
class Order(BaseModel):
__tablename__ = 'orders'
order_id = Column('id', Integer, primary_key=True, index=True)
order_code = Column(String(50), nullable=False, index=True)
customer_id = Column(Integer, ForeignKey('customers.id'))
# 定义与 Customer 模型的多对一关系
customer = relationship("Customer", back_populates="orders")
# 定义与 Product 模型的多对多关系 (通过 OrderItem 关联表)
products = relationship("Product", secondary="order_items", back_populates="orders")
class Product(BaseModel):
__tablename__ = 'products'
product_id = Column('id', Integer, primary_key=True, index=True)
product_name = Column(String(100), nullable=False, unique=True)
orders = relationship("Order", secondary="order_items", back_populates="products")
# 订单项关联表
class OrderItem(BaseModel):
__tablename__ = 'order_items'
order_item_id = Column('id', Integer, primary_key=True) # 实际可以不设置主键,如果仅用作关联
order_ref = Column(Integer, ForeignKey('orders.id'), primary_key=True)
product_ref = Column(Integer, ForeignKey('products.id'), primary_key=True)
表结构创建
根据模型定义创建数据库表。
# 创建所有在 BaseModel 中定义的表
BaseModel.metadata.create_all(bind=db_engine)
# 删除所有表 (谨慎操作!)
# BaseModel.metadata.drop_all(bind=db_engine)
基础增删改查 (CRUD)
创建数据
# 创建新客户
new_customer = Customer(full_name="张三", contact_email="zhangsan@example.com")
current_session.add(new_customer)
# 批量添加
customer_li = Customer(full_name="李四", contact_email="lisi@example.com")
customer_wang = Customer(full_name="王五", contact_email="wangwu@example.com")
current_session.add_all([customer_li, customer_wang])
# 提交事务
current_session.commit()
读取数据
# 获取所有客户
all_customers = current_session.query(Customer).all()
# 获取第一条记录
first_customer = current_session.query(Customer).first()
# 根据主键 ID 获取
customer_by_id = current_session.query(Customer).get(1)
更新数据
# 查询并修改
customer_to_update = current_session.query(Customer).filter(Customer.customer_id == 1).first()
if customer_to_update:
customer_to_update.full_name = "张三丰"
current_session.commit()
# 批量更新
current_session.query(Customer).filter(Customer.full_name.like("李%")).update(
{"full_name": "李先生"}, synchronize_session='fetch'
)
current_session.commit()
删除数据
# 查询并删除
customer_to_delete = current_session.query(Customer).filter(Customer.contact_email == "wangwu@example.com").first()
if customer_to_delete:
current_session.delete(customer_to_delete)
current_session.commit()
# 批量删除
current_session.query(Customer).filter(Customer.full_name.in_(["李四", "李四甲"])).delete(synchronize_session='fetch')
current_session.commit()
数据检索
基础查询
# 获取所有客户姓名
customer_names = current_session.query(Customer.full_name).all()
# 按名称降序排序
sorted_customers = current_session.query(Customer).order_by(Customer.full_name.desc()).all()
# 限制返回数量
limited_customers = current_session.query(Customer).limit(5).all()
# 分页查询
paginated_customers = current_session.query(Customer).offset(10).limit(5).all()
条件过滤
from sqlalchemy import or_
# 精确匹配
specific_customer = current_session.query(Customer).filter(Customer.full_name == "张三丰").first()
# LIKE 查询
customers_with_zhang = current_session.query(Customer).filter(Customer.full_name.like("张%")).all()
# IN 查询
customers_in_list = current_session.query(Customer).filter(Customer.full_name.in_(["张三", "李四"])).all()
# 组合条件 (AND)
complex_filter = current_session.query(Customer).filter(
Customer.full_name.startswith("张"),
Customer.contact_email.endswith("@example.com")
).all()
# OR 条件
or_filter = current_session.query(Customer).filter(
or_(Customer.full_name == "张三", Customer.full_name == "李四")
).all()
# 不等于
not_equal_filter = current_session.query(Customer).filter(Customer.full_name != "王五").all()
聚合查询
from sqlalchemy import func
# 客户总数
total_customers = current_session.query(Customer).count()
# 统计每个客户的订单数量
from sqlalchemy.orm import ali<bos>
# 使用 aliased 来避免命名冲突
customer_order_counts = current_session.query(
Customer.full_name,
func.count(Order.order_id)
).join(Order, isouter=True).group_by(Customer.full_name).all()
# 计算平均 ID
avg_customer_id = current_session.query(func.avg(Customer.customer_id)).scalar()
连接查询
# 内连接查询客户及其订单
customer_orders = current_session.query(Customer, Order).join(Order).filter(Order.order_code.like("%INV%")).all()
# 左外连接查询所有客户,包括没有订单的
all_customers_with_orders = current_session.query(Customer, Order).outerjoin(Order).all()
# 指定连接条件
explicit_join = current_session.query(Customer, Order).join(Order, Customer.customer_id == Order.customer_id).all()
模型间关系
# 创建与关联对象
customer_zhao = Customer(full_name="赵六", contact_email="zhaoliu@example.com")
order_zhao = Order(order_code="ORD-001", customer=customer_zhao) # 通过对象关联
current_session.add(order_zhao)
current_session.commit()
# 通过关系访问数据
print(f"订单 {order_zhao.order_code} 的客户是: {order_zhao.customer.full_name}")
print(f"客户 {customer_zhao.full_name} 的所有订单:")
for o in customer_zhao.orders:
print(f" - {o.order_code}")
# 多对多关系操作
product_a = Product(product_name="产品A")
product_b = Product(product_name="产品B")
current_session.add_all([product_a, product_b])
current_session.commit()
order_zhao.products.append(product_a)
order_zhao.products.append(product_b)
current_session.commit()
print(f"订单 {order_zhao.order_code} 包含的产品:")
for p in order_zhao.products:
print(f" - {p.product_name}")
事务控制
# 正常事务提交与回滚
try:
new_customer = Customer(full_name="事务测试", contact_email="tx@example.com")
current_session.add(new_customer)
current_session.commit()
except Exception as e:
current_session.rollback() # 发生错误时回滚
print(f"操作失败: {e}")
# 使用上下文管理器进行事务管理
from sqlalchemy.orm import Session
def add_product(db: Session, name: str):
try:
new_product = Product(product_name=name)
db.add(new_product)
db.commit()
return new_product
except:
db.rollback()
raise
# 嵌套事务
with current_session.begin_nested():
customer_nested = Customer(full_name="嵌套用户", contact_email="nested@example.com")
current_session.add(customer_nested)
# 内部操作成功,外部事务继续;失败则内部回滚
# 保存点
save_point = current_session.begin_nested()
try:
customer_sp = Customer(full_name="保存点用户", contact_email="sp@example.com")
current_session.add(customer_sp)
save_point.commit() # 提交保存点
except:
save_point.rollback() # 回滚到保存点
工程实践
- 会话生命周期: 为每个业务操作或请求管理独立的会话,并在操作完成后关闭。
- 错误处理: 总是包含 `try...except` 块来捕获数据库错误并执行 `rollback`。
- 性能优化: 关注 SQLAlchemy 的加载策略(`lazy`, `joined`, `selectin`)以避免 N+1 查询问题。
- 连接池配置: 根据应用负载调整引擎的连接池大小和超时设置。
- 数据校验: 在模型层面或服务层实现数据输入的验证逻辑。
使用上下文管理器简化会话管理:
from contextlib import contextmanager
@contextmanager
def db_session_context():
session_instance = SessionGenerator()
try:
yield session_instance
session_instance.commit()
except Exception as e:
session_instance.rollback()
print(f"上下文操作失败: {e}")
raise
finally:
session_instance.close()
# 使用示例
with db_session_context() as db:
new_product_ctx = Product(product_name="上下文产品")
db.add(new_product_ctx)