MySQL数据库高级操作技巧实战
环境准备与数据库连接
本文介绍使用Python操作MySQL数据库的实用技巧,包括数据透视、排名计算、连续性问题等高级应用。
# 导入所需库
import numpy as np
import pandas as pd
import pymysql
# 建立数据库连接
conn = pymysql.connect(
host='127.0.0.1', # 数据库服务器地址
user='root', # 用户名
password='password', # 密码
database='testdb', # 指定的数据库
port=3306, # 端口号
charset='utf8' # 字符编码
)
# 创建字典类型游标,返回带字段名的结果集
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
定义查询辅助函数,便于执行SQL并返回DataFrame格式:
def execute_query(sql):
"""执行SQL查询并返回DataFrame"""
cursor.execute(sql)
return pd.DataFrame(cursor)
数据库基本操作
创建数据库和表
# 查看现有数据库
execute_query('SHOW DATABASES;')
# 删除已存在的数据库(如果需要)
execute_query('DROP DATABASE IF EXISTS student_db')
# 创建新数据库
execute_query('CREATE DATABASE student_db')
# 使用该数据库
execute_query('USE student_db')
生成模拟学生成绩数据:
# 生成30条学生成绩数据
np.random.seed(42)
score_data = pd.DataFrame(np.random.randint(50, 100, (30, 3)))
score_data.columns = ["chinese", "math", "english"]
score_data.index = ['S' + str(i) for i in range(1, 31)]
score_data.insert(0, "class_name", np.random.choice(["Class1", "Class2", "Class3"], 30))
score_data.insert(0, "gender", np.random.choice(["M", "F"], 30))
score_data = score_data.reset_index().rename(columns={"index": "student_name"})
score_data.head()
# 创建成绩表
execute_query('''
CREATE TABLE scores(
student_name VARCHAR(50),
gender VARCHAR(10),
class_name VARCHAR(20),
chinese FLOAT,
math FLOAT,
english FLOAT
)
''')
# 插入数据
insert_sql = 'INSERT INTO scores VALUES(%s, %s, %s, %s, %s, %s)'
values = score_data.values.tolist()
cursor.executemany(insert_sql, values)
conn.commit()
# 验证数据插入
result = execute_query('SELECT * FROM scores LIMIT 5')
result
CASE WHEN条件表达式
CASE WHEN是SQL中强大的条件判断工具,可实现数据映射和分箱功能。
数据映射转换
# 性别编码转换:M映射为1,F映射为0
execute_query('''
SELECT *,
CASE
WHEN gender = 'M' THEN 1
ELSE 0
END AS gender_code
FROM scores
''').head()
数据透视表实现
利用CASE WHEN结合GROUP BY可以实现类似Excel的数据透视表功能:
# 按性别统计各班级的平均成绩
execute_query('''
SELECT
gender,
AVG(CASE WHEN class_name = 'Class1' THEN chinese + math + english END) AS class1_avg,
AVG(CASE WHEN class_name = 'Class1' THEN chinese END) AS class1_chinese,
AVG(CASE WHEN class_name = 'Class1' THEN math END) AS class1_math,
AVG(CASE WHEN class_name = 'Class2' THEN chinese + math + english END) AS class2_avg,
AVG(CASE WHEN class_name = 'Class2' THEN chinese END) AS class2_chinese,
AVG(CASE WHEN class_name = 'Class3' THEN chinese + math + english END) AS class3_avg,
AVG(CASE WHEN class_name = 'Class3' THEN chinese END) AS class3_chinese
FROM scores
GROUP BY gender
ORDER BY gender
''')
逆透视操作
使用UNION实现列转行的逆透视:
# 将透视后的列数据转换为行数据
execute_query('''
SELECT gender, 'Class1' AS class_name, class1_avg AS total_avg FROM (
SELECT gender,
AVG(CASE WHEN class_name = 'Class1' THEN chinese + math + english END) AS class1_avg,
AVG(CASE WHEN class_name = 'Class2' THEN chinese + math + english END) AS class2_avg
FROM scores GROUP BY gender
) t
UNION ALL
SELECT gender, 'Class2' AS class_name, class2_avg AS total_avg FROM (
SELECT gender,
AVG(CASE WHEN class_name = 'Class1' THEN chinese + math + english END) AS class1_avg,
AVG(CASE WHEN class_name = 'Class2' THEN chinese + math + english END) AS class2_avg
FROM scores GROUP BY gender
) t
''')
IF函数与缺失值处理
IF条件函数
# 使用IF函数进行简单条件转换
execute_query('''
SELECT *, IF(gender = 'M', 1, 0) AS gender_flag
FROM scores
''').head()
IFNULL缺失值填充
使用WITH ROLLUP生成汇总行,并用IFNULL填充NULL值:
# 按班级和性别分组汇总,包含小计行
execute_query('''
SELECT
class_name,
gender,
AVG(chinese) AS chinese_avg,
AVG(math) AS math_avg,
AVG(english) AS english_avg
FROM scores
GROUP BY class_name, gender WITH ROLLUP
''')
# 使用IFNULL处理NULL值
execute_query('''
SELECT
IFNULL(class_name, 'TOTAL') AS class_name,
IFNULL(gender, 'ALL') AS gender,
chinese_avg,
math_avg,
english_avg
FROM (
SELECT
class_name,
gender,
AVG(chinese) AS chinese_avg,
AVG(math) AS math_avg,
AVG(english) AS english_avg
FROM scores
GROUP BY class_name, gender WITH ROLLUP
) t
''')
排名函数详解
全表排名实现
方法一:用户变量实现
# 计算总分并排名(逐行递增)
execute_query('''
SELECT *,
chinese + math + english AS total_score,
@rownum := @rownum + 1 AS row_num
FROM scores, (SELECT @rownum := 0) init
ORDER BY total_score DESC
''').head(5)
方法二:窗口函数ROW_NUMBER()
# 使用窗口函数实现行号
execute_query('''
SELECT *,
chinese + math + english AS total_score,
ROW_NUMBER() OVER(ORDER BY chinese + math + english DESC) AS ranking
FROM scores
''').head(5)
并列连续排名(DENSE_RANK)
并列连续排名,相同值占用同一排名,排名连续不跳跃:
# 使用DENSE_RANK实现并列连续排名
execute_query('''
SELECT *,
chinese + math + english AS total_score,
DENSE_RANK() OVER(ORDER BY chinese + math + english DESC) AS dense_ranking
FROM scores
''').head(5)
并列间隔排名(RANK)
并列间隔排名,相同值占用同一排名,但排名会跳跃:
# 使用RANK实现并列间隔排名
execute_query('''
SELECT *,
chinese + math + english AS total_score,
RANK() OVER(ORDER BY chinese + math + english DESC) AS rank_score
FROM scores
''').head(5)
组内排名
使用PARTITION BY实现分组内排名:
# 按班级计算组内排名和年级总排名
execute_query('''
SELECT *,
chinese + math + english AS total_score,
ROW_NUMBER() OVER(PARTITION BY class_name ORDER BY chinese + math + english DESC) AS class_rank,
DENSE_RANK() OVER(ORDER BY chinese + math + english DESC) AS grade_rank
FROM scores
''').head(10)
获取组内TOP N数据
结合窗口函数和子查询获取每个班级成绩前N的学生:
# 获取每个班级前三名的学生
execute_query('''
SELECT * FROM (
SELECT *,
DENSE_RANK() OVER(PARTITION BY class_name ORDER BY chinese + math + english DESC) AS class_rank
FROM scores
) t
WHERE class_rank <= 3
ORDER BY class_name, total_score DESC
''')
集合运算:差集查询
查找存在于A表但不在B表中的记录:
# A表左连接B表,找出B.key为NULL的记录即为A有B没有的
# SELECT * FROM A LEFT JOIN B ON A.key = B.key WHERE B.key IS NULL
累积计算
累积求和
使用聚合函数结合OVER()子句实现累积计算:
# 按英语成绩排序的累积求和
execute_query('''
SELECT *,
SUM(english) OVER(ORDER BY english, student_name) AS eng_cumulative
FROM scores
''').head(10)
分组累积
按班级分别计算累积和:
# 按班级分别计算数学成绩的累积和
execute_query(SELECT *,
SUM(math) OVER(PARTITION BY class_name ORDER BY math, student_name) AS math_cumsum
FROM scores
''').head()
行间差值计算
使用LEAD和LAG函数计算上下行之间的差值:
# 计算英语成绩的上下行差值
execute_query('''
SELECT *,
english,
LEAD(english, 1, 0) OVER() AS eng_next,
LAG(english, 1, 0) OVER() AS eng_prev,
english - LAG(english, 1, 0) OVER() AS eng_diff
FROM scores
''').head(10)
连续登录问题
连续登录问题是经典的SQL面试题,核心思路是:日期排名差值法。
创建用户登录记录表
# 创建登录记录表
execute_query('''
DROP TABLE IF EXISTS login_records
''')
execute_query('''
CREATE TABLE login_records(
user_id VARCHAR(20),
login_date DATE,
login_value FLOAT
)
''')
# 插入模拟数据
login_data = [
['U001', '2017-01-01', 10], ['U001', '2017-01-02', 270], ['U001', '2017-01-04', 60],
['U002', '2017-01-01', 10], ['U002', '2017-01-02', 220], ['U002', '2017-01-03', 110],
['U002', '2017-01-04', 150], ['U002', '2017-01-05', 101], ['U002', '2017-01-06', 68],
['U003', '2017-01-01', 20], ['U003', '2017-01-02', 160], ['U003', '2017-01-03', 160],
['U003', '2017-01-04', 20], ['U003', '2017-01-05', 120], ['U003', '2017-01-06', 20],
['U003', '2017-01-07', 120], ['U004', '2017-01-01', 110], ['U004', '2017-01-02', 70],
['U004', '2017-01-03', 120], ['U004', '2017-01-04', 30], ['U004', '2017-01-05', 60],
['U004', '2017-01-06', 120], ['U004', '2017-01-07', 130]
]
cursor.executemany('INSERT INTO login_records VALUES(%s, %s, %s)', login_data)
conn.commit()
execute_query('SELECT * FROM login_records')
计算最大连续登录天数
核心算法:日期 - 排名 = 差值,差值相同表示连续登录
# 步骤1:为每个用户的登录日期添加行号
execute_query('''
SELECT *,
ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) AS rn
FROM login_records
''')
# 步骤2:计算日期与排名的差值
execute_query('''
SELECT *,
DATE_SUB(login_date, INTERVAL rn DAY) AS date_diff
FROM (
SELECT *,
ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) AS rn
FROM login_records
) t1
''')
# 步骤3:按用户和差值分组,统计连续登录天数
execute_query('''
SELECT user_id, MAX(continuous_days) AS max_continuous
FROM (
SELECT user_id, date_diff, COUNT(1) AS continuous_days
FROM (
SELECT *,
DATE_SUB(login_date, INTERVAL rn DAY) AS date_diff
FROM (
SELECT *,
ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) AS rn
FROM login_records
) t1
) t2
GROUP BY user_id, date_diff
) t3
GROUP BY user_id
''')
计算相邻登录间隔
# 计算相邻两次登录之间的时间间隔
execute_query('''
SELECT *,
DATEDIFF(login_date, LAG(login_date, 1) OVER(PARTITION BY user_id ORDER BY login_date)) AS day_gap
FROM login_records
''').head(10)
筛选连续N天满足条件的数据
例如:找出连续3天以上登录值都大于50的用户记录
# 步骤1:先筛选满足条件的记录并编号
execute_query('''
SELECT *,
ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) AS rn,
DATE_SUB(login_date, INTERVAL ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) DAY) AS date_diff
FROM login_records
WHERE login_value > 50
''')
# 步骤2:按用户和差值分组,筛选连续3天以上的记录
execute_query('''
SELECT user_id, date_diff, COUNT(1) AS consecutive_days
FROM (
SELECT *,
DATE_SUB(login_date, INTERVAL rn DAY) AS date_diff
FROM (
SELECT *,
ROW_NUMBER() OVER(PARTITION BY user_id ORDER BY login_date) AS rn
FROM login_records
WHERE login_value > 50
) t1
) t2
GROUP BY user_id, date_diff
HAVING COUNT(1) >= 3
''')
总结
本文介绍了MySQL的多种高级操作技巧,包括:
- CASE WHEN条件表达式实现数据映射和透视
- 窗口函数(ROW_NUMBER、DENSE_RANK、RANK)进行排名计算
- OVER()子句实现累积计算
- LEAD/LAG函数计算上下行差值
- 连续登录问题的经典解法(日期排名差值法)
这些技巧在实际数据分析、报表生成和业务开发中都有广泛应用,熟练掌握能够大幅提升SQL查询效率。