当前位置:首页 > 技术 > 正文内容

Spark RDD创建与转换操作精解

访客 技术 2026年6月11日 1

一、RDD的创建方式

1.1 通过集合并行化生成 适用于本地测试或小数据量场景,将内存中的列表转化为分布式数据集。

from pyspark import SparkContext

sc = SparkContext("local", "RDD Creation Demo")

# 基础创建:从列表生成RDD
data_list = [10, 20, 30, 40, 50]
rdd_from_list = sc.parallelize(data_list)

print(rdd_from_list.collect())  # [10, 20, 30, 40, 50]

# 指定分区数量(可控制并行度)
rdd_with_partition = sc.parallelize(data_list, numSlices=2)
print(rdd_with_partition.glom().collect())  # [[10, 20], [30, 40, 50]]

1.2 从外部数据源加载 支持文件系统、HDFS、S3等存储路径,是生产环境主流方式。

# 读取单个文本文件
text_rdd = sc.textFile("/input/data.txt")

# 读取目录下所有匹配文件
dir_rdd = sc.textFile("/input/logs/*.log")

# 处理CSV格式数据
csv_rdd = sc.textFile("data.csv").map(lambda line: line.strip().split(";"))

# 读取完整文件内容(返回文件名和内容元组)
whole_files = sc.wholeTextFiles("/input/small_files/*")

1.3 由已有RDD衍生 通过一系列转换操作构建新的分布式数据集。

source_rdd = sc.parallelize([1, 2, 3, 4])
transformed_rdd = source_rdd.map(lambda x: x * 2)  # [2, 4, 6, 8]

1.4 特殊类型RDD创建

# 空RDD(用于条件性创建)
empty_rdd = sc.emptyRDD()

# 创建数值范围(支持步长)
range_rdd = sc.range(0, 20, 3)  # [0, 3, 6, 9, 12, 15, 18]

# 构建键值对集合
kv_rdd = sc.parallelize([("x", 100), ("y", 200), ("z", 300)])

二、核心转换操作详解

2.1 基础映射与筛选

rdd = sc.parallelize([1, 2, 3, 4, 5])

# 映射变换:每个元素执行函数
squared_rdd = rdd.map(lambda x: x ** 2)  # [1, 4, 9, 16, 25]

# 过滤出偶数
even_rdd = rdd.filter(lambda x: x % 2 == 0)  # [2, 4]

# 扁平化处理:将列表展开成单个序列
sentences = sc.parallelize(["I love Spark", "It's powerful"])
words = sentences.flatMap(lambda s: s.split(" "))  # ["I", "love", "Spark", "It's", "powerful"]

2.2 键值对相关操作

pairs = sc.parallelize([("apple", 5), ("banana", 3), ("apple", 2)])

# 按键聚合求和
summed = pairs.reduceByKey(lambda a, b: a + b)  # [("apple", 7), ("banana", 3)]

# 分组:相同键的所有值合并为列表
grouped = pairs.groupByKey()  # [("apple", [5, 2]), ("banana", [3])]

# 按键排序(升序)
sorted_pairs = pairs.sortByKey()  # [("apple", 5), ("apple", 2), ("banana", 3)]

2.3 分区管理操作

original_rdd = sc.parallelize(range(100), 4)  # 初始4个分区

# 重新分区(触发shuffle)
repartitioned = original_rdd.repartition(8)

# 减少分区且避免全量洗牌
coalesced = repartitioned.coalesce(3, shuffle=False)

⚠️ 区别说明:

  • repartition():任意调整分区数,强制重排数据。
  • coalesce():仅减少分区数时可用,可选择是否启用shuffle。
# 自定义分区策略
def custom_key_partitioner(key):
    return abs(hash(key)) % 4

key_rdd = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])
partitioned = key_rdd.partitionBy(4, custom_key_partitioner)

2.4 集合运算操作

rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize([3, 4, 5])

# 合并两个RDD(保留重复项)
merged = rdd_a.union(rdd_b)  # [1, 2, 3, 3, 4, 5]

# 取交集
common = rdd_a.intersection(rdd_b)  # [3]

# 去除重复元素
unique = merged.distinct()  # [1, 2, 3, 4, 5]

2.5 高级组合操作

list1 = sc.parallelize(['X', 'Y'])
list2 = sc.parallelize([10, 20])

# 元素一一配对
zipped = list1.zip(list2)  # [('X', 10), ('Y', 20)]

# 计算笛卡尔积
cartesian_result = list1.cartesian(list2)  
# [('X', 10), ('X', 20), ('Y', 10), ('Y', 20)]
标签: PySpark

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。