当前位置:首页 > 随笔 > 正文内容

基于PySpark与Echarts的二手房数据全链路可视化系统

访客 随笔 2026年6月9日 1

项目背景与核心目标

在房地产市场日益数据化的今天,如何高效处理海量房源信息并以直观方式呈现分析结果,成为决策支持的关键。本项目围绕广西地区二手房挂牌数据,构建了一套完整的数据处理与可视化流程,涵盖从原始数据采集、分布式清洗统计,到数据库持久化、后端API服务及前端大屏展示的全流程。

系统主要实现以下目标:

  • 利用PySpark对大规模非结构化/半结构化数据进行高性能清洗与聚合分析;
  • 将统计结果写入MySQL供后续查询调用;
  • 通过Django框架提供REST风格接口,动态生成可视化配置数据;
  • 前端采用Echarts生态实现多维度图表渲染,包括3D地理分布图、词云、柱状图、折线趋势等;
  • 支持深色主题、弹性布局、卡片式UI设计,适配大屏展示场景。

技术架构与环境准备

系统采用分层架构设计,各模块职责明确,便于维护和扩展。

技术栈选型

组件用途
Python 3.8+核心开发语言
PySpark 3.x分布式数据处理引擎
MySQL 8.0结构化结果存储
Django 4.xWeb服务与API网关
Pyecharts 2.0图表Option生成工具
Echarts 5.4 + GL前端可视化渲染库
HDFS原始数据集中存储

依赖安装

# Python环境依赖
pip install pyspark django pymysql pyecharts jieba

# 前端CDN资源引入(HTML中)
<script src="https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/echarts-gl@2.0.9/dist/echarts-gl.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/echarts-wordcloud@2.0.0/dist/echarts-wordcloud.min.js"></script>

运行环境要求

  • JDK 1.8+ 并配置 JAVA_HOME
  • Spark集群或本地模式部署完成
  • MySQL创建名为 ershoufang 的数据库
  • Hadoop HDFS 启动并可访问指定路径

数据来源与预处理

原始数据由自研爬虫程序从主流房产平台抓取,覆盖南宁、柳州、桂林等广西主要城市,字段包含户型、总价、均价、建造年份、朝向、标题关键词等。

数据获取方式

爬虫代码托管于GitHub:

https://github.com/weijiayi-1/anjuke-crawler

采集过程使用requests结合Selenium模拟浏览器行为,应对反爬机制,并将结果保存为CSV格式上传至HDFS:

hdfs dfs -put /local/data/ershoufang_cleaned.csv /data/

字段说明

字段名描述示例值
city所在城市南宁
huxing房屋格局三室两厅
total_price总售价(万元)135
unit_price单价(元/㎡)9200
build_year建筑年份2010
orientation房屋朝向
title_desc房源描述标题学区房 精装修 满五唯一

Spark读取HDFS数据

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SecondHandHouseAnalysis") \
    .getOrCreate()

# 从HDFS加载已清洗的数据
raw_df = spark.read.option("header", "true").csv("hdfs:///data/ershoufang_cleaned.csv")
cleaned_df = raw_df.dropna()  # 移除空值记录

PySpark数据清洗与统计计算

多个独立脚本分别执行不同维度的统计任务,最终结果统一写入MySQL。

初始化Spark会话

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

conf = SparkSession.builder \
    .master("spark://192.168.126.10:7077") \
    .config("spark.driver.host", "192.168.126.1") \
    .appName("CityHouseStats") \
    .getOrCreate()

统计任务实现

城市房源数量分布

city_stats = cleaned_df.groupBy("city") \
    .agg(F.count("*").alias("listing_count")) \
    .orderBy(F.desc("listing_count"))

按方位与城市计算平均单价

location_price = cleaned_df.filter(cleaned_df.unit_price.isNotNull()) \
    .groupBy("orientation", "city") \
    .agg(F.round(F.avg("unit_price"), 2).alias("avg_unit_price")) \
    .orderBy("orientation", "city")

热门户型TOP5合并其余为"其他"

top_layouts = cleaned_df.groupBy("huxing") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(5)

other_count = cleaned_df.subtract(top_layouts.rdd.map(lambda r: r.huxing).collect()) \
    .count()

# 构造"其他"类别并合并
other_row = [(F.lit("其他户型"), other_count)]
other_df = spark.createDataFrame(other_row, ["huxing", "count"])
final_layout_dist = top_layouts.unionByName(other_df)

标题关键词提取与频率统计

import jieba
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import ArrayType, StringType

# 中文分词UDF
def segment(text):
    if not text:
        return []
    stopwords = {"的", "了", "和", "是", "有"}
    return [word for word in jieba.lcut(text) if len(word) > 1 and word not in stopwords]

segment_udf = udf(segment, ArrayType(StringType()))

# 分词后展开统计频次
words_df = cleaned_df.select(explode(segment_udf(F.col("title_desc"))).alias("keyword"))
keyword_freq = words_df.groupBy("keyword") \
    .count() \
    .orderBy(F.desc("count")) \
    .limit(50)

按建造年代区间分析均价趋势

from pyspark.sql.window import Window

yearly_avg = cleaned_df.groupBy("build_year") \
    .agg(F.round(F.avg("unit_price"), 2).alias("mean_price"))

# 添加行号筛选特定时间段(如第35-45条)
window_spec = Window.orderBy("build_year")
ranked_data = yearly_avg.withColumn("row_index", F.row_number().over(window_spec))
subset_data = ranked_data.filter((F.col("row_index") >= 35) & (F.col("row_index") <= 45)) \
    .drop("row_index")

结果写入MySQL

def save_to_mysql(df, table_name):
    jdbc_url = "jdbc:mysql://192.168.126.10:3306/ershoufang"
    props = {
        "user": "root",
        "password": "123456",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
    df.coalesce(1).write.jdbc(url=jdbc_url, table=table_name, mode="overwrite", properties=props)

# 示例调用
save_to_mysql(city_stats, "city_listing_summary")

Django后端服务开发

后端负责连接数据库、生成图表配置项并通过HTTP接口对外提供数据。

项目结构

visual_screen/
├── charts/
│   ├── views.py
│   ├── urls.py
│   └── templates/charts/chart.html
└── settings.py

数据库查询与图表生成

import pymysql
from pyecharts.charts import Bar, Pie, WordCloud, Map3D
from pyecharts import options as opts

def fetch_city_counts():
    conn = pymysql.connect(
        host='192.168.126.10',
        user='root', password='123456',
        database='ershoufang', charset='utf8mb4'
    )
    with conn.cursor() as cur:
        cur.execute("SELECT city, listing_count FROM city_listing_summary ORDER BY listing_count DESC")
        rows = cur.fetchall()
    conn.close()
    return [r[0] for r in rows], [int(r[1]) for r in rows]

def build_bar_chart():
    cities, counts = fetch_city_counts()
    bar = (
        Bar()
        .add_xaxis(cities)
        .add_yaxis("挂牌量", counts, category_gap="50%")
        .set_global_opts(
            title_opts=opts.TitleOpts(title="各城市二手房挂牌数量"),
            tooltip_opts=opts.TooltipOpts(is_show=True),
            legend_opts=opts.LegendOpts(pos_left="center")
        )
    )
    return bar

API接口定义

from django.http import JsonResponse

def get_all_charts(request):
    try:
        bar_opt = build_bar_chart().dump_options()
        pie_opt = build_pie_chart().dump_options()
        wordcloud_opt = build_wordcloud().dump_options()
        map3d_opt = build_3d_map().dump_options()
        
        return JsonResponse({
            "status": "success",
            "data": {
                "bar": bar_opt,
                "pie": pie_opt,
                "wordcloud": wordcloud_opt,
                "map3d": map3d_opt
            }
        })
    except Exception as e:
        # 异常时返回默认选项防止前端崩溃
        return JsonResponse({"status": "error", "message": str(e)}, status=500)

前端大屏可视化实现

前端页面通过Ajax请求获取JSON格式的图表配置,并交由Echarts实例化渲染。

图表动态加载

fetch("/charts/data/")
    .then(res => res.json())
    .then(result => {
        if (result.status === "success") {
            const data = result.data;
            
            echarts.init(document.getElementById("barContainer")).setOption(JSON.parse(data.bar));
            echarts.init(document.getElementById("pieContainer")).setOption(JSON.parse(data.pie));
            echarts.init(document.getElementById("wordcloudContainer")).setOption(JSON.parse(data.wordcloud));
            initMap3D(data.map3d); // 特殊处理3D地图
        }
    });

3D地图初始化

function initMap3D(mapOpt) {
    fetch("https://geo.datav.aliyun.com/areas_v3/bound/450000_full.json")
        .then(resp => resp.json())
        .then(geoJson => {
            echarts.registerMap("Guangxi", geoJson);
            const chart = echarts.init(document.getElementById("map3dContainer"));
            chart.setOption(JSON.parse(mapOpt));
        });
}

页面布局与视觉优化

  • 采用CSS Flex布局划分左、中、右三大区域及底部栏;
  • 整体使用暗黑主题提升视觉对比度;
  • 卡片容器添加阴影与圆角边框增强层次感;
  • 主标题使用渐变字体与动画效果突出重点;
  • 所有图表容器设置固定宽高比例,确保响应一致性。
大屏效果图

常见问题排查与优化建议

词云无法显示

  • 确认已正确引入 echarts-wordcloud.min.js 插件;
  • 检查后端返回的词频数值是否过大,建议做归一化处理;
  • 避免出现单个词语占比过高导致其他词汇不可见。

3D地图不渲染

  • 验证GeoJSON是否成功注册地图名称;
  • 确保DOM容器具备足够尺寸(建议最小400x300px);
  • 检查浏览器是否支持WebGL。

图表排版错乱

  • 统一设置字体大小与颜色主题;
  • 合理调整margin/padding避免重叠;
  • 使用媒体查询适配不同分辨率屏幕。

总结与未来扩展方向

本项目实现了从原始数据采集到终端可视化的完整闭环,展示了大数据分析在房地产领域的典型应用。系统具备良好的可扩展性,后续可进一步增强如下能力:

  • 接入Kafka实现实时数据流处理,支持动态刷新;
  • 增加用户交互功能,如城市筛选、价格区间过滤、图表联动;
  • 支持移动端适配,实现跨平台访问;
  • 集成机器学习模型预测房价走势。

相关文章

可以按小时收费的VPS

很多 VPS 提供商都支持 按小时计费(hourly billing),想短期试用 / 临时搭建节点、测试网络、短期项目等场景非常合适。下面是当前最主流且靠谱的按小时 VPS 选项,分别按不同需求场景整理: 1. Vultr(全球节点,包括日本) 按小时计费 可选机房:东京 / 大阪 / 洛杉矶 / 法兰克福 / 伦敦 … 支持 PayPal(部分情况),但更常用信用卡/PayPal+卡价格参考$...

在 iPhone 上下载国外App

地区/国家限制App Store 会根据 Apple ID 的国家或地区限制应用下载。如果你的 Apple ID 绑定的是中国大陆,就可能无法下载 OpenAI 官方的 ChatGPT 应用,因为它在大陆 App Store 不上架。解决办法:换成美国、加拿大、香港等地区的 Apple ID。或者在现有 Apple ID 上更改地区。注册一个国外 Apple ID(推荐)比如注册 美国区 Appl...

Node.js 中的异步编程:回调与 Promise

Node.js 是一个基于 JavaScript 构建的单线程、非阻塞运行环境,它通过异步编程机制来高效处理多个操作。在执行如文件读取、API 请求或数据库查询等任务时,Node.js 不会等待这些操作完成,而是使用回调函数和 Promise 来避免阻塞主线程。 回调方式实现异步 那么当异步操作完成后,Node.js 如何知道接下来要做什么呢?这就要用到 回调函数(callback)。 回调本质上...

Selenium自动化测试入门指南

Selenium自动化测试入门指南

什么是自动化测试? 自动化测试是指利用软件工具自动执行测试用例,模拟用户操作,如打开网页、点击链接、输入文本等,并验证结果是否符合预期。 其主要优点包括: 大幅减少人工成本 测试速度快 可以在非工作时间运行 支持持续集成和交付 然而,它也存在一些局限性,例如开发成本较高、不适合快速变化的项目、依赖稳定的UI界面等。 自动化测试的应用条件 适合引入自动化测试的情况包括: 手动测试耗时且需要大量...

MariaDB Galera集群故障快速恢复指南

OpenStack控制节点采用三节点MariaDB Galera集群架构。当数据库集群因故障重启时,有时会出现Galera集群无法正常启动的问题。虽然有多种方法可以恢复数据库服务,但如何实现快速启动同时确保数据完整性呢? 通过分析日志发现,MariaDB Galera集群节点宕机时会在日志中输出以下信息: [Note] WSREP: 新集群视图:全局状态: 874d8e7e-5980-11e8-8...

Android 中 EventBus 的通信机制与实现原理深度解析

EventBus 核心设计思想 EventBus 是一个基于观察者模式的事件总线框架,广泛应用于 Android 平台以实现组件解耦。它通过中心化的消息分发机制,使不同层级、不同线程的对象能够以"发布-订阅"方式通信,避免了传统接口回调或广播带来的强依赖问题。 核心角色说明 事件(Event):任意 Java 对象,作为数据载体,如网络状态变更通知、用户登录信息等。 发布者(Publi...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。