网络数据采集实践:多线程图片下载与 Scrapy 框架应用
一、气象图片多线程爬取实验
1. 实验目标
通过单线程与多线程两种模式从气象网站下载图片,对比 IO 密集型任务的执行效率差异,掌握 requests、BeautifulSoup 和线程池(ThreadPoolExecutor)的配合使用。
2. 实现方案
2.1 整体流程
- 页面获取:通过 requests 请求目标地址 HTML。
- 链接提取:BeautifulSoup 解析所有 <img> 标签,获取 src 属性。
- 路径补全:缺失协议或域名的相对路径用 urljoin 拼合成完整 URL。
- 格式过滤:仅保留 .jpg、.png 等图片后缀。
- 文件保存:创建本地目录,使用分块流式写入,文件名经正则清理非法字符。
- 并发控制:单线程串行循环;多线程通过 ThreadPoolExecutor 调度。
2.2 单线程实现
按顺序逐个调用下载任务,完成一个后再执行下一个。
def single_thread_crawl():
create_image_dir()
content = get_page_content(SITE_URL)
urls = extract_image_urls(SITE_URL, content)
if not urls:
print("无图片可下载")
return
for url in urls:
download_image(url)
2.3 多线程实现
利用 ThreadPoolExecutor 创建固定线程池(默认 5 个),通过 map 批量提交任务。
def multi_thread_crawl(workers=5):
create_image_dir()
content = get_page_content(SITE_URL)
urls = extract_image_urls(SITE_URL, content)
if not urls:
print("无图片可下载")
return
with ThreadPoolExecutor(max_workers=workers) as executor:
executor.map(download_image, urls)
3. 结果分析
实验验证了多线程在大量网络 IO 操作中的优势。单线程适用于少量链接的简单场景,多线程则显著提升批量下载效率。实际开发中需根据任务类型和服务器负载选择合适的并发策略。
二、东方财富股票数据定向爬虫
1. 实验目标
掌握 Scrapy 框架的 Item、Pipeline 以及 MySQL 存储的完整流程,从东方财富网站通过动态 API 获取股票列表,提取代码、名称、最新价、涨跌幅等字段,完成数据清洗与入库。
2. 实现方案
2.1 技术路线
- API 定位:分析页面网络请求,找到返回 JSONP 格式的股票数据接口。
- 数据清洗:去除函数包裹体,提取纯 JSON 字符串;将占位符 "-" 替换为 0.0。
- 单位换算:价格分转元、成交量股转万股、成交额元转亿元。
- 持久化:参数化 SQL 写入 MySQL,支持主键重复更新。
2.2 数据模型定义(items.py)
class StockItem(scrapy.Item):
bStockNo = scrapy.Field() # 股票代码 f12
bStockName = scrapy.Field() # 股票名称 f14
latestPrice = scrapy.Field() # 最新价 f2
changePercent = scrapy.Field() # 涨跌幅 f3
changeAmount = scrapy.Field() # 涨跌额 f4
volume = scrapy.Field() # 成交量 f5 (万股)
turnover = scrapy.Field() # 成交额 f6 (亿元)
amplitude = scrapy.Field() # 振幅 f7
high = scrapy.Field() # 最高 f8
low = scrapy.Field() # 最低 f9
openPrice = scrapy.Field() # 今开 f10
prevClose = scrapy.Field() # 昨收 f11
2.3 Pipeline 数据写入
def process_item(self, item, spider):
sql = """INSERT INTO stocks (
bStockNo,bStockName,latestPrice,changePercent,
changeAmount,volume,turnover,amplitude,
high,low,openPrice,prevClose
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
latestPrice=VALUES(latestPrice),
changePercent=VALUES(changePercent),
changeAmount=VALUES(changeAmount),
volume=VALUES(volume),
turnover=VALUES(turnover),
amplitude=VALUES(amplitude),
high=VALUES(high),
low=VALUES(low),
openPrice=VALUES(openPrice),
prevClose=VALUES(prevClose);"""
data = (
item['bStockNo'], item['bStockName'], item['latestPrice'],
item['changePercent'], item['changeAmount'], item['volume'],
item['turnover'], item['amplitude'], item['high'],
item['low'], item['openPrice'], item['prevClose']
)
try:
self.cursor.execute(sql, data)
self.conn.commit()
except pymysql.Error as e:
self.conn.rollback()
spider.logger.error(f"插入失败 {item['bStockNo']}: {e}")
return item
2.4 爬虫核心逻辑
循环构造多页 API 请求,解析 JSONP 切片后清洗并转换单位,产出 StockItem。
def parse(self, response):
json_str = response.text[response.text.find('{'):response.text.rfind('}')+1]
data = json.loads(json_str)
stocks = data.get('data',{}).get('diff')
if not stocks:
return
for s in stocks:
for k in s:
if s[k] == "-" or s[k] is None:
s[k] = 0.0
item = StockItem()
item['bStockNo'] = str(s.get('f12',''))
item['bStockName'] = s.get('f14','')
item['latestPrice'] = float(s['f2'])/100
item['changePercent'] = float(s['f3'])
item['changeAmount'] = float(s['f4'])/100
item['volume'] = float(s['f5'])/10000
item['turnover'] = float(s['f6'])/100000000
item['amplitude'] = float(s['f7'])
item['high'] = float(s['f8'])/100
item['low'] = float(s['f9'])/100
item['openPrice'] = float(s['f10'])/100
item['prevClose'] = float(s['f11'])/100
yield item
3. 实验结果
数据成功写入 MySQL 数据库,验证了 API 解析、数据清洗及全流程 Pipeline 的正确性。
三、中国银行外汇牌价爬取
1. 实验目标
使用 Scrapy + Xpath 解析 HTML + SQLite 存储,全量爬取中国银行官网外汇牌价,解决表格嵌套、空白字符、自动翻页等实际问题。
2. 实现方案
2.1 技术要点
- 表格定位:基于包含"货币名称"的表头,使用
ancestor::table[1]向上查找目标表格。 - 数据清洗:
normalize-space()去除单元格内的多余空格和换行。 - 翻页循环:提取"下一页"链接的 href,通过 urljoin 构造绝对路径,递归回调 parse。
- 空值处理:价格字段若为空字符串或 None,在 Pipeline 中转为 0.0 再入库。
2.2 爬虫代码(boc_spider.py)
def parse(self, response):
table = response.xpath("//th[contains(normalize-space(.), '货币名称')]/ancestor::table[1]")
if not table:
self.logger.error("未找到外汇牌价表格")
return
rows = table.xpath(".//tr[position()>1]")
for row in rows:
item = BocScraperItem()
item['currency'] = row.xpath("normalize-space(.//td[1])").get()
item['tbp'] = row.xpath("normalize-space(.//td[2])").get()
item['cbp'] = row.xpath("normalize-space(.//td[3])").get()
item['tsp'] = row.xpath("normalize-space(.//td[4])").get()
item['csp'] = row.xpath("normalize-space(.//td[5])").get()
item['time'] = row.xpath("normalize-space(.//td[8])").get()
yield item
next_url = response.xpath("//a[contains(normalize-space(.), '下一页')]/@href").get()
if next_url:
yield scrapy.Request(response.urljoin(next_url), callback=self.parse)
2.3 Pipeline SQLite 存储
def process_item(self, item, spider):
tbp = float(item.get('tbp') or 0.0)
cbp = float(item.get('cbp') or 0.0)
tsp = float(item.get('tsp') or 0.0)
csp = float(item.get('csp') or 0.0)
sql = "INSERT INTO exchange_rates (currency,tbp,cbp,tsp,csp,time) VALUES (?,?,?,?,?,?)"
data = (item['currency'], tbp, cbp, tsp, csp, item.get('time',''))
try:
self.cursor.execute(sql, data)
self.conn.commit()
except (ValueError, sqlite3.Error) as e:
self.conn.rollback()
raise DropItem(f"数据无效:{item}")
return item
3. 实验结果
运行 scrapy crawl boc 后,成功爬取美元、欧元等 30+ 货币的全部分页牌价,数据准确落入 SQLite 数据库。
4. 心得体会
本组实验覆盖了从简单 HTML 分析到动态 API 解析、从文件下载到关系型数据库写入的多种场景。多线程技术有效提升了 IO 密集型任务速度,Scrapy 框架的组件化设计使代码结构清晰易扩展。针对动态接口和复杂 HTML 页面,合理利用 Xpath 函数和 JSON 解析技巧是保证数据质量的关键。后续可在此基础上增加反爬策略、增量更新和异常重试机制。
