当前位置:首页 > 随笔 > 正文内容

网络数据采集实践:多线程图片下载与 Scrapy 框架应用

访客 随笔 2026年6月15日 1

一、气象图片多线程爬取实验

1. 实验目标

通过单线程与多线程两种模式从气象网站下载图片,对比 IO 密集型任务的执行效率差异,掌握 requests、BeautifulSoup 和线程池(ThreadPoolExecutor)的配合使用。

2. 实现方案

2.1 整体流程

  • 页面获取:通过 requests 请求目标地址 HTML。
  • 链接提取:BeautifulSoup 解析所有 <img> 标签,获取 src 属性。
  • 路径补全:缺失协议或域名的相对路径用 urljoin 拼合成完整 URL。
  • 格式过滤:仅保留 .jpg、.png 等图片后缀。
  • 文件保存:创建本地目录,使用分块流式写入,文件名经正则清理非法字符。
  • 并发控制:单线程串行循环;多线程通过 ThreadPoolExecutor 调度。

2.2 单线程实现

按顺序逐个调用下载任务,完成一个后再执行下一个。

def single_thread_crawl():
    create_image_dir()
    content = get_page_content(SITE_URL)
    urls = extract_image_urls(SITE_URL, content)
    if not urls:
        print("无图片可下载")
        return
    for url in urls:
        download_image(url)

2.3 多线程实现

利用 ThreadPoolExecutor 创建固定线程池(默认 5 个),通过 map 批量提交任务。

def multi_thread_crawl(workers=5):
    create_image_dir()
    content = get_page_content(SITE_URL)
    urls = extract_image_urls(SITE_URL, content)
    if not urls:
        print("无图片可下载")
        return
    with ThreadPoolExecutor(max_workers=workers) as executor:
        executor.map(download_image, urls)

3. 结果分析

实验验证了多线程在大量网络 IO 操作中的优势。单线程适用于少量链接的简单场景,多线程则显著提升批量下载效率。实际开发中需根据任务类型和服务器负载选择合适的并发策略。


二、东方财富股票数据定向爬虫

1. 实验目标

掌握 Scrapy 框架的 Item、Pipeline 以及 MySQL 存储的完整流程,从东方财富网站通过动态 API 获取股票列表,提取代码、名称、最新价、涨跌幅等字段,完成数据清洗与入库。

2. 实现方案

2.1 技术路线

  • API 定位:分析页面网络请求,找到返回 JSONP 格式的股票数据接口。
  • 数据清洗:去除函数包裹体,提取纯 JSON 字符串;将占位符 "-" 替换为 0.0。
  • 单位换算:价格分转元、成交量股转万股、成交额元转亿元。
  • 持久化:参数化 SQL 写入 MySQL,支持主键重复更新。

2.2 数据模型定义(items.py)

class StockItem(scrapy.Item):
    bStockNo = scrapy.Field()       # 股票代码 f12
    bStockName = scrapy.Field()     # 股票名称 f14
    latestPrice = scrapy.Field()    # 最新价 f2
    changePercent = scrapy.Field()  # 涨跌幅 f3
    changeAmount = scrapy.Field()   # 涨跌额 f4
    volume = scrapy.Field()         # 成交量 f5 (万股)
    turnover = scrapy.Field()       # 成交额 f6 (亿元)
    amplitude = scrapy.Field()      # 振幅 f7
    high = scrapy.Field()           # 最高 f8
    low = scrapy.Field()            # 最低 f9
    openPrice = scrapy.Field()      # 今开 f10
    prevClose = scrapy.Field()      # 昨收 f11

2.3 Pipeline 数据写入

def process_item(self, item, spider):
    sql = """INSERT INTO stocks (
        bStockNo,bStockName,latestPrice,changePercent,
        changeAmount,volume,turnover,amplitude,
        high,low,openPrice,prevClose
    ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    ON DUPLICATE KEY UPDATE
        latestPrice=VALUES(latestPrice),
        changePercent=VALUES(changePercent),
        changeAmount=VALUES(changeAmount),
        volume=VALUES(volume),
        turnover=VALUES(turnover),
        amplitude=VALUES(amplitude),
        high=VALUES(high),
        low=VALUES(low),
        openPrice=VALUES(openPrice),
        prevClose=VALUES(prevClose);"""
    data = (
        item['bStockNo'], item['bStockName'], item['latestPrice'],
        item['changePercent'], item['changeAmount'], item['volume'],
        item['turnover'], item['amplitude'], item['high'],
        item['low'], item['openPrice'], item['prevClose']
    )
    try:
        self.cursor.execute(sql, data)
        self.conn.commit()
    except pymysql.Error as e:
        self.conn.rollback()
        spider.logger.error(f"插入失败 {item['bStockNo']}: {e}")
    return item

2.4 爬虫核心逻辑

循环构造多页 API 请求,解析 JSONP 切片后清洗并转换单位,产出 StockItem。

def parse(self, response):
    json_str = response.text[response.text.find('{'):response.text.rfind('}')+1]
    data = json.loads(json_str)
    stocks = data.get('data',{}).get('diff')
    if not stocks:
        return
    for s in stocks:
        for k in s:
            if s[k] == "-" or s[k] is None:
                s[k] = 0.0
        item = StockItem()
        item['bStockNo'] = str(s.get('f12',''))
        item['bStockName'] = s.get('f14','')
        item['latestPrice'] = float(s['f2'])/100
        item['changePercent'] = float(s['f3'])
        item['changeAmount'] = float(s['f4'])/100
        item['volume'] = float(s['f5'])/10000
        item['turnover'] = float(s['f6'])/100000000
        item['amplitude'] = float(s['f7'])
        item['high'] = float(s['f8'])/100
        item['low'] = float(s['f9'])/100
        item['openPrice'] = float(s['f10'])/100
        item['prevClose'] = float(s['f11'])/100
        yield item

3. 实验结果

数据成功写入 MySQL 数据库,验证了 API 解析、数据清洗及全流程 Pipeline 的正确性。


三、中国银行外汇牌价爬取

1. 实验目标

使用 Scrapy + Xpath 解析 HTML + SQLite 存储,全量爬取中国银行官网外汇牌价,解决表格嵌套、空白字符、自动翻页等实际问题。

2. 实现方案

2.1 技术要点

  • 表格定位:基于包含"货币名称"的表头,使用 ancestor::table[1] 向上查找目标表格。
  • 数据清洗:normalize-space() 去除单元格内的多余空格和换行。
  • 翻页循环:提取"下一页"链接的 href,通过 urljoin 构造绝对路径,递归回调 parse。
  • 空值处理:价格字段若为空字符串或 None,在 Pipeline 中转为 0.0 再入库。

2.2 爬虫代码(boc_spider.py)

def parse(self, response):
    table = response.xpath("//th[contains(normalize-space(.), '货币名称')]/ancestor::table[1]")
    if not table:
        self.logger.error("未找到外汇牌价表格")
        return
    rows = table.xpath(".//tr[position()>1]")
    for row in rows:
        item = BocScraperItem()
        item['currency'] = row.xpath("normalize-space(.//td[1])").get()
        item['tbp'] = row.xpath("normalize-space(.//td[2])").get()
        item['cbp'] = row.xpath("normalize-space(.//td[3])").get()
        item['tsp'] = row.xpath("normalize-space(.//td[4])").get()
        item['csp'] = row.xpath("normalize-space(.//td[5])").get()
        item['time'] = row.xpath("normalize-space(.//td[8])").get()
        yield item
    next_url = response.xpath("//a[contains(normalize-space(.), '下一页')]/@href").get()
    if next_url:
        yield scrapy.Request(response.urljoin(next_url), callback=self.parse)

2.3 Pipeline SQLite 存储

def process_item(self, item, spider):
    tbp = float(item.get('tbp') or 0.0)
    cbp = float(item.get('cbp') or 0.0)
    tsp = float(item.get('tsp') or 0.0)
    csp = float(item.get('csp') or 0.0)
    sql = "INSERT INTO exchange_rates (currency,tbp,cbp,tsp,csp,time) VALUES (?,?,?,?,?,?)"
    data = (item['currency'], tbp, cbp, tsp, csp, item.get('time',''))
    try:
        self.cursor.execute(sql, data)
        self.conn.commit()
    except (ValueError, sqlite3.Error) as e:
        self.conn.rollback()
        raise DropItem(f"数据无效:{item}")
    return item

3. 实验结果

运行 scrapy crawl boc 后,成功爬取美元、欧元等 30+ 货币的全部分页牌价,数据准确落入 SQLite 数据库。

4. 心得体会

本组实验覆盖了从简单 HTML 分析到动态 API 解析、从文件下载到关系型数据库写入的多种场景。多线程技术有效提升了 IO 密集型任务速度,Scrapy 框架的组件化设计使代码结构清晰易扩展。针对动态接口和复杂 HTML 页面,合理利用 Xpath 函数和 JSON 解析技巧是保证数据质量的关键。后续可在此基础上增加反爬策略、增量更新和异常重试机制。

标签: 爬虫

相关文章

可以按小时收费的VPS

很多 VPS 提供商都支持 按小时计费(hourly billing),想短期试用 / 临时搭建节点、测试网络、短期项目等场景非常合适。下面是当前最主流且靠谱的按小时 VPS 选项,分别按不同需求场景整理: 1. Vultr(全球节点,包括日本) 按小时计费 可选机房:东京 / 大阪 / 洛杉矶 / 法兰克福 / 伦敦 … 支持 PayPal(部分情况),但更常用信用卡/PayPal+卡价格参考$...

在 iPhone 上下载国外App

地区/国家限制App Store 会根据 Apple ID 的国家或地区限制应用下载。如果你的 Apple ID 绑定的是中国大陆,就可能无法下载 OpenAI 官方的 ChatGPT 应用,因为它在大陆 App Store 不上架。解决办法:换成美国、加拿大、香港等地区的 Apple ID。或者在现有 Apple ID 上更改地区。注册一个国外 Apple ID(推荐)比如注册 美国区 Appl...

Node.js 中的异步编程:回调与 Promise

Node.js 是一个基于 JavaScript 构建的单线程、非阻塞运行环境,它通过异步编程机制来高效处理多个操作。在执行如文件读取、API 请求或数据库查询等任务时,Node.js 不会等待这些操作完成,而是使用回调函数和 Promise 来避免阻塞主线程。 回调方式实现异步 那么当异步操作完成后,Node.js 如何知道接下来要做什么呢?这就要用到 回调函数(callback)。 回调本质上...

Selenium自动化测试入门指南

Selenium自动化测试入门指南

什么是自动化测试? 自动化测试是指利用软件工具自动执行测试用例,模拟用户操作,如打开网页、点击链接、输入文本等,并验证结果是否符合预期。 其主要优点包括: 大幅减少人工成本 测试速度快 可以在非工作时间运行 支持持续集成和交付 然而,它也存在一些局限性,例如开发成本较高、不适合快速变化的项目、依赖稳定的UI界面等。 自动化测试的应用条件 适合引入自动化测试的情况包括: 手动测试耗时且需要大量...

MariaDB Galera集群故障快速恢复指南

OpenStack控制节点采用三节点MariaDB Galera集群架构。当数据库集群因故障重启时,有时会出现Galera集群无法正常启动的问题。虽然有多种方法可以恢复数据库服务,但如何实现快速启动同时确保数据完整性呢? 通过分析日志发现,MariaDB Galera集群节点宕机时会在日志中输出以下信息: [Note] WSREP: 新集群视图:全局状态: 874d8e7e-5980-11e8-8...

Android 中 EventBus 的通信机制与实现原理深度解析

EventBus 核心设计思想 EventBus 是一个基于观察者模式的事件总线框架,广泛应用于 Android 平台以实现组件解耦。它通过中心化的消息分发机制,使不同层级、不同线程的对象能够以"发布-订阅"方式通信,避免了传统接口回调或广播带来的强依赖问题。 核心角色说明 事件(Event):任意 Java 对象,作为数据载体,如网络状态变更通知、用户登录信息等。 发布者(Publi...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。