使用Fabric自动化你的部署流程
Python中广泛使用的对象关系映射工具提供了高效的数据操作方案。本文将演示如何通过该工具实现数据库交互功能。
章节导航
- 安装指导
- 核心组件解析
- 数据源连接配置
- 模型结构设计
- 表结构生成
- 基础数据操作
- 查询方法详解
- 关联关系处理
- 事务控制机制
- 实践建议
安装步骤
终端命令:
pip install sqlalchemy
根据目标数据库类型需额外安装驱动:
终端命令:
# PostgreSQL
pip install psycopg2-binary
# MySQL
pip install mysql-connector-python
# SQLite (内置无需安装)
核心组件
- 数据库连接池:负责管理数据库连接资源
- 会话管理器:处理持久化对象的生命周期
- 模型类:映射数据库表结构的类定义
- 查询构建器:用于构造和执行数据库查询
数据源连接配置
Python代码示例:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# SQLite连接示例
db_engine = create_engine('sqlite:///example.db', echo=True)
# PostgreSQL连接示例
# db_engine = create_engine('postgresql://user:pass@localhost:5432/dbname')
# MySQL连接示例
# db_engine = create_engine('mysql+mysqlconnector://user:pass@localhost:3306/dbname')
# 会话工厂创建
session_factory = sessionmaker(autocommit=False, autoflush=False, bind=db_engine)
# 会话实例化
db_session = session_factory()
模型结构设计
Python代码示例:
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
# 基类定义
Base = declarative_base()
class Customer(Base):
__tablename__ = 'customers'
id = Column(Integer, primary_key=True, index=True)
full_name = Column(String(50), nullable=False)
email_address = Column(String(100), unique=True, index=True)
# 一对多关系定义
articles = relationship("BlogPost", back_populates="owner")
class BlogPost(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True, index=True)
title = Column(String(100), nullable=False)
content_text = Column(String(500))
owner_id = Column(Integer, ForeignKey('customers.id'))
# 多对一关系定义
owner = relationship("Customer", back_populates="articles")
# 多对多关系(通过关联表)
tags = relationship("Tag", secondary="post_tags", back_populates="posts")
class Tag(Base):
__tablename__ = 'tags'
id = Column(Integer, primary_key=True, index=True)
tag_name = Column(String(30), unique=True, nullable=False)
posts = relationship("BlogPost", secondary="post_tags", back_populates="tags")
# 关联表定义(用于多对多关系)
class PostTag(Base):
__tablename__ = 'post_tags'
post_id = Column(Integer, ForeignKey('posts.id'), primary_key=True)
tag_id = Column(Integer, ForeignKey('tags.id'), primary_key=True)
表结构生成
Python代码示例:
# 创建所有表结构
Base.metadata.create_all(bind=db_engine)
# 删除所有表结构
# Base.metadata.drop_all(bind=db_engine)
基础数据操作
数据插入
Python代码示例:
# 新增客户记录
new_customer = Customer(full_name="张三", email_address="zhangsan@example.com")
db_session.add(new_customer)
db_session.commit()
# 批量插入
db_session.add_all([
Customer(full_name="李四", email_address="lisi@example.com"),
Customer(full_name="王五", email_address="wangwu@example.com")
])
db_session.commit()
数据检索
Python代码示例:
# 获取所有客户
all_customers = db_session.query(Customer).all()
# 获取首条记录
first_record = db_session.query(Customer).first()
# 按ID获取
record = db_session.query(Customer).get(1)
数据更新
Python代码示例:
# 查询并更新
record = db_session.query(Customer).get(1)
record.full_name = "张三四"
db_session.commit()
# 批量更新
db_session.query(Customer).filter(Customer.full_name.like("张%")).update({"full_name": "张氏"}, synchronize_session=False)
db_session.commit()
数据删除
Python代码示例:
# 查询并删除
record = db_session.query(Customer).get(1)
db_session.delete(record)
db_session.commit()
# 批量删除
db_session.query(Customer).filter(Customer.full_name == "李四").delete(synchronize_session=False)
db_session.commit()
数据查询
基础查询
Python代码示例:
# 获取全部记录
all_records = db_session.query(Customer).all()
# 获取特定字段
names = db_session.query(Customer.full_name).all()
# 排序
sorted_records = db_session.query(Customer).order_by(Customer.full_name.desc()).all()
# 限制结果数量
limited_results = db_session.query(Customer).limit(10).all()
# 分页查询
paged_results = db_session.query(Customer).offset(5).limit(10).all()
过滤查询
Python代码示例:
from sqlalchemy import or_
# 等值查询
record = db_session.query(Customer).filter(Customer.full_name == "张三").first()
# 模糊查询
filtered_records = db_session.query(Customer).filter(Customer.full_name.like("张%")).all()
# IN查询
in_records = db_session.query(Customer).filter(Customer.full_name.in_(["张三", "李四"])).all()
# 多条件查询
multi_cond_records = db_session.query(Customer).filter(
Customer.full_name == "张三",
Customer.email_address.like("%@example.com")
).all()
# 或条件查询
or_records = db_session.query(Customer).filter(
or_(Customer.full_name == "张三", Customer.full_name == "李四")
).all()
# 不等于查询
not_equal_records = db_session.query(Customer).filter(Customer.full_name != "张三").all()
聚合查询
Python代码示例:
from sqlalchemy import func
# 记录计数
total_count = db_session.query(Customer).count()
# 分组统计
grouped_stats = db_session.query(
Customer.full_name,
func.count(BlogPost.id)
).join(BlogPost).group_by(Customer.full_name).all()
# 数值计算
avg_id = db_session.query(func.avg(Customer.id)).scalar()
关联查询
Python代码示例:
# 内连接查询
joined_results = db_session.query(Customer, BlogPost).join(BlogPost).filter(BlogPost.title.like("%Python%")).all()
# 左外连接查询
left_joined = db_session.query(Customer, BlogPost).outerjoin(BlogPost).all()
# 指定连接条件
custom_joined = db_session.query(Customer, BlogPost).join(BlogPost, Customer.id == BlogPost.owner_id).all()
关系操作
Python代码示例:
# 创建关联对象
customer = Customer(full_name="赵六", email_address="zhaoliu@example.com")
post = BlogPost(title="我的第一篇博客", content_text="Hello World!", owner=customer)
db_session.add(post)
db_session.commit()
# 通过关系访问
print(f"文章 '{post.title}' 的作者是 {post.owner.full_name}")
print(f"用户 {customer.full_name} 的所有文章:")
for p in customer.articles:
print(f" - {p.title}")
# 多对多关系操作
python_tag = Tag(tag_name="Python")
sqlalchemy_tag = Tag(tag_name="SQLAlchemy")
post.tags.append(python_tag)
post.tags.append(sqlalchemy_tag)
db_session.commit()
print(f"文章 '{post.title}' 的标签:")
for tag in post.tags:
print(f" - {tag.tag_name}")
事务管理
Python代码示例:
# 自动提交事务
try:
new_record = Customer(full_name="测试用户", email_address="test@example.com")
db_session.add(new_record)
db_session.commit()
except Exception as e:
db_session.rollback()
print(f"发生错误: {e}")
# 事务上下文管理
from sqlalchemy.orm import Session
def create_customer(session: Session, name: str, email: str):
try:
customer = Customer(full_name=name, email_address=email)
session.add(customer)
session.commit()
return customer
except:
session.rollback()
raise
# 嵌套事务
with db_session.begin_nested():
customer = Customer(full_name="事务用户", email_address="transaction@example.com")
db_session.add(customer)
# 保存点管理
savepoint = db_session.begin_nested()
try:
customer = Customer(full_name="保存点用户", email_address="savepoint@example.com")
db_session.add(customer)
savepoint.commit()
except:
savepoint.rollback()
实践建议
- 会话生命周期:采用请求-响应模式管理会话
- 异常处理机制:实现完善的错误回滚逻辑
- 加载策略:采用预加载优化减少查询次数
- 连接池配置:根据负载调整连接池参数
- 数据校验:在模型层实现数据验证规则
Python代码示例:
# 上下文管理器实现
from contextlib import contextmanager
@contextmanager
def get_db():
db = session_factory()
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
finally:
db.close()
# 使用示例
with get_db() as db:
customer = Customer(full_name="上下文用户", email_address="context@example.com")
db.add(customer)
总结
该工具提供了强大的数据库操作能力,通过本文学习可以实现:
- 环境配置与依赖安装
- 数据模型定义与关系构建
- 增删改查操作实现
- 复杂查询构建
- 事务控制机制
- 开发规范遵循
该框架还包含更多高级功能,如属性混合、事件监听等,建议深入研究以充分发挥其能力。