当前位置:首页 > 技术 > 正文内容

使用SQLAlchemy ORM进行Python数据库操作

访客 技术 2026年6月22日 18

SQLAlchemy 是 Python 中备受欢迎的对象关系映射 (ORM) 框架,它提供了一种高效且灵活的方式来与数据库交互。本文将指导您如何利用 SQLAlchemy ORM 进行数据库操作。

目录

  1. 安装 SQLAlchemy
  2. 核心概念
  3. 数据库连接
  4. 数据模型定义
  5. 表结构创建
  6. 基础增删改查 (CRUD)
  7. 数据检索
  8. 模型间关系
  9. 事务控制
  10. 工程实践

安装

使用 pip 进行安装:

pip install sqlalchemy

若要连接特定数据库,请安装相应的数据库驱动:

# PostgreSQL
pip install psycopg2-binary

# MySQL
pip install mysql-connector-python

# SQLite (Python 内置,无需额外安装)

核心概念

  • Engine: 数据库交互的入口,负责与数据库建立连接。
  • Session: 数据库会话,用于管理对象的状态和数据库操作。
  • Model: 映射数据库表的 Python 类。
  • Query: 用于构建和执行数据库查询的对象。

数据库连接

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 示例:SQLite 数据库连接
db_uri = 'sqlite:///data.db'
db_engine = create_engine(db_uri, echo=True) # echo=True 会打印 SQL 语句

# 示例:PostgreSQL 连接
# db_uri = 'postgresql://user:password@host:port/database_name'
# db_engine = create_engine(db_uri)

# 示例:MySQL 连接
# db_uri = 'mysql+mysqlconnector://user:password@host:port/database_name'
# db_engine = create_engine(db_uri)

# 创建会话工厂
SessionGenerator = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)

# 获取一个数据库会话实例
current_session = SessionGenerator()

数据模型定义

使用 SQLAlchemy 的声明式基类来定义模型。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base

# 声明式基类
BaseModel = declarative_base()

class Customer(BaseModel):
    __tablename__ = 'customers'

    customer_id = Column('id', Integer, primary_key=True, index=True)
    full_name = Column(String(100), nullable=False, index=True)
    contact_email = Column(String(150), unique=True, index=True)

    # 定义与 Order 模型的一对多关系
    orders = relationship("Order", back_populates="customer")

class Order(BaseModel):
    __tablename__ = 'orders'

    order_id = Column('id', Integer, primary_key=True, index=True)
    order_code = Column(String(50), nullable=False, index=True)
    customer_id = Column(Integer, ForeignKey('customers.id'))

    # 定义与 Customer 模型的多对一关系
    customer = relationship("Customer", back_populates="orders")

    # 定义与 Product 模型的多对多关系 (通过 OrderItem 关联表)
    products = relationship("Product", secondary="order_items", back_populates="orders")

class Product(BaseModel):
    __tablename__ = 'products'

    product_id = Column('id', Integer, primary_key=True, index=True)
    product_name = Column(String(100), nullable=False, unique=True)

    orders = relationship("Order", secondary="order_items", back_populates="products")

# 订单项关联表
class OrderItem(BaseModel):
    __tablename__ = 'order_items'

    order_item_id = Column('id', Integer, primary_key=True) # 实际可以不设置主键,如果仅用作关联
    order_ref = Column(Integer, ForeignKey('orders.id'), primary_key=True)
    product_ref = Column(Integer, ForeignKey('products.id'), primary_key=True)

表结构创建

根据模型定义创建数据库表。

# 创建所有在 BaseModel 中定义的表
BaseModel.metadata.create_all(bind=db_engine)

# 删除所有表 (谨慎操作!)
# BaseModel.metadata.drop_all(bind=db_engine)

基础增删改查 (CRUD)

创建数据

# 创建新客户
new_customer = Customer(full_name="张三", contact_email="zhangsan@example.com")
current_session.add(new_customer)

# 批量添加
customer_li = Customer(full_name="李四", contact_email="lisi@example.com")
customer_wang = Customer(full_name="王五", contact_email="wangwu@example.com")
current_session.add_all([customer_li, customer_wang])

# 提交事务
current_session.commit()

读取数据

# 获取所有客户
all_customers = current_session.query(Customer).all()

# 获取第一条记录
first_customer = current_session.query(Customer).first()

# 根据主键 ID 获取
customer_by_id = current_session.query(Customer).get(1)

更新数据

# 查询并修改
customer_to_update = current_session.query(Customer).filter(Customer.customer_id == 1).first()
if customer_to_update:
    customer_to_update.full_name = "张三丰"
    current_session.commit()

# 批量更新
current_session.query(Customer).filter(Customer.full_name.like("李%")).update(
    {"full_name": "李先生"}, synchronize_session='fetch'
)
current_session.commit()

删除数据

# 查询并删除
customer_to_delete = current_session.query(Customer).filter(Customer.contact_email == "wangwu@example.com").first()
if customer_to_delete:
    current_session.delete(customer_to_delete)
    current_session.commit()

# 批量删除
current_session.query(Customer).filter(Customer.full_name.in_(["李四", "李四甲"])).delete(synchronize_session='fetch')
current_session.commit()

数据检索

基础查询

# 获取所有客户姓名
customer_names = current_session.query(Customer.full_name).all()

# 按名称降序排序
sorted_customers = current_session.query(Customer).order_by(Customer.full_name.desc()).all()

# 限制返回数量
limited_customers = current_session.query(Customer).limit(5).all()

# 分页查询
paginated_customers = current_session.query(Customer).offset(10).limit(5).all()

条件过滤

from sqlalchemy import or_

# 精确匹配
specific_customer = current_session.query(Customer).filter(Customer.full_name == "张三丰").first()

# LIKE 查询
customers_with_zhang = current_session.query(Customer).filter(Customer.full_name.like("张%")).all()

# IN 查询
customers_in_list = current_session.query(Customer).filter(Customer.full_name.in_(["张三", "李四"])).all()

# 组合条件 (AND)
complex_filter = current_session.query(Customer).filter(
    Customer.full_name.startswith("张"),
    Customer.contact_email.endswith("@example.com")
).all()

# OR 条件
or_filter = current_session.query(Customer).filter(
    or_(Customer.full_name == "张三", Customer.full_name == "李四")
).all()

# 不等于
not_equal_filter = current_session.query(Customer).filter(Customer.full_name != "王五").all()

聚合查询

from sqlalchemy import func

# 客户总数
total_customers = current_session.query(Customer).count()

# 统计每个客户的订单数量
from sqlalchemy.orm import ali<bos>

# 使用 aliased 来避免命名冲突
customer_order_counts = current_session.query(
    Customer.full_name,
    func.count(Order.order_id)
).join(Order, isouter=True).group_by(Customer.full_name).all()

# 计算平均 ID
avg_customer_id = current_session.query(func.avg(Customer.customer_id)).scalar()

连接查询

# 内连接查询客户及其订单
customer_orders = current_session.query(Customer, Order).join(Order).filter(Order.order_code.like("%INV%")).all()

# 左外连接查询所有客户,包括没有订单的
all_customers_with_orders = current_session.query(Customer, Order).outerjoin(Order).all()

# 指定连接条件
explicit_join = current_session.query(Customer, Order).join(Order, Customer.customer_id == Order.customer_id).all()

模型间关系

# 创建与关联对象
customer_zhao = Customer(full_name="赵六", contact_email="zhaoliu@example.com")
order_zhao = Order(order_code="ORD-001", customer=customer_zhao) # 通过对象关联
current_session.add(order_zhao)
current_session.commit()

# 通过关系访问数据
print(f"订单 {order_zhao.order_code} 的客户是: {order_zhao.customer.full_name}")
print(f"客户 {customer_zhao.full_name} 的所有订单:")
for o in customer_zhao.orders:
    print(f"  - {o.order_code}")

# 多对多关系操作
product_a = Product(product_name="产品A")
product_b = Product(product_name="产品B")
current_session.add_all([product_a, product_b])
current_session.commit()

order_zhao.products.append(product_a)
order_zhao.products.append(product_b)
current_session.commit()

print(f"订单 {order_zhao.order_code} 包含的产品:")
for p in order_zhao.products:
    print(f"  - {p.product_name}")

事务控制

# 正常事务提交与回滚
try:
    new_customer = Customer(full_name="事务测试", contact_email="tx@example.com")
    current_session.add(new_customer)
    current_session.commit()
except Exception as e:
    current_session.rollback() # 发生错误时回滚
    print(f"操作失败: {e}")

# 使用上下文管理器进行事务管理
from sqlalchemy.orm import Session

def add_product(db: Session, name: str):
    try:
        new_product = Product(product_name=name)
        db.add(new_product)
        db.commit()
        return new_product
    except:
        db.rollback()
        raise

# 嵌套事务
with current_session.begin_nested():
    customer_nested = Customer(full_name="嵌套用户", contact_email="nested@example.com")
    current_session.add(customer_nested)
    # 内部操作成功,外部事务继续;失败则内部回滚

# 保存点
save_point = current_session.begin_nested()
try:
    customer_sp = Customer(full_name="保存点用户", contact_email="sp@example.com")
    current_session.add(customer_sp)
    save_point.commit() # 提交保存点
except:
    save_point.rollback() # 回滚到保存点

工程实践

  1. 会话生命周期: 为每个业务操作或请求管理独立的会话,并在操作完成后关闭。
  2. 错误处理: 总是包含 `try...except` 块来捕获数据库错误并执行 `rollback`。
  3. 性能优化: 关注 SQLAlchemy 的加载策略(`lazy`, `joined`, `selectin`)以避免 N+1 查询问题。
  4. 连接池配置: 根据应用负载调整引擎的连接池大小和超时设置。
  5. 数据校验: 在模型层面或服务层实现数据输入的验证逻辑。

使用上下文管理器简化会话管理:

from contextlib import contextmanager

@contextmanager
def db_session_context():
    session_instance = SessionGenerator()
    try:
        yield session_instance
        session_instance.commit()
    except Exception as e:
        session_instance.rollback()
        print(f"上下文操作失败: {e}")
        raise
    finally:
        session_instance.close()

# 使用示例
with db_session_context() as db:
    new_product_ctx = Product(product_name="上下文产品")
    db.add(new_product_ctx)
标签: SQLAlchemy

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。