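Flink MySQL CDC Connector: Basic Configuration and Implementation
Getting Started with the Apache Flink MySQL CDC Connector
Environment Setup
Before using the Flink MySQL CDC connector, make sure the following environment is in place:
- Flink version: 1.18.0
- Flink CDC version: 3.2.0
- MySQL version: 8.0.26
- Java version: 1.8
- Maven version: 3.8.4
MySQL Database Configuration
Enabling the Binary Log
MySQL CDC captures data changes by reading the MySQL binary log (binlog), so binary logging must be enabled in row format. Edit the MySQL configuration file (usually /etc/my.cnf or /etc/mysql/my.cnf):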
[mysqld]
log-bin=mysql-bin
binlog-format=row
# Keep binlogs for 7 days; expire_logs_days is deprecated in MySQL 8.0
binlog_expire_logs_seconds=604800
max-binlog-size=100M
After changing the configuration, restart the MySQL service for the settings to take effect.
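After the restart, it can help to confirm that the settings actually took effect. The sketch below is a plain-Java check over the name/value pairs that `SHOW VARIABLES` returns. The class name `BinlogSettingsCheck` and the way the map gets populated are assumptions made for illustration. `binlog_row_image` is included because row-based CDC needs full row images (FULL is the MySQL default).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: validates binlog-related values taken from SHOW VARIABLES. */
final class BinlogSettingsCheck {

    /** Returns one message per setting that does not match what row-based CDC expects. */
    static List<String> findProblems(Map<String, String> variables) {
        Map<String, String> expected = new LinkedHashMap<>();
        expected.put("log_bin", "ON");
        expected.put("binlog_format", "ROW");
        expected.put("binlog_row_image", "FULL");

        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> entry : expected.entrySet()) {
            String actual = variables.get(entry.getKey());
            // Compare case-insensitively; a missing variable counts as a problem
            if (actual == null || !actual.equalsIgnoreCase(entry.getValue())) {
                problems.add(entry.getKey() + " should be " + entry.getValue() + " but is " + actual);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // Sample values as they might come back from SHOW VARIABLES
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("log_bin", "ON");
        variables.put("binlog_format", "MIXED");
        variables.put("binlog_row_image", "FULL");
        System.out.println(findProblems(variables)); // [binlog_format should be ROW but is MIXED]
    }
}
```

An empty result means the three settings look right for CDC. How the map is filled (a JDBC query, a pasted dump) is left open here.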
Creating a Dedicated User
Create a dedicated MySQL user for CDC and grant it the required privileges:
CREATE USER 'cdc_user'@'localhost' IDENTIFIED BY 'secure_password';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'localhost';
FLUSH PRIVILEGES;
Preparing Test Data
Create a test database and table:
CREATE DATABASE test_cdc;
USE test_cdc;
CREATE TABLE user_profile (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    bio TEXT,
    age INT,
    account_balance DECIMAL(10, 2),
    is_active BOOLEAN DEFAULT TRUE,
    register_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    birth_date DATE,
    identifier BIGINT,
    last_visit TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
INSERT INTO user_profile (username, bio, age, account_balance, is_active, register_time, birth_date, identifier, last_visit) VALUES
('john_doe', 'Software developer with 8 years of experience.', 32, 5800.75, TRUE, '2022-03-15 09:30:00', '1990-07-22', 12345678901234, '2023-06-15 14:20:00'),
('jane_smith', 'Data scientist specializing in machine learning.', 28, 7200.50, TRUE, '2022-05-22 11:45:00', '1994-11-08', 987654321054321, '2023-06-14 16:30:00'),
('mike_johnson', 'Project manager with 10 years experience.', 45, 9500.00, TRUE, '2021-01-10 14:20:00', '1978-04-30', 135792468012345, '2023-06-13 10:15:00'),
('sarah_wilson', 'UX/UI designer and researcher.', 31, 4200.25, FALSE, '2022-07-18 16:40:00', '1992-09-12', 24681357901234, '2023-06-12 13:45:00'),
('david_brown', 'DevOps engineer and cloud specialist.', 38, 6800.80, TRUE, '2021-11-05 08:15:00', '1985-02-18', 98765432102468, '2023-06-11 17:30:00');
Maven Project Configuration
Dependency Management
Add the required dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-mysql-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.18.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flinkcdc.version>3.2.0</flinkcdc.version>
        <mysql.version>8.0.26</mysql.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <dependencies>
        <!-- Flink core dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Table API dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- MySQL CDC connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Logging -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
</project>
Implementing Flink CDC
Using the DataStream API
The following code creates a MySQL CDC source with the DataStream API:
package com.example.cdc;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcDataStreamJob {
    public static void main(String[] args) throws Exception {
        // Build the MySQL CDC source
        MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("cdc_user")
                .password("secure_password")
                .databaseList("test_cdc")
                .tableList("test_cdc.user_profile") // qualified as database.table
                .deserializer(new JsonDebeziumDeserializationSchema()) // each change event becomes a JSON string
                .serverTimeZone("Asia/Shanghai")
                // Server IDs must be numeric; a range needs at least as many IDs
                // as the source parallelism (3 below)
                .serverId("5401-5403")
                .build();

        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5 seconds
        env.enableCheckpointing(5000);

        // Read from the source with parallelism 3 and print with parallelism 1
        env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(3)
                .print()
                .setParallelism(1);

        // Run the job
        env.execute("MySQL CDC DataStream Job");
    }
}
Using the Table API
The following code creates a MySQL CDC table with the Table API:
package com.example.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcTableJob {
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment and the table environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A single server ID is configured below, so run with parallelism 1
        env.setParallelism(1);
        // Enable checkpointing so the source can persist its binlog position for recovery
        env.enableCheckpointing(5000);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the MySQL CDC table
        tableEnv.executeSql(
                "CREATE TABLE user_profile_cdc (" +
                "  id INT," +
                "  username STRING," +
                "  bio STRING," +
                "  age INT," +
                "  account_balance DECIMAL(10, 2)," +
                "  is_active BOOLEAN," +
                "  register_time TIMESTAMP," +
                "  birth_date DATE," +
                "  identifier BIGINT," +
                "  last_visit TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED " +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'cdc_user'," +
                "  'password' = 'secure_password'," +
                "  'database-name' = 'test_cdc'," +
                "  'table-name' = 'user_profile'," +
                "  'server-time-zone' = 'Asia/Shanghai'," +
                "  'server-id' = '5410'" + // must be numeric, not an arbitrary string
                ")"
        );

        // Run a continuous query and print the changelog.
        // executeSql() submits the job itself and print() blocks while it runs,
        // so no env.execute() call is needed.
        TableResult result = tableEnv.executeSql(
                "SELECT id, username, age, account_balance FROM user_profile_cdc WHERE age > 30");
        result.print();
    }
}
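Advanced Configuration Options
Time Zone Configuration
MySQL stores TIMESTAMP values in UTC and converts them using the session time zone, so CDC reads can return shifted values when the connector's time zone setting does not match the server's. Set the server time zone explicitly when building the source:
// Set the server time zone when building the source
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
        // ... other settings
        .serverTimeZone("Asia/Shanghai") // UTC+8
        // ... other settings
        .build();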
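To see why the setting matters, the plain `java.time` sketch below renders one instant as wall-clock time in two zones. The class name `TimestampZoneDemo` is illustrative and not part of the connector. The instant is chosen to match the first sample row's `last_visit`, assuming the server runs in Asia/Shanghai.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Illustrative only: shows how one instant maps to different wall-clock times. */
final class TimestampZoneDemo {

    /** Renders an instant as the wall-clock time seen in the given zone. */
    static LocalDateTime wallClock(Instant instant, String zoneId) {
        return LocalDateTime.ofInstant(instant, ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        // 2023-06-15 14:20:00 in Asia/Shanghai is 06:20:00 UTC
        Instant lastVisit = Instant.parse("2023-06-15T06:20:00Z");
        System.out.println(wallClock(lastVisit, "Asia/Shanghai")); // 2023-06-15T14:20
        System.out.println(wallClock(lastVisit, "UTC"));           // 2023-06-15T06:20
    }
}
```

The eight-hour gap between the two lines is the kind of shift that tends to appear downstream when the configured zone and the server's zone disagree.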
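Server ID Configuration
Every client that reads the MySQL binlog must use a server ID that is unique within the MySQL cluster. The value is numeric: either a single ID such as 5400 or a range such as 5400-5403. A range must contain at least as many IDs as the source parallelism:
// Give each CDC job its own server ID or ID range
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
        // ... other settings
        .serverId("5400-5403") // numeric range, unique per job
        // ... other settings
        .build();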
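The server ID is a number, or a numeric range when the source runs with several parallel readers, so each reader can take its own ID. The following sketch captures that rule in plain Java so a job can validate its setting before submission. `ServerIdRange` here is a hypothetical helper written for this guide, not the connector's own class, and the `start + subtaskIndex` assignment is an assumption about one reasonable way to hand out IDs.

```java
/** Hypothetical helper: parses "5400" or "5400-5403" and checks it against a parallelism. */
final class ServerIdRange {
    final int start;
    final int end;

    private ServerIdRange(int start, int end) {
        if (start > end) {
            throw new IllegalArgumentException("Range start must not exceed range end");
        }
        this.start = start;
        this.end = end;
    }

    /** Throws IllegalArgumentException (or its subclass NumberFormatException) for invalid input. */
    static ServerIdRange parse(String text) {
        String[] parts = text.trim().split("-");
        if (parts.length == 1) {
            int id = Integer.parseInt(parts[0].trim());
            return new ServerIdRange(id, id);
        }
        if (parts.length == 2) {
            return new ServerIdRange(
                    Integer.parseInt(parts[0].trim()),
                    Integer.parseInt(parts[1].trim()));
        }
        throw new IllegalArgumentException(
                "Expected a single ID or a range like 5400-5403, got: " + text);
    }

    /** Number of IDs in the range. */
    int size() {
        return end - start + 1;
    }

    /** True if every parallel reader can get its own ID. */
    boolean covers(int parallelism) {
        return size() >= parallelism;
    }

    /** Assumed assignment: reader i takes start + i. */
    int idForSubtask(int subtaskIndex) {
        if (subtaskIndex < 0 || subtaskIndex >= size()) {
            throw new IllegalArgumentException("No server ID left for subtask " + subtaskIndex);
        }
        return start + subtaskIndex;
    }

    public static void main(String[] args) {
        ServerIdRange range = ServerIdRange.parse("5401-5403");
        System.out.println(range.covers(3)); // true
        System.out.println(range.covers(4)); // false
    }
}
```

A descriptive string such as "unique-cdc-server-id" fails this check, which is a quick way to catch the mistake before the job reaches the cluster.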
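Incremental Snapshot Reading
For large tables, incremental snapshot reading splits the table into primary-key chunks instead of reading it in one pass, which bounds memory use and lets the snapshot be read in parallel. The DataStream MySqlSource always reads this way, and the chunk size is tuned with splitSize. The SQL connector exposes the same behavior through the scan.incremental.snapshot.enabled and scan.incremental.snapshot.chunk.size options:
// Tune incremental snapshot reading
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
        // ... other settings
        .splitSize(8096) // rows per snapshot chunk
        // ... other settings
        .build();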
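The idea behind chunked snapshot reading can be illustrated without the connector. The sketch below splits a numeric primary-key range into consecutive chunks of a fixed size. It is a simplified illustration that assumes a dense integer key, not the connector's actual splitting algorithm, and `ChunkSplitter` and `Chunk` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of splitting a key range into fixed-size chunks. */
final class ChunkSplitter {

    /** Half-open key range [start, end). */
    static final class Chunk {
        final long start;
        final long end;

        Chunk(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Splits the inclusive key range minId..maxId into chunks of at most chunkSize keys. */
    static List<Chunk> split(long minId, long maxId, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Chunk> chunks = new ArrayList<>();
        for (long start = minId; start <= maxId; start += chunkSize) {
            // The last chunk is cut off just past maxId
            long end = Math.min(start + chunkSize, maxId + 1);
            chunks.add(new Chunk(start, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Keys 1..20000 with a chunk size of 8096
        System.out.println(split(1, 20000, 8096));
        // [[1, 8097), [8097, 16193), [16193, 20001)]
    }
}
```

Each chunk can then be read by a separate query bounded by its key range, which is what makes parallel reads and per-chunk progress tracking possible.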
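Common Problems and Solutions
Insufficient Privileges
Error: Access denied for user 'cdc_user'@'host'
Solution: make sure the CDC user has been granted the required privileges, and that the host in the error message matches the host the account was created for ('localhost' in this guide):
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'localhost';
FLUSH PRIVILEGES;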
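Connection Timeouts
Error: Connection timed out
Solution: increase the connection timeout and the retry count:
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
        // ... other settings
        .connectionPoolSize(20) // larger connection pool
        .connectTimeout(Duration.ofSeconds(30)) // java.time.Duration; 30-second connection timeout
        .connectMaxRetries(3) // maximum number of connection retries
        // ... other settings
        .build();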
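Before tuning timeouts, it is often worth checking that the MySQL port is reachable at all from the machine that runs Flink, since a firewall or a wrong host name produces the same error. The sketch below uses only `java.net`. `PortCheck` is a hypothetical helper name, and the host and port in `main` are the values used in this guide.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("MySQL reachable: " + isReachable("localhost", 3306, 3000));
    }
}
```

If the check fails from the Flink host but succeeds from the database host itself, the cause is more likely network configuration than the connector's timeout settings.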
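Data Type Mapping
MySQL and Flink data types do not always match one-to-one. A custom deserializer can handle the conversion:
public class CustomDeserializationSchema implements DebeziumDeserializationSchema<RowData> {
    // Implement deserialize(SourceRecord, Collector<RowData>) and getProducedType()
    // with the custom conversion logic
}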
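One place where the mismatch tends to show up is temporal columns. With Debezium's default temporal encoding, a MySQL DATE typically arrives as a count of days since 1970-01-01 and a DATETIME with precision up to 3 as milliseconds since the epoch, so downstream code has to convert them back. The sketch below shows that conversion with `java.time` alone. The class and method names are hypothetical, and the encoding is an assumption that should be checked against the events the job actually emits.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

/** Hypothetical converters for epoch-based temporal values in change events. */
final class DebeziumTemporalValues {

    /** Converts a day count since 1970-01-01 into a calendar date. */
    static LocalDate dateFromEpochDays(int epochDays) {
        return LocalDate.ofEpochDay(epochDays);
    }

    /** Converts epoch milliseconds into a wall-clock value, read as UTC with no zone shift. */
    static LocalDateTime dateTimeFromEpochMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Values corresponding to the first sample row (birth_date, register_time)
        System.out.println(dateFromEpochDays(7507));                 // 1990-07-22
        System.out.println(dateTimeFromEpochMillis(1647336600000L)); // 2022-03-15T09:30
    }
}
```

Inside a custom deserializer, conversions like these would run on the fields of each record before the row is emitted.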