Flink批处理数据源构建指南
Apache Flink的批处理场景下,DataSet API提供了丰富的数据接入方式,主要分为内存集合与外部文件系统两大类渠道。
内存集合数据源
通过ExecutionEnvironment的工厂方法可直接将内存中的集合转化为分布式数据集,常用入口包括:
fromElements:接收可变参数列表,适用于元素数量明确的场景fromCollection:接收Java/Scala集合类型,适配性更广generateSequence:生成连续数值序列
以下示例展示了多种Scala集合类型的接入方式:
import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.collection.immutable.{Queue, Stack}
object InMemorySources {
def main(args: Array[String]): Unit = {
val ctx = ExecutionEnvironment.getExecutionEnvironment
// 基础元素
val words = ctx.fromElements("flink", "spark")
words.print()
// 元组形式
val pairs = ctx.fromElements((1001, "kafka"), (1002, "pulsar"))
pairs.print()
// 各类集合容器
ctx.fromCollection(Array("a", "b")).print()
ctx.fromCollection(List("a", "b")).print()
ctx.fromCollection(Vector("a", "b")).print()
ctx.fromCollection(Queue("a", "b")).print()
ctx.fromCollection(Stack("a", "b")).print()
ctx.fromCollection(Stream("a", "b")).print()
ctx.fromCollection(Set("a", "b")).print()
ctx.fromCollection(Iterable("a", "b")).print()
// 可变集合
ctx.fromCollection(mutable.ArrayBuffer("a", "b")).print()
ctx.fromCollection(mutable.ArraySeq("a", "b")).print()
ctx.fromCollection(mutable.ArrayStack("a", "b")).print()
// 映射与区间
ctx.fromCollection(Map(1 -> "yes", 0 -> "no")).print()
ctx.fromCollection(Range(1, 10)).print()
// 数值序列生成
ctx.generateSequence(1L, 100L).print()
}
}
文件系统数据源
本地文本文件
通过readTextFile读取本地路径,配合转换算子完成词频统计:
val env = ExecutionEnvironment.getExecutionEnvironment
val localLines = env.readTextFile("/tmp/input.txt")
val wordCounts = localLines
.flatMap(_.split("\\W+"))
.map((_, 1))
.groupBy(0)
.reduce((l, r) => (l._1, l._2 + r._2))
wordCounts.print()
HDFS分布式文件
指定HDFS的NameNode地址与文件路径即可透明访问:
val hdfsLines = env.readTextFile("hdfs://nn-host:8020/logs/app.log")
val tokenized = hdfsLines.flatMap(line => line.split("\\s+"))
val mapped = tokenized.map(w => (w, 1))
val aggregated = mapped.groupBy(0).sum(1)
aggregated.print()
结构化CSV数据
使用readCsvFile时需声明目标元组类型,并通过参数控制解析行为:
val csvPath = "hdfs://warehouse/orders.csv"
val orderRecords = env.readCsvFile[(String, String, Double, Int, Long)](
filePath = csvPath,
lineDelimiter = "\n",
fieldDelimiter = ",",
ignoreFirstLine = true,
lenient = false,
includedFields = Array(0, 1, 3, 5, 7)
)
val topItems = orderRecords
.groupBy(1)
.first(20)
topItems.print()
递归目录遍历
默认情况下Flink仅读取指定路径下的直接文件。如需递归处理嵌套子目录,需显式开启枚举参数:
import org.apache.flink.configuration.Configuration
val cfg = new Configuration()
cfg.setBoolean("recursive.file.enumeration", true)
val nestedFiles = env
.readTextFile("file:///data/archives/2024")
.withParameters(cfg)
nestedFiles.print()
压缩文件自动解压
Flink内置对常见压缩格式的自动识别与解压,包括gzip、deflate、bzip2、xz等。使用时直接指向压缩文件路径即可:
val compressed = env.readTextFile("file:///data/logs/server.log.gz")
compressed.count()
需注意压缩文件通常无法并行切分,可能退化为顺序读取,大规模场景下建议优先选用非压缩格式或支持分片的压缩方案(如Bzip2)。