当前位置:首页 > 技术 > 正文内容

Apache Flink 中将 Table 转换为 DataStream 或 DataSet 的方法

访客 技术 2026年6月8日 2

在 Apache Flink 中,可以通过 Table API 将 Table 实例转换为底层的 DataStream(流处理)或 DataSet(批处理)对象。这种转换允许开发者在 Table 查询结果上执行更灵活的原生数据流或数据集操作。

将 Table 转换为 DataStream

对于流式查询,Table 可以通过两种不同的模式转换为 DataStream,具体取决于查询是否包含聚合等更新操作:

1. 追加模式(Append Mode)

当查询结果仅包含插入操作,不会修改已有记录时,使用 toAppendStream 方法将每条结果作为新记录追加到流中。

2. 撤回模式(Retract Mode)

如果查询涉及聚合或可能更新先前输出的结果(例如去重、统计等),则应使用 toRetractStream。该方法生成一个二元组流 DataStream[(Boolean, T)],其中布尔值表示变更类型:true 表示 INSERT(新增或更新),false 表示 DELETE(撤销旧记录)。

基本语法示例:

val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)

// 假设有一个表:Table with schema (name: String, age: Int)
val resultTable: Table = ...

// 转换为追加流(仅插入)
val appendStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)

// 转换为指定元组类型的追加流
val tupleStream: DataStream[(String, Int)] = 
  tableEnv.toAppendStream[(String, Int)](resultTable)

// 转换为撤回流,用于支持更新和删除语义
val retractStream: DataStream[(Boolean, Row)] = 
  tableEnv.toRetractStream[Row](resultTable)

完整代码示例:

case class Message(user: Long, seq: Int, text: String)

object TableToDataStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 创建输入数据并转为表
    val inputList = List(
      Message(1L, 1, "Hello"),
      Message(2L, 2, "Hello"),
      Message(7L, 7, "Hello World"),
      Message(8L, 8, "Hello World"),
      Message(20L, 20, "Hello World")
    )

    val dataStream = env.fromCollection(inputList)
    val messageTable = tEnv.fromDataStream(dataStream)

    // 使用追加模式转换为 DataStream
    val appendStream: DataStream[Message] = tEnv.toAppendStream[Message](messageTable)

    // 使用撤回模式转换(适用于有状态更新的场景)
    val retractStream: DataStream[(Boolean, Message)] = 
      tEnv.toRetractStream[Message](messageTable)

    retractStream.print()

    env.execute("Table to DataStream Conversion")
  }
}

将 Table 转换为 DataSet(批处理)

在批处理环境中,Table 可以直接转换为 DataSet。由于批处理不涉及实时更新,因此无需区分追加或撤回模式,所有结果都被视为最终且完整的。

语法结构:

val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(batchEnv)

val batchTable: Table = ...

// 转换为 Row 类型的 DataSet
val rowDataSet: DataSet[Row] = tableEnv.toDataSet[Row](batchTable)

// 转换为特定样例类或元组类型的 DataSet
val typedDataSet: DataSet[Message] = tableEnv.toDataSet[Message](batchTable)

实际应用示例:

object TableToDataSetExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val records = List(
      Message(1L, 1, "Hello"),
      Message(2L, 2, "Hello"),
      Message(8L, 8, "Hello World"),
      Message(20L, 20, "Hello World")
    )

    val inputDS = env.fromCollection(records)
    val table = tableEnv.fromDataSet(inputDS)

    // 将表还原为 DataSet
    val outputDS: DataSet[Message] = tableEnv.toDataSet[Message](table)

    outputDS.print()
  }
}

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。