Apache Flink 中将 Table 转换为 DataStream 或 DataSet 的方法
在 Apache Flink 中,可以通过 Table API 将 Table 实例转换为底层的 DataStream(流处理)或 DataSet(批处理)对象。这种转换允许开发者在 Table 查询结果上执行更灵活的原生数据流或数据集操作。
将 Table 转换为 DataStream
对于流式查询,Table 可以通过两种不同的模式转换为 DataStream,具体取决于查询是否包含聚合等更新操作:
1. 追加模式(Append Mode)
当查询结果仅包含插入操作,不会修改已有记录时,使用 toAppendStream 方法将每条结果作为新记录追加到流中。
2. 撤回模式(Retract Mode)
如果查询涉及聚合或可能更新先前输出的结果(例如去重、统计等),则应使用 toRetractStream。该方法生成一个二元组流 DataStream[(Boolean, T)],其中布尔值表示变更类型:true 表示 INSERT(新增或更新),false 表示 DELETE(撤销旧记录)。
基本语法示例:
val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
// 假设有一个表:Table with schema (name: String, age: Int)
val resultTable: Table = ...
// 转换为追加流(仅插入)
val appendStream: DataStream[Row] = tableEnv.toAppendStream[Row](resultTable)
// 转换为指定元组类型的追加流
val tupleStream: DataStream[(String, Int)] =
tableEnv.toAppendStream[(String, Int)](resultTable)
// 转换为撤回流,用于支持更新和删除语义
val retractStream: DataStream[(Boolean, Row)] =
tableEnv.toRetractStream[Row](resultTable)
完整代码示例:
case class Message(user: Long, seq: Int, text: String)
object TableToDataStreamExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tEnv = TableEnvironment.getTableEnvironment(env)
// 创建输入数据并转为表
val inputList = List(
Message(1L, 1, "Hello"),
Message(2L, 2, "Hello"),
Message(7L, 7, "Hello World"),
Message(8L, 8, "Hello World"),
Message(20L, 20, "Hello World")
)
val dataStream = env.fromCollection(inputList)
val messageTable = tEnv.fromDataStream(dataStream)
// 使用追加模式转换为 DataStream
val appendStream: DataStream[Message] = tEnv.toAppendStream[Message](messageTable)
// 使用撤回模式转换(适用于有状态更新的场景)
val retractStream: DataStream[(Boolean, Message)] =
tEnv.toRetractStream[Message](messageTable)
retractStream.print()
env.execute("Table to DataStream Conversion")
}
}
将 Table 转换为 DataSet(批处理)
在批处理环境中,Table 可以直接转换为 DataSet。由于批处理不涉及实时更新,因此无需区分追加或撤回模式,所有结果都被视为最终且完整的。
语法结构:
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(batchEnv)
val batchTable: Table = ...
// 转换为 Row 类型的 DataSet
val rowDataSet: DataSet[Row] = tableEnv.toDataSet[Row](batchTable)
// 转换为特定样例类或元组类型的 DataSet
val typedDataSet: DataSet[Message] = tableEnv.toDataSet[Message](batchTable)
实际应用示例:
object TableToDataSetExample {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv = TableEnvironment.getTableEnvironment(env)
val records = List(
Message(1L, 1, "Hello"),
Message(2L, 2, "Hello"),
Message(8L, 8, "Hello World"),
Message(20L, 20, "Hello World")
)
val inputDS = env.fromCollection(records)
val table = tableEnv.fromDataSet(inputDS)
// 将表还原为 DataSet
val outputDS: DataSet[Message] = tableEnv.toDataSet[Message](table)
outputDS.print()
}
}