基于Selenium的股票与课程数据爬取方案
概述
本文介绍两种基于Selenium框架结合MySQL数据库的网页数据爬取方案,分别用于获取股票市场行情数据和在线课程资源信息;此外还附带介绍基于Flume与Kafka的日志采集配置流程(方案三)。
方案一:A股市场数据采集系统
需求分析
需要采集沪深A股、上证A股、深证A股三个板块的股票数据,包括股票代码、名称、最新价、涨跌幅、涨跌额、成交量、成交额、振幅、最高价、最低价、开盘价、昨收价等字段。
技术架构
- Selenium WebDriver:自动化浏览器操作
- pymysql:Python数据库连接
- XPath/CSS Selector:页面元素定位
- MySQL 8.0:数据持久化存储
实现代码
import pymysql
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
# Keyword arguments passed to pymysql.connect() for the stock database.
# NOTE: the credentials are hard-coded literals; adjust them per environment.
DB_CONFIG = dict(
    host='localhost',
    user='root',
    password='root',
    database='stock_market',
    port=3306,
    charset='utf8mb4',
)

# (sector display name, URL fragment appended to the grid-list page) pairs.
MARKET_SECTORS = [
    ("沪深A股", "#hs_a_board"),
    ("上证A股", "#sh_a_board"),
    ("深证A股", "#sz_a_board"),
]

# Maximum number of result pages processed for each sector.
PAGINATION_LIMIT = 3
def initialize_browser():
    """Start Chrome through Selenium and return the maximized driver.

    Two options are applied to hide automation-control traits: the
    ``AutomationControlled`` blink feature is disabled and the
    ``enable-automation`` switch is excluded.

    Returns:
        The ``webdriver.Chrome`` instance, with its window maximized.
    """
    opts = Options()
    opts.add_argument("--disable-blink-features=AutomationControlled")
    opts.add_experimental_option('excludeSwitches', ['enable-automation'])

    browser = webdriver.Chrome(options=opts)
    browser.maximize_window()
    return browser
def setup_database(connection):
    """Drop and recreate the ``stock_info`` table.

    Any existing ``stock_info`` table (and its rows) is removed before the
    new, empty table is created.

    Args:
        connection: Open database connection providing ``cursor()`` and
            ``commit()``.

    Returns:
        The cursor that executed the statements, for reuse by the caller.
    """
    ddl_statements = (
        "DROP TABLE IF EXISTS stock_info",
        (
            "\n"
            "CREATE TABLE stock_info (\n"
            "id INT AUTO_INCREMENT PRIMARY KEY,\n"
            "sector_name VARCHAR(50),\n"
            "security_code VARCHAR(20),\n"
            "security_name VARCHAR(50),\n"
            "current_price VARCHAR(20),\n"
            "price_change VARCHAR(20),\n"
            "change_value VARCHAR(20),\n"
            "trading_volume VARCHAR(50),\n"
            "trading_amount VARCHAR(50),\n"
            "price_amplitude VARCHAR(20),\n"
            "highest_price VARCHAR(20),\n"
            "lowest_price VARCHAR(20),\n"
            "opening_price VARCHAR(20),\n"
            "previous_close VARCHAR(20)\n"
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n"
        ),
    )

    db_cursor = connection.cursor()
    for statement in ddl_statements:
        db_cursor.execute(statement)
    connection.commit()
    return db_cursor
def extract_table_data(driver, sector_name):
    """Read the stock rows of the currently displayed table.

    Every ``<tr>`` under a table body is inspected. A row is skipped when it
    has fewer cells than the highest column read below requires, or when its
    code cell (index 1) is empty.

    Args:
        driver: Selenium driver (or any object exposing ``find_elements``)
            positioned on the quote list page.
        sector_name: Sector label stored as the first field of each record.

    Returns:
        A list of 13-tuples ``(sector_name, code, name, latest price,
        change %, change amount, volume, turnover, amplitude, high, low,
        open, previous close)``; all cell values are the raw cell text.
    """
    # The last column read is index 13, so at least 14 cells are needed.
    # (The previous threshold of 10 let 10-13 cell rows through, which then
    # raised IndexError.)
    required_cells = 14
    # Cell indexes copied after the code, in record order. Indexes 0 and 3
    # are not part of the record.
    value_columns = (
        2,   # name
        4,   # latest price
        5,   # change percent
        6,   # change amount
        7,   # volume
        8,   # turnover
        9,   # amplitude
        10,  # high
        11,  # low
        12,  # open
        13,  # previous close
    )

    extracted_data = []
    for row in driver.find_elements(By.XPATH, "//table//tbody/tr"):
        cells = row.find_elements(By.TAG_NAME, "td")
        if len(cells) < required_cells:
            continue
        code = cells[1].text
        if not code:
            continue
        extracted_data.append(
            (sector_name, code, *(cells[i].text for i in value_columns))
        )
    return extracted_data
def navigate_to_page(driver, page_number):
    """Click the "next page" link and wait for the table to change.

    The link is located by its ``title`` attribute and clicked through
    JavaScript, after which the function pauses for five seconds.

    Args:
        driver: Selenium driver positioned on the quote list page.
        page_number: Page being moved to. It is not used by the body; the
            function always advances by exactly one page.

    Returns:
        True when the click was issued, False when any exception occurred
        (the exception is printed, not re-raised).
    """
    try:
        pager_link = driver.find_element(By.CSS_SELECTOR, "a[title='下一页']")
        driver.execute_script("arguments[0].click();", pager_link)
        time.sleep(5)
    except Exception as err:
        print(f"翻页失败: {err}")
        return False
    else:
        return True
def save_to_database(cursor, connection, data):
    """Insert a batch of stock records into ``stock_info`` and commit.

    Args:
        cursor: Database cursor providing ``executemany``.
        connection: Connection owning the cursor; provides ``commit`` and
            ``rollback``.
        data: Sequence of 13-item records in the column order of the INSERT
            statement below.

    Returns:
        False when ``data`` is empty (nothing is executed), True after a
        successful commit.

    Raises:
        Exception: Whatever ``executemany`` or ``commit`` raised. The
            transaction is rolled back before the error is re-raised so a
            half-written batch cannot be committed later.
    """
    if not data:
        return False

    insert_sql = (
        "\n"
        "INSERT INTO stock_info (\n"
        "sector_name, security_code, security_name, current_price,\n"
        "price_change, change_value, trading_volume, trading_amount,\n"
        "price_amplitude, highest_price, lowest_price, opening_price, previous_close\n"
        ") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)\n"
    )
    try:
        cursor.executemany(insert_sql, data)
        connection.commit()
    except Exception:
        # Undo the partial batch, then let the caller see the real error.
        connection.rollback()
        raise
    return True
def crawl_sector_data(driver, cursor, connection, sector_info):
    """Collect and store up to ``PAGINATION_LIMIT`` pages for one sector.

    The sector page is opened and then refreshed, and after a short pause
    each page is extracted and saved. Paging stops early when moving to the
    next page fails.

    Args:
        driver: Selenium driver used for navigation and extraction.
        cursor: Database cursor passed through to ``save_to_database``.
        connection: Database connection passed through to
            ``save_to_database``.
        sector_info: ``(sector_name, url_fragment)`` pair.
    """
    sector_name, sector_selector = sector_info
    target_url = f"http://quote.eastmoney.com/center/gridlist.html{sector_selector}"
    print(f"\n正在采集: {sector_name}")
    print(f"目标URL: {target_url}")

    driver.get(target_url)
    driver.refresh()
    time.sleep(2)

    for page in range(1, PAGINATION_LIMIT + 1):
        print(f" 处理第 {page} 页...")

        rows = extract_table_data(driver, sector_name)
        if not rows:
            print(" 未获取到数据")
        else:
            save_to_database(cursor, connection, rows)
            print(f" 第 {page} 页完成,保存 {len(rows)} 条记录")

        # Nothing to advance to after the final page.
        if page == PAGINATION_LIMIT:
            break
        if not navigate_to_page(driver, page + 1):
            break
        time.sleep(2)
def main():
    """Run the stock collection workflow from start to finish.

    Steps: connect to MySQL and recreate the table, start the browser,
    crawl every entry of ``MARKET_SECTORS``, then release all resources.

    A database connection or setup failure is printed and ends the run
    without raising. Any later error propagates to the caller, but only
    after the browser, cursor and connection have each been given the
    chance to close.
    """
    print("=" * 50)
    print("股票数据采集系统启动")
    print("=" * 50)

    print("\n[1/4] 连接MySQL数据库...")
    db_connection = None
    try:
        db_connection = pymysql.connect(**DB_CONFIG)
        cursor = setup_database(db_connection)
        print("数据库初始化完成")
    except Exception as e:
        print(f"数据库连接失败: {e}")
        # The connection may already be open if only the table setup failed.
        if db_connection is not None:
            db_connection.close()
        return

    browser = None
    try:
        # Browser start-up is inside the try so that a failure here still
        # closes the database resources opened above.
        print("\n[2/4] 初始化浏览器...")
        browser = initialize_browser()

        print("\n[3/4] 开始数据采集...")
        for sector in MARKET_SECTORS:
            crawl_sector_data(browser, cursor, db_connection, sector)
        print("\n[4/4] 采集完成")
    finally:
        # Nested so that one failing close does not skip the remaining ones.
        try:
            if browser is not None:
                browser.quit()
        finally:
            try:
                cursor.close()
            finally:
                db_connection.close()
        print("\n资源释放完成,程序结束")


if __name__ == "__main__":
    main()
关键技术点
元素定位策略:
- 使用XPath定位表格行:`//table//tbody/tr`
- 通过CSS选择器定位翻页按钮:`a[title='下一页']`
- 使用JavaScript执行点击,避免翻页按钮被广告等浮层遮挡时点击被拦截
反检测机制:
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_argument("--disable-blink-features=AutomationControlled")
方案二:在线课程资源采集系统
需求分析
从慕课平台采集课程资源信息,包括课程编号、课程名称、开课院校、主讲教师、课程团队、选课人数、学习进度、课程简介等数据。
实现代码
import pymysql
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
class CourseCrawler:
    """Collect course details from the course platform into MySQL.

    Typical use is ``CourseCrawler().run()``, which opens the database and
    the browser, gathers course links from the user's course page, visits
    each one, stores a row per course and finally releases all resources.
    """

    # Absolute XPaths into the course detail page.
    _SCHOOL_LOGO_XPATH = '/html/body/div[5]/div[2]/div[2]/div[2]/div[2]/div[2]/div[2]/div/a/img'
    _SCHOOL_BACKUP_XPATH = '//*[@id="j-teacher"]/div/a'
    _NAME_XPATH = '/html/body/div[5]/div[2]/div[1]/div/div/div/div[2]/div[2]/div/div[2]/div[1]/span[1]'
    _TEACHER_XPATH = '/html/body/div[5]/div[2]/div[2]/div[2]/div[2]/div[2]/div[2]/div/div/div[2]/div/div/div[1]/div/div/h3'
    _DESCRIPTION_XPATH = '/html/body/div[5]/div[2]/div[2]/div[2]/div[1]/div[1]/div[2]/div[2]/div[1]'
    _ENROLLMENT_XPATH = '/html/body/div[5]/div[2]/div[1]/div/div/div/div[2]/div[2]/div/div[3]/div/div[1]/div[4]/span[2]'
    _PROGRESS_XPATH = '/html/body/div[5]/div[2]/div[1]/div/div/div/div[2]/div[2]/div/div[3]/div/div[1]/div[4]/span[1]'

    # Descriptions longer than this many characters are cut and suffixed
    # with "...".
    _DESCRIPTION_LIMIT = 200

    def __init__(self):
        """Store the connection settings and the start URL; no I/O happens."""
        self.db_config = {
            'host': 'localhost',
            'user': 'root',
            'password': 'root',
            'database': 'online_courses',
            'charset': 'utf8mb4'
        }
        self.target_url = 'https://www.icourse163.org/home.htm?userId=xxxxxx#/home/course'
        # Populated by init_database() / init_browser(); kept as None until
        # then so cleanup can tell what was actually opened.
        self.conn = None
        self.cursor = None
        self.driver = None

    def init_database(self):
        """Connect to MySQL and recreate the ``course_info`` table.

        Any existing ``course_info`` table is dropped first. Sets
        ``self.conn`` and ``self.cursor``.
        """
        self.conn = pymysql.connect(**self.db_config)
        self.cursor = self.conn.cursor()

        self.cursor.execute("DROP TABLE IF EXISTS course_info")
        create_sql = (
            "\n"
            "CREATE TABLE course_info (\n"
            "id INT AUTO_INCREMENT PRIMARY KEY,\n"
            "course_name VARCHAR(255),\n"
            "institution VARCHAR(255),\n"
            "instructor VARCHAR(255),\n"
            "enrollment VARCHAR(100),\n"
            "progress VARCHAR(100),\n"
            "description TEXT,\n"
            "course_url VARCHAR(500)\n"
            ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4\n"
        )
        self.cursor.execute(create_sql)
        self.conn.commit()
        print("数据库初始化完成")

    def init_browser(self):
        """Start a maximized Chrome instance and store it in ``self.driver``."""
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-blink-features=AutomationControlled')
        self.driver = webdriver.Chrome(options=options)
        self.driver.maximize_window()

    def collect_course_links(self):
        """Return the distinct course URLs linked from the current page.

        A link qualifies when its ``href`` contains ``/course/`` and does
        not contain ``search.htm``. Duplicates are removed while keeping the
        order of first appearance.

        Returns:
            List of URL strings.
        """
        anchors = self.driver.find_elements(By.TAG_NAME, 'a')
        hrefs = (anchor.get_attribute('href') for anchor in anchors)
        # dict.fromkeys gives order-preserving de-duplication in one pass,
        # replacing the per-item list membership scan.
        url_collection = list(dict.fromkeys(
            href for href in hrefs
            if href and '/course/' in href and 'search.htm' not in href
        ))
        print(f"共发现 {len(url_collection)} 个课程链接")
        return url_collection

    def _first_text(self, xpath):
        """Return the text of the first element matching *xpath*, or None."""
        matches = self.driver.find_elements(By.XPATH, xpath)
        return matches[0].text if matches else None

    def _extract_school(self):
        """Return the school name from the current page, or None.

        The logo image's ``alt`` (then ``title``) attribute is preferred;
        the backup link text is consulted only when no logo image exists.
        """
        logos = self.driver.find_elements(By.XPATH, self._SCHOOL_LOGO_XPATH)
        if logos:
            return logos[0].get_attribute('alt') or logos[0].get_attribute('title')
        return self._first_text(self._SCHOOL_BACKUP_XPATH)

    def extract_course_details(self, course_url):
        """Open *course_url* and scrape the course fields.

        Args:
            course_url: URL of the course detail page.

        Returns:
            Dict with keys ``name``, ``school``, ``teacher``, ``enrollment``,
            ``progress`` and ``description``. A field whose element is not
            found keeps its placeholder default. The school also keeps its
            default when the located element yields no text, so the
            placeholder is never replaced by None or an empty string.
        """
        self.driver.get(course_url)
        time.sleep(3)  # fixed pause to let the page render

        course_data = {
            'name': '未找到',
            'school': '未找到',
            'teacher': '未找到',
            'enrollment': '0',
            'progress': '未找到',
            'description': '无简介'
        }

        school = self._extract_school()
        if school:
            course_data['school'] = school

        plain_fields = (
            ('name', self._NAME_XPATH),
            ('teacher', self._TEACHER_XPATH),
            ('enrollment', self._ENROLLMENT_XPATH),
            ('progress', self._PROGRESS_XPATH),
        )
        for key, xpath in plain_fields:
            text = self._first_text(xpath)
            if text is not None:
                course_data[key] = text

        description = self._first_text(self._DESCRIPTION_XPATH)
        if description is not None:
            if len(description) > self._DESCRIPTION_LIMIT:
                description = description[:self._DESCRIPTION_LIMIT] + "..."
            course_data['description'] = description

        return course_data

    def save_course_record(self, course_info, url):
        """Insert one course row and commit.

        Args:
            course_info: Dict as returned by ``extract_course_details``.
            url: Course page URL stored alongside the record.

        Raises:
            Exception: Whatever the insert or commit raised; the transaction
                is rolled back before re-raising.
        """
        insert_sql = (
            "\n"
            "INSERT INTO course_info (\n"
            "course_name, instructor, institution, enrollment,\n"
            "progress, description, course_url\n"
            ") VALUES (%s, %s, %s, %s, %s, %s, %s)\n"
        )
        params = (
            course_info['name'],
            course_info['teacher'],
            course_info['school'],
            course_info['enrollment'],
            course_info['progress'],
            course_info['description'],
            url,
        )
        try:
            self.cursor.execute(insert_sql, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _release_resources(self):
        """Close whichever of browser, cursor and connection were opened."""
        try:
            if self.driver is not None:
                self.driver.quit()
        finally:
            try:
                if self.cursor is not None:
                    self.cursor.close()
            finally:
                if self.conn is not None:
                    self.conn.close()

    def run(self):
        """Execute the whole collection task.

        Initializes the database and browser, opens the start page, pauses
        ten seconds for the manual login, collects course links and stores
        the details of each course. Resources are released even when a step
        fails; the error then propagates to the caller.
        """
        print("系统启动...")
        try:
            self.init_database()
            self.init_browser()

            print(f"访问目标页面: {self.target_url}")
            self.driver.get(self.target_url)

            # Fixed window for the user to complete the QR-code login.
            print("等待登录操作...")
            time.sleep(10)

            print("开始采集课程链接...")
            course_urls = self.collect_course_links()

            print("\n开始采集课程详情...")
            for idx, url in enumerate(course_urls, 1):
                print(f"处理第 {idx} 个课程: {url}")
                course_data = self.extract_course_details(url)
                self.save_course_record(course_data, url)
                print(f" 保存成功: {course_data['name']}")
        finally:
            self._release_resources()
        print("\n采集任务完成")


if __name__ == "__main__":
    crawler = CourseCrawler()
    crawler.run()
实现要点
URL收集策略:
- 遍历页面所有a标签
- 筛选包含 `/course/` 的链接
- 排除搜索页面链接
- 对链接去重(保留首次出现的顺序)
XPath定位技巧:
- 使用绝对路径定位稳定元素
- 准备备用路径方案提高容错性
- 处理简介文本过长时进行截断
方案三:Flume日志采集配置
实验环境
- Flume客户端安装与配置
- Kafka消息队列集成
- Python测试数据生成脚本
配置流程
步骤1:Python脚本生成测试数据
使用Python编写数据生成脚本,模拟业务日志数据输出。
步骤2:Kafka环境配置
- 创建Kafka主题用于接收日志数据
- 配置生产者与消费者参数
步骤3:Flume Agent配置
配置Flume采集端,包括Source、Channel、Sink三个核心组件:
- Source:配置日志文件来源或网络端口
- Channel:配置内存或文件传输通道
- Sink:配置Kafka作为数据输出目标
步骤4:启动验证
依次启动Kafka、Flume Agent,观察数据流转情况。
实验总结
通过本实验,掌握了Flume日志采集工具的基本使用方法,理解了Source-Channel-Sink组件模型的工作原理,并完成了与Kafka的集成配置。
总结
以上三种方案分别展示了网页数据爬取和日志采集的典型应用场景。关键技术要点包括:
- Selenium反检测配置确保自动化脚本稳定运行
- XPath与CSS选择器灵活定位页面元素
- 数据库表结构设计合理规划数据存储
- 翻页处理与异常捕获保证数据完整性
在实际项目中,需根据目标网站的反爬策略和数据结构特点选择合适的采集方案。