当前位置:首页 > 技术 > 正文内容

Flink批处理数据源构建指南

访客 技术 2026年6月9日 1

Apache Flink的批处理场景下,DataSet API提供了丰富的数据接入方式,主要分为内存集合与外部文件系统两大类渠道。

内存集合数据源

通过ExecutionEnvironment的工厂方法可直接将内存中的集合转化为分布式数据集,常用入口包括:

  • fromElements:接收可变参数列表,适用于元素数量明确的场景
  • fromCollection:接收Java/Scala集合类型,适配性更广
  • generateSequence:生成连续数值序列

以下示例展示了多种Scala集合类型的接入方式:

import org.apache.flink.api.scala._
import scala.collection.mutable
import scala.collection.immutable.{Queue, Stack}

object InMemorySources {
  def main(args: Array[String]): Unit = {
    val ctx = ExecutionEnvironment.getExecutionEnvironment

    // 基础元素
    val words = ctx.fromElements("flink", "spark")
    words.print()

    // 元组形式
    val pairs = ctx.fromElements((1001, "kafka"), (1002, "pulsar"))
    pairs.print()

    // 各类集合容器
    ctx.fromCollection(Array("a", "b")).print()
    ctx.fromCollection(List("a", "b")).print()
    ctx.fromCollection(Vector("a", "b")).print()
    ctx.fromCollection(Queue("a", "b")).print()
    ctx.fromCollection(Stack("a", "b")).print()
    ctx.fromCollection(Stream("a", "b")).print()
    ctx.fromCollection(Set("a", "b")).print()
    ctx.fromCollection(Iterable("a", "b")).print()
    
    // 可变集合
    ctx.fromCollection(mutable.ArrayBuffer("a", "b")).print()
    ctx.fromCollection(mutable.ArraySeq("a", "b")).print()
    ctx.fromCollection(mutable.ArrayStack("a", "b")).print()

    // 映射与区间
    ctx.fromCollection(Map(1 -> "yes", 0 -> "no")).print()
    ctx.fromCollection(Range(1, 10)).print()

    // 数值序列生成
    ctx.generateSequence(1L, 100L).print()
  }
}

文件系统数据源

本地文本文件

通过readTextFile读取本地路径,配合转换算子完成词频统计:

val env = ExecutionEnvironment.getExecutionEnvironment

val localLines = env.readTextFile("/tmp/input.txt")
val wordCounts = localLines
  .flatMap(_.split("\\W+"))
  .map((_, 1))
  .groupBy(0)
  .reduce((l, r) => (l._1, l._2 + r._2))

wordCounts.print()

HDFS分布式文件

指定HDFS的NameNode地址与文件路径即可透明访问:

val hdfsLines = env.readTextFile("hdfs://nn-host:8020/logs/app.log")
val tokenized = hdfsLines.flatMap(line => line.split("\\s+"))
val mapped = tokenized.map(w => (w, 1))
val aggregated = mapped.groupBy(0).sum(1)
aggregated.print()

结构化CSV数据

使用readCsvFile时需声明目标元组类型,并通过参数控制解析行为:

val csvPath = "hdfs://warehouse/orders.csv"

val orderRecords = env.readCsvFile[(String, String, Double, Int, Long)](
  filePath = csvPath,
  lineDelimiter = "\n",
  fieldDelimiter = ",",
  ignoreFirstLine = true,
  lenient = false,
  includedFields = Array(0, 1, 3, 5, 7)
)

val topItems = orderRecords
  .groupBy(1)
  .first(20)

topItems.print()

递归目录遍历

默认情况下Flink仅读取指定路径下的直接文件。如需递归处理嵌套子目录,需显式开启枚举参数:

import org.apache.flink.configuration.Configuration

val cfg = new Configuration()
cfg.setBoolean("recursive.file.enumeration", true)

val nestedFiles = env
  .readTextFile("file:///data/archives/2024")
  .withParameters(cfg)

nestedFiles.print()

压缩文件自动解压

Flink内置对常见压缩格式的自动识别与解压,包括gzipdeflatebzip2xz等。使用时直接指向压缩文件路径即可:

val compressed = env.readTextFile("file:///data/logs/server.log.gz")
compressed.count()

需注意压缩文件通常无法并行切分,可能退化为顺序读取,大规模场景下建议优先选用非压缩格式或支持分片的压缩方案(如Bzip2)。

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。