当前位置:首页 > 技术 > 正文内容

Flink MySQL CDC 连接器基础配置与实现

访客 技术 2026年6月25日 1

Apache Flink MySQL CDC 连接器入门指南

环境准备

在开始使用 Flink MySQL CDC 连接器之前,需要确保以下环境已正确配置:

  • Flink 版本:1.18.0
  • Flink CDC 版本:3.2.0
  • MySQL 版本:8.0.26
  • Java 版本:1.8
  • Maven 版本:3.8.4

MySQL 数据库配置

启用二进制日志

MySQL CDC 需要启用 MySQL 的二进制日志功能才能捕获数据变更。编辑 MySQL 配置文件(通常是 /etc/my.cnf 或 /etc/mysql/my.cnf):

[mysqld]
log-bin=mysql-bin
binlog-format=row
expire_logs_days=7
max-binlog-size=100M

修改配置后,需要重启 MySQL 服务使配置生效。

创建专用用户

为 CDC 操作创建一个专用 MySQL 用户并授予必要的权限:

CREATE USER 'cdc_user'@'localhost' IDENTIFIED BY 'secure_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'localhost';
FLUSH PRIVILEGES;

准备测试数据

创建测试数据库和表:

CREATE DATABASE test_cdc;
USE test_cdc;

CREATE TABLE user_profile (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    bio TEXT,
    age INT,
    account_balance DECIMAL(10, 2),
    is_active BOOLEAN DEFAULT TRUE,
    register_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    birth_date DATE,
    identifier BIGINT,
    last_visit TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO user_profile (username, bio, age, account_balance, is_active, register_time, birth_date, identifier, last_visit) VALUES
('john_doe', 'Software developer with 8 years of experience.', 32, 5800.75, TRUE, '2022-03-15 09:30:00', '1990-07-22', 12345678901234, '2023-06-15 14:20:00'),
('jane_smith', 'Data scientist specializing in machine learning.', 28, 7200.50, TRUE, '2022-05-22 11:45:00', '1994-11-08', 987654321054321, '2023-06-14 16:30:00'),
('mike_johnson', 'Project manager with 10 years experience.', 45, 9500.00, TRUE, '2021-01-10 14:20:00', '1978-04-30', 135792468012345, '2023-06-13 10:15:00'),
('sarah_wilson', 'UX/UI designer and researcher.', 31, 4200.25, FALSE, '2022-07-18 16:40:00', '1992-09-12', 24681357901234, '2023-06-12 13:45:00'),
('david_brown', 'DevOps engineer and cloud specialist.', 38, 6800.80, TRUE, '2021-11-05 08:15:00', '1985-02-18', 98765432102468, '2023-06-11 17:30:00');

Maven 项目配置

依赖管理

在 pom.xml 文件中添加必要的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-mysql-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.18.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <flinkcdc.version>3.2.0</flinkcdc.version>
        <mysql.version>8.0.26</mysql.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Flink 核心依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <!-- Table API 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        
        <!-- MySQL CDC 连接器 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flinkcdc.version}</version>
        </dependency>
        
        <!-- MySQL 驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
</project>

Flink CDC 实现方式

使用 DataStream API 实现

以下代码展示了如何使用 DataStream API 创建 MySQL CDC 数据源:

package com.example.cdc;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;

public class MysqlCDataStreamJob {
    public static void main(String[] args) throws Exception {
        // 创建 MySQL CDC 数据源
        MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("cdc_user")
                .password("secure_password")
                .databaseList("test_cdc")
                .tableList("test_cdc.user_profile")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .serverTimeZone("Asia/Shanghai")
                .serverId("flink-cdc-server-01")
                .build();

        // 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // 启用检查点机制
        env.enableCheckpointing(5000);
        
        // 添加数据源并设置并行度
        env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .setParallelism(3)
                .print()
                .setParallelism(1);

        // 执行作业
        env.execute("MySQL CDC DataStream Job");
    }
}

使用 Table API 实现

以下代码展示了如何使用 Table API 创建 MySQL CDC 表:

package com.example.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcTableJob {
    public static void main(String[] args) throws Exception {
        // 创建 Flink 执行环境和表环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建 MySQL CDC 表
        tableEnv.executeSql(
            "CREATE TABLE user_profile_cdc (" +
            " id INT," +
            " username STRING," +
            " bio STRING," +
            " age INT," +
            " account_balance DECIMAL(10, 2)," +
            " is_active BOOLEAN," +
            " register_time TIMESTAMP," +
            " birth_date DATE," +
            " identifier BIGINT," +
            " last_visit TIMESTAMP(3)," +
            " PRIMARY KEY (id) NOT ENFORCED " +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'localhost'," +
            " 'port' = '3306'," +
            " 'username' = 'cdc_user'," +
            " 'password' = 'secure_password'," +
            " 'database-name' = 'test_cdc'," +
            " 'table-name' = 'user_profile'," +
            " 'server-time-zone' = 'Asia/Shanghai'," +
            " 'server-id' = 'flink-cdc-table-01'" +
            ")"
        );

        // 执行查询并打印结果
        TableResult result = tableEnv.executeSql("SELECT id, username, age, account_balance FROM user_profile_cdc WHERE age > 30");
        result.print();
        
        // 执行作业
        env.execute("MySQL CDC Table Job");
    }
}

高级配置选项

时区配置

MySQL CDC 读取 TIMESTAMP 类型数据时可能会遇到时区问题。可以通过以下方式解决:

// 在构建数据源时设置服务器时区
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
    // ... 其他配置
    .serverTimeZone("Asia/Shanghai")  // 设置时区为东八区
    // ... 其他配置
    .build();

服务器 ID 配置

每个读取 MySQL binlog 的客户端应该有唯一的服务器 ID:

// 为每个 CDC 作业设置不同的服务器 ID
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
    // ... 其他配置
    .serverId("unique-cdc-server-id")  // 设置唯一的服务器 ID
    // ... 其他配置
    .build();

增量快照读取

对于大型表,可以启用增量快照读取功能以减少内存使用:

// 启用增量快照读取
MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
    // ... 其他配置
    .scan.incrementalSnapshot.enabled(true)  // 启用增量快照
    .scan.incrementalSnapshot.chunk.size(8096)  // 设置快照块大小
    // ... 其他配置
    .build();

常见问题与解决方案

权限不足问题

错误:Access denied for user 'cdc_user'@'host'

解决方案:确保为 CDC 用户授予了正确的权限:

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'localhost';
FLUSH PRIVILEGES;

连接超时问题

错误:Connection timed out

解决方案:增加连接超时时间:

MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
    // ... 其他配置
    .connectionPoolSize(20)  // 增加连接池大小
    .connectTimeout(30000)  // 设置连接超时时间为30秒
    .connectMaxRetries(3)  // 设置最大重试次数
    // ... 其他配置
    .build();

数据类型映射问题

MySQL 和 Flink 之间的数据类型可能不完全匹配。可以通过自定义反序列化器来解决:

public class CustomDeserializationSchema implements DebeziumDeserializationSchema<RowData> {
    // 实现自定义反序列化逻辑
}
标签: Flink

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。