Spark RDD创建与转换操作精解
一、RDD的创建方式
1.1 通过集合并行化生成 适用于本地测试或小数据量场景,将内存中的列表转化为分布式数据集。
from pyspark import SparkContext
sc = SparkContext("local", "RDD Creation Demo")
# 基础创建:从列表生成RDD
data_list = [10, 20, 30, 40, 50]
rdd_from_list = sc.parallelize(data_list)
print(rdd_from_list.collect()) # [10, 20, 30, 40, 50]
# 指定分区数量(可控制并行度)
rdd_with_partition = sc.parallelize(data_list, numSlices=2)
print(rdd_with_partition.glom().collect()) # [[10, 20], [30, 40, 50]]
1.2 从外部数据源加载 支持文件系统、HDFS、S3等存储路径,是生产环境主流方式。
# 读取单个文本文件
text_rdd = sc.textFile("/input/data.txt")
# 读取目录下所有匹配文件
dir_rdd = sc.textFile("/input/logs/*.log")
# 处理CSV格式数据
csv_rdd = sc.textFile("data.csv").map(lambda line: line.strip().split(";"))
# 读取完整文件内容(返回文件名和内容元组)
whole_files = sc.wholeTextFiles("/input/small_files/*")
1.3 由已有RDD衍生 通过一系列转换操作构建新的分布式数据集。
source_rdd = sc.parallelize([1, 2, 3, 4])
transformed_rdd = source_rdd.map(lambda x: x * 2) # [2, 4, 6, 8]
1.4 特殊类型RDD创建
# 空RDD(用于条件性创建)
empty_rdd = sc.emptyRDD()
# 创建数值范围(支持步长)
range_rdd = sc.range(0, 20, 3) # [0, 3, 6, 9, 12, 15, 18]
# 构建键值对集合
kv_rdd = sc.parallelize([("x", 100), ("y", 200), ("z", 300)])
二、核心转换操作详解
2.1 基础映射与筛选
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 映射变换:每个元素执行函数
squared_rdd = rdd.map(lambda x: x ** 2) # [1, 4, 9, 16, 25]
# 过滤出偶数
even_rdd = rdd.filter(lambda x: x % 2 == 0) # [2, 4]
# 扁平化处理:将列表展开成单个序列
sentences = sc.parallelize(["I love Spark", "It's powerful"])
words = sentences.flatMap(lambda s: s.split(" ")) # ["I", "love", "Spark", "It's", "powerful"]
2.2 键值对相关操作
pairs = sc.parallelize([("apple", 5), ("banana", 3), ("apple", 2)])
# 按键聚合求和
summed = pairs.reduceByKey(lambda a, b: a + b) # [("apple", 7), ("banana", 3)]
# 分组:相同键的所有值合并为列表
grouped = pairs.groupByKey() # [("apple", [5, 2]), ("banana", [3])]
# 按键排序(升序)
sorted_pairs = pairs.sortByKey() # [("apple", 5), ("apple", 2), ("banana", 3)]
2.3 分区管理操作
original_rdd = sc.parallelize(range(100), 4) # 初始4个分区
# 重新分区(触发shuffle)
repartitioned = original_rdd.repartition(8)
# 减少分区且避免全量洗牌
coalesced = repartitioned.coalesce(3, shuffle=False)
⚠️ 区别说明:
repartition():任意调整分区数,强制重排数据。coalesce():仅减少分区数时可用,可选择是否启用shuffle。
# 自定义分区策略
def custom_key_partitioner(key):
return abs(hash(key)) % 4
key_rdd = sc.parallelize([("A", 1), ("B", 2), ("C", 3)])
partitioned = key_rdd.partitionBy(4, custom_key_partitioner)
2.4 集合运算操作
rdd_a = sc.parallelize([1, 2, 3])
rdd_b = sc.parallelize([3, 4, 5])
# 合并两个RDD(保留重复项)
merged = rdd_a.union(rdd_b) # [1, 2, 3, 3, 4, 5]
# 取交集
common = rdd_a.intersection(rdd_b) # [3]
# 去除重复元素
unique = merged.distinct() # [1, 2, 3, 4, 5]
2.5 高级组合操作
list1 = sc.parallelize(['X', 'Y'])
list2 = sc.parallelize([10, 20])
# 元素一一配对
zipped = list1.zip(list2) # [('X', 10), ('Y', 20)]
# 计算笛卡尔积
cartesian_result = list1.cartesian(list2)
# [('X', 10), ('X', 20), ('Y', 10), ('Y', 20)]